From bb8c61b9a7a9b0044a25d7bb6e2cd352e379a942 Mon Sep 17 00:00:00 2001 From: Gargi Jaiswal Date: Mon, 2 Mar 2026 12:18:53 +0530 Subject: [PATCH 1/2] merge container and volume choosing policy into one --- .../volume/VolumeChoosingPolicyFactory.java | 8 +- .../DiskBalancerConfiguration.java | 25 +- .../diskbalancer/DiskBalancerService.java | 75 ++---- .../DefaultContainerChoosingPolicy.java | 105 -------- .../policy/DefaultVolumeChoosingPolicy.java | 156 ------------ .../DefaultVolumeContainerChoosingPolicy.java | 228 ++++++++++++++++++ ...DiskBalancerVolumeContainerCandidate.java} | 42 ++-- ...ava => VolumeContainerChoosingPolicy.java} | 35 +-- ...stDiskBalancerContainerChoosingLogic.java} | 66 ++--- .../diskbalancer/TestDiskBalancerService.java | 26 +- .../diskbalancer/TestDiskBalancerTask.java | 18 +- ... TestDiskBalancerVolumeChoosingLogic.java} | 162 +++++++++---- .../docs/content/design/diskbalancer.md | 15 +- .../docs/content/feature/DiskBalancer.md | 3 +- .../docs/content/feature/DiskBalancer.zh.md | 3 +- ... 
=> TestContainerChoosingPerformance.java} | 122 +++++++--- ...ava => TestVolumeChoosingPerformance.java} | 82 +++++-- 17 files changed, 626 insertions(+), 545 deletions(-) delete mode 100644 hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/DefaultContainerChoosingPolicy.java delete mode 100644 hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/DefaultVolumeChoosingPolicy.java create mode 100644 hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/DefaultVolumeContainerChoosingPolicy.java rename hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/{DiskBalancerVolumeChoosingPolicy.java => DiskBalancerVolumeContainerCandidate.java} (52%) rename hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/{ContainerChoosingPolicy.java => VolumeContainerChoosingPolicy.java} (55%) rename hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/{TestDefaultContainerChoosingPolicy.java => TestDiskBalancerContainerChoosingLogic.java} (82%) rename hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/{TestDefaultVolumeChoosingPolicy.java => TestDiskBalancerVolumeChoosingLogic.java} (78%) rename hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/{TestContainerChoosingPolicy.java => TestContainerChoosingPerformance.java} (69%) rename hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/{TestVolumeChoosingPolicy.java => TestVolumeChoosingPerformance.java} (76%) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/VolumeChoosingPolicyFactory.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/VolumeChoosingPolicyFactory.java index 
e4c8aa3b514f..409b4d95c428 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/VolumeChoosingPolicyFactory.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/VolumeChoosingPolicyFactory.java @@ -24,7 +24,7 @@ import org.apache.hadoop.hdds.conf.ConfigurationSource; import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy; import org.apache.hadoop.ozone.container.diskbalancer.DiskBalancerConfiguration; -import org.apache.hadoop.ozone.container.diskbalancer.policy.DiskBalancerVolumeChoosingPolicy; +import org.apache.hadoop.ozone.container.diskbalancer.policy.VolumeContainerChoosingPolicy; import org.apache.ratis.util.ReflectionUtils; /** @@ -48,9 +48,9 @@ public static VolumeChoosingPolicy getPolicy(ConfigurationSource conf) { return ReflectionUtils.newInstance(policyClass, new Class[] {ReentrantLock.class}, LOCK); } - public static DiskBalancerVolumeChoosingPolicy getDiskBalancerPolicy(ConfigurationSource conf) { - Class policyClass = conf.getObject(DiskBalancerConfiguration.class).getVolumeChoosingPolicyClass(); - return (DiskBalancerVolumeChoosingPolicy) ReflectionUtils.newInstance( + public static VolumeContainerChoosingPolicy getDiskBalancerContainerPolicy(ConfigurationSource conf) { + Class policyClass = conf.getObject(DiskBalancerConfiguration.class).getVolumeContainerChoosingPolicyClass(); + return (VolumeContainerChoosingPolicy) ReflectionUtils.newInstance( policyClass, new Class[]{ReentrantLock.class}, LOCK); } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerConfiguration.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerConfiguration.java index ba777aa21719..61600c6a3e73 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerConfiguration.java 
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerConfiguration.java @@ -94,20 +94,13 @@ public final class DiskBalancerConfiguration { ) private long diskBalancerTimeout = Duration.ofSeconds(300).toMillis(); - @Config(key = "hdds.datanode.disk.balancer.volume.choosing.policy", type = ConfigType.CLASS, + @Config(key = "hdds.datanode.disk.balancer.volume.container.choosing.policy", type = ConfigType.CLASS, defaultValue = "org.apache.hadoop.ozone.container.diskbalancer.policy" + - ".DefaultVolumeChoosingPolicy", + ".DefaultVolumeContainerChoosingPolicy", tags = {ConfigTag.DISKBALANCER}, - description = "The volume choosing policy of the disk balancer service.") - private Class volumeChoosingPolicyClass; - - @Config(key = "hdds.datanode.disk.balancer.container.choosing.policy", type = ConfigType.CLASS, - defaultValue = "org.apache.hadoop.ozone.container.diskbalancer.policy" + - ".DefaultContainerChoosingPolicy", - tags = {ConfigTag.DISKBALANCER}, - description = "The container choosing policy of the disk balancer " + - "service.") - private Class containerChoosingPolicyClass; + description = "The policy for selecting source/destination volumes and " + + "containers to move for disk balancing.") + private Class volumeContainerChoosingPolicyClass; @Config(key = "hdds.datanode.disk.balancer.stop.after.disk.even", type = ConfigType.BOOLEAN, @@ -165,12 +158,8 @@ public void setDiskBalancerTimeout(Duration duration) { this.diskBalancerTimeout = duration.toMillis(); } - public Class getVolumeChoosingPolicyClass() { - return volumeChoosingPolicyClass; - } - - public Class getContainerChoosingPolicyClass() { - return containerChoosingPolicyClass; + public Class getVolumeContainerChoosingPolicyClass() { + return volumeContainerChoosingPolicyClass; } public boolean isStopAfterDiskEven() { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java index aaa14321011e..d4b447dff725 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java @@ -41,15 +41,12 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.io.FileUtils; -import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.hdds.conf.ConfigurationSource; -import org.apache.hadoop.hdds.conf.StorageUnit; import org.apache.hadoop.hdds.fs.SpaceUsageSource; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DiskBalancerRunningStatus; -import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; import org.apache.hadoop.hdds.server.ServerUtils; @@ -69,9 +66,9 @@ import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet; import org.apache.hadoop.ozone.container.common.volume.VolumeChoosingPolicyFactory; import org.apache.hadoop.ozone.container.diskbalancer.DiskBalancerVolumeCalculation.VolumeFixedUsage; -import org.apache.hadoop.ozone.container.diskbalancer.policy.ContainerChoosingPolicy; -import org.apache.hadoop.ozone.container.diskbalancer.policy.DefaultContainerChoosingPolicy; -import org.apache.hadoop.ozone.container.diskbalancer.policy.DiskBalancerVolumeChoosingPolicy; +import org.apache.hadoop.ozone.container.diskbalancer.policy.DefaultVolumeContainerChoosingPolicy; +import 
org.apache.hadoop.ozone.container.diskbalancer.policy.DiskBalancerVolumeContainerCandidate; +import org.apache.hadoop.ozone.container.diskbalancer.policy.VolumeContainerChoosingPolicy; import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData; import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerLocationUtil; import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil; @@ -126,12 +123,10 @@ public class DiskBalancerService extends BackgroundService { private Map deltaSizes; private MutableVolumeSet volumeSet; - private DiskBalancerVolumeChoosingPolicy volumeChoosingPolicy; - private ContainerChoosingPolicy containerChoosingPolicy; + private VolumeContainerChoosingPolicy volumeContainerChoosingPolicy; private final File diskBalancerInfoFile; private DiskBalancerServiceMetrics metrics; - private long containerDefaultSize; public DiskBalancerService(OzoneContainer ozoneContainer, long serviceCheckInterval, long serviceCheckTimeout, TimeUnit timeUnit, @@ -148,15 +143,9 @@ public DiskBalancerService(OzoneContainer ozoneContainer, inProgressContainers = ConcurrentHashMap.newKeySet(); deltaSizes = new ConcurrentHashMap<>(); volumeSet = ozoneContainer.getVolumeSet(); - containerDefaultSize = (long) conf.getStorageSize( - ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE, - ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES); try { - volumeChoosingPolicy = VolumeChoosingPolicyFactory.getDiskBalancerPolicy(conf); - containerChoosingPolicy = (ContainerChoosingPolicy) - conf.getObject(DiskBalancerConfiguration.class) - .getContainerChoosingPolicyClass().newInstance(); + volumeContainerChoosingPolicy = VolumeChoosingPolicyFactory.getDiskBalancerContainerPolicy(conf); } catch (Exception e) { LOG.error("Got exception when initializing DiskBalancerService", e); throw new IOException(e); @@ -403,25 +392,20 @@ public BackgroundTaskQueue getTasks() { } for (int i = 0; i < availableTaskCount; i++) { - Pair pair = 
volumeChoosingPolicy - .chooseVolume(volumeSet, threshold, deltaSizes, containerDefaultSize); - if (pair == null) { - continue; - } - HddsVolume sourceVolume = pair.getLeft(), destVolume = pair.getRight(); - ContainerData toBalanceContainer = containerChoosingPolicy - .chooseContainer(ozoneContainer, sourceVolume, destVolume, - inProgressContainers, threshold, volumeSet, deltaSizes); - if (toBalanceContainer != null) { - DiskBalancerTask task = new DiskBalancerTask(toBalanceContainer, sourceVolume, - destVolume); - queue.add(task); - inProgressContainers.add(ContainerID.valueOf(toBalanceContainer.getContainerID())); - deltaSizes.put(sourceVolume, deltaSizes.getOrDefault(sourceVolume, 0L) - - toBalanceContainer.getBytesUsed()); - } else { - // release destVolume committed bytes - destVolume.incCommittedBytes(0 - containerDefaultSize); + DiskBalancerVolumeContainerCandidate candidate = volumeContainerChoosingPolicy.chooseVolumesAndContainer( + ozoneContainer, volumeSet, deltaSizes, inProgressContainers, threshold); + if (candidate != null) { + HddsVolume sourceVolume = candidate.getSourceVolume(); + HddsVolume destVolume = candidate.getDestVolume(); + ContainerData toBalanceContainer = candidate.getContainerData(); + if (toBalanceContainer != null) { + DiskBalancerTask task = new DiskBalancerTask(toBalanceContainer, sourceVolume, + destVolume); + queue.add(task); + inProgressContainers.add(ContainerID.valueOf(toBalanceContainer.getContainerID())); + deltaSizes.put(sourceVolume, deltaSizes.getOrDefault(sourceVolume, 0L) + - toBalanceContainer.getBytesUsed()); + } } } @@ -501,7 +485,7 @@ public BackgroundTaskResult call() { // QUASI_CLOSED is allowed when test mode is enabled, this is done to test in production // these containers are rejected. 
State containerState = container.getContainerData().getState(); - boolean isTestMode = DefaultContainerChoosingPolicy.isTest(); + boolean isTestMode = DefaultVolumeContainerChoosingPolicy.isTest(); if (containerState != State.CLOSED && !(isTestMode && containerState == State.QUASI_CLOSED)) { LOG.warn("Container {} is in {} state, skipping move process. Only CLOSED containers can be moved.", containerId, containerState); @@ -638,7 +622,7 @@ private void postCall(boolean success, long startTime) { inProgressContainers.remove(ContainerID.valueOf(containerData.getContainerID())); deltaSizes.put(sourceVolume, deltaSizes.get(sourceVolume) + containerData.getBytesUsed()); - destVolume.incCommittedBytes(0 - containerDefaultSize); + destVolume.incCommittedBytes(0 - containerData.getBytesUsed()); long endTime = Time.monotonicNow(); if (success) { metrics.incrSuccessCount(1); @@ -747,22 +731,13 @@ public void setBalancedBytesInLastWindow(long bytes) { this.balancedBytesInLastWindow.set(bytes); } - public ContainerChoosingPolicy getContainerChoosingPolicy() { - return containerChoosingPolicy; - } - - public DiskBalancerVolumeChoosingPolicy getVolumeChoosingPolicy() { - return volumeChoosingPolicy; - } - - @VisibleForTesting - public void setVolumeChoosingPolicy(DiskBalancerVolumeChoosingPolicy volumeChoosingPolicy) { - this.volumeChoosingPolicy = volumeChoosingPolicy; + public VolumeContainerChoosingPolicy getVolumeContainerChoosingPolicy() { + return volumeContainerChoosingPolicy; } @VisibleForTesting - public void setContainerChoosingPolicy(ContainerChoosingPolicy containerChoosingPolicy) { - this.containerChoosingPolicy = containerChoosingPolicy; + public void setVolumeContainerChoosingPolicy(VolumeContainerChoosingPolicy volumeContainerChoosingPolicy) { + this.volumeContainerChoosingPolicy = volumeContainerChoosingPolicy; } @VisibleForTesting diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/DefaultContainerChoosingPolicy.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/DefaultContainerChoosingPolicy.java deleted file mode 100644 index f964f6f519e3..000000000000 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/DefaultContainerChoosingPolicy.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.ozone.container.diskbalancer.policy; - -import static java.util.concurrent.TimeUnit.HOURS; -import static org.apache.hadoop.ozone.container.diskbalancer.DiskBalancerVolumeCalculation.computeUtilization; -import static org.apache.hadoop.ozone.container.diskbalancer.DiskBalancerVolumeCalculation.getIdealUsage; -import static org.apache.hadoop.ozone.container.diskbalancer.DiskBalancerVolumeCalculation.getVolumeUsages; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ExecutionException; -import org.apache.hadoop.hdds.fs.SpaceUsageSource; -import org.apache.hadoop.hdds.scm.container.ContainerID; -import org.apache.hadoop.ozone.container.common.impl.ContainerData; -import org.apache.hadoop.ozone.container.common.interfaces.Container; -import org.apache.hadoop.ozone.container.common.volume.HddsVolume; -import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet; -import org.apache.hadoop.ozone.container.diskbalancer.DiskBalancerVolumeCalculation.VolumeFixedUsage; -import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Choose a container from specified volume, make sure it's not being balancing. 
- */ -public class DefaultContainerChoosingPolicy implements ContainerChoosingPolicy { - public static final Logger LOG = LoggerFactory.getLogger( - DefaultContainerChoosingPolicy.class); - - private static final ThreadLocal>>> CACHE = - ThreadLocal.withInitial( - () -> CacheBuilder.newBuilder().recordStats().expireAfterAccess(1, HOURS).build()); - - // for test - private static boolean test = false; - - @Override - public ContainerData chooseContainer(OzoneContainer ozoneContainer, - HddsVolume src, HddsVolume dst, - Set inProgressContainerIDs, - double thresholdPercentage, MutableVolumeSet volumeSet, - Map deltaMap) { - final Iterator> itr; - try { - itr = CACHE.get().get(src, () -> ozoneContainer.getController().getContainers(src)); - } catch (ExecutionException e) { - LOG.warn("Failed to get container iterator for volume {}", src, e); - return null; - } - - // Calculate the actual threshold - final List volumeUsages = getVolumeUsages(volumeSet, deltaMap); - final double actualThreshold = getIdealUsage(volumeUsages) + thresholdPercentage / 100.0; - - // Find container - final SpaceUsageSource.Fixed dstUsage = dst.getCurrentUsage(); - final long dstCommittedBytes = dst.getCommittedBytes(); - while (itr.hasNext()) { - ContainerData containerData = itr.next().getContainerData(); - if (containerData.getBytesUsed() > 0 && - !inProgressContainerIDs.contains(ContainerID.valueOf(containerData.getContainerID())) && - (containerData.isClosed() || (test && containerData.isQuasiClosed()))) { - - // Check if dst can accept the candidate container. 
- if (computeUtilization(dstUsage, dstCommittedBytes, containerData.getBytesUsed()) < actualThreshold) { - return containerData; - } - } - } - - CACHE.get().invalidate(src); - return null; - } - - @VisibleForTesting - public static void setTest(boolean isTest) { - test = isTest; - } - - @VisibleForTesting - public static boolean isTest() { - return test; - } -} diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/DefaultVolumeChoosingPolicy.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/DefaultVolumeChoosingPolicy.java deleted file mode 100644 index 240c5a9f9f00..000000000000 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/DefaultVolumeChoosingPolicy.java +++ /dev/null @@ -1,156 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.ozone.container.diskbalancer.policy; - -import static org.apache.hadoop.ozone.container.diskbalancer.DiskBalancerVolumeCalculation.getIdealUsage; -import static org.apache.hadoop.ozone.container.diskbalancer.DiskBalancerVolumeCalculation.newVolumeFixedUsage; - -import java.util.Comparator; -import java.util.List; -import java.util.Map; -import java.util.concurrent.locks.ReentrantLock; -import java.util.stream.Collectors; -import org.apache.commons.lang3.tuple.Pair; -import org.apache.hadoop.hdds.fs.SpaceUsageSource; -import org.apache.hadoop.ozone.container.common.volume.HddsVolume; -import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet; -import org.apache.hadoop.ozone.container.common.volume.StorageVolume; -import org.apache.hadoop.ozone.container.diskbalancer.DiskBalancerVolumeCalculation.VolumeFixedUsage; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Choose a random volume for disk balancing. - * - * Source volumes use deltaMap to simulate space that will be freed (pre-deleted). - * Destination volumes use committedBytes to account for space already reserved. - * Both deltaMap and committedBytes are considered to calculate usage. - */ -public class DefaultVolumeChoosingPolicy implements DiskBalancerVolumeChoosingPolicy { - - public static final Logger LOG = LoggerFactory.getLogger( - DefaultVolumeChoosingPolicy.class); - private final ReentrantLock lock; - - public DefaultVolumeChoosingPolicy(ReentrantLock globalLock) { - lock = globalLock; - } - - @Override - public Pair chooseVolume(MutableVolumeSet volumeSet, - double thresholdPercentage, Map deltaMap, long containerSize) { - lock.lock(); - try { - // Create truly immutable snapshot of volumes to ensure consistency - final List allVolumes = volumeSet.getVolumesList(); - if (allVolumes.size() < 2) { - return null; // Can't balance with less than 2 volumes. 
- } - - // Calculate usages and sort in ascending order of utilization - final List volumeUsages = allVolumes.stream() - .map(v -> newVolumeFixedUsage(v, deltaMap)) - .sorted(Comparator.comparingDouble(VolumeFixedUsage::getUtilization)) - .collect(Collectors.toList()); - - // Calculate ideal usage and threshold range - final double idealUsage = getIdealUsage(volumeUsages); - final double actualThreshold = thresholdPercentage / 100.0; - final double lowerThreshold = idealUsage - actualThreshold; - final double upperThreshold = idealUsage + actualThreshold; - - // Log all volume information for investigation - if (LOG.isDebugEnabled()) { - logVolumeBalancingState(volumeUsages, idealUsage, thresholdPercentage, - lowerThreshold, upperThreshold, containerSize, deltaMap); - } - - // Get highest and lowest utilization volumes - final VolumeFixedUsage highestUsage = volumeUsages.get(volumeUsages.size() - 1); - final VolumeFixedUsage lowestUsage = volumeUsages.get(0); - - // Only return null if highest is below upper threshold AND lowest is above lower threshold - // This means all volumes are strictly within the range (not at boundaries) - if (highestUsage.getUtilization() < upperThreshold && - lowestUsage.getUtilization() > lowerThreshold) { - // All volumes are strictly within threshold range, no balancing needed - return null; - } - - // Determine source volume: highest utilization volume (if above threshold) - final VolumeFixedUsage src = highestUsage; - - // Find destination volume: lowest utilization volume that has enough space - // Prefer volumes below threshold, but accept any volume with lower utilization than source - for (int i = 0; i < volumeUsages.size() - 1; i++) { - final VolumeFixedUsage dstUsage = volumeUsages.get(i); - final HddsVolume dst = dstUsage.getVolume(); - - // Check if destination has enough space and has lower utilization than source - if (dstUsage.getUtilization() < src.getUtilization() && - containerSize < dstUsage.computeUsableSpace()) { - 
// Found dst, reserve space and return - dst.incCommittedBytes(containerSize); - LOG.debug("Chosen volume pair for disk balancing: source={} (utilization={}), " + - "destination={} (utilization={})", - src.getVolume().getStorageID(), src.getUtilization(), - dst.getStorageID(), dstUsage.getUtilization()); - return Pair.of(src.getVolume(), dst); - } - LOG.debug("Destination volume {} does not have enough space, trying next volume.", - dst.getStorageID()); - } - LOG.debug("Failed to find appropriate destination volume."); - return null; - } finally { - lock.unlock(); - } - } - - /** - * Logs all volume information for disk balancing investigation. - * - * @param volumeUsages List of volume usages (sorted by utilization ascending) - * @param idealUsage Calculated ideal usage - * @param thresholdPercentage Threshold percentage - * @param lowerThreshold Lower threshold bound - * @param upperThreshold Upper threshold bound - * @param containerSize Container size to be moved - * @param deltaMap Map of volume deltas - */ - private void logVolumeBalancingState(List volumeUsages, - double idealUsage, double thresholdPercentage, double lowerThreshold, - double upperThreshold, long containerSize, Map deltaMap) { - LOG.debug("Disk balancing state - idealUsage={}, thresholdPercentage={}%, " + - "thresholdRange=({}, {}), containerSize={}", - String.format("%.10f", idealUsage), thresholdPercentage, - String.format("%.10f", lowerThreshold), String.format("%.10f", upperThreshold), - containerSize); - for (int i = 0; i < volumeUsages.size(); i++) { - VolumeFixedUsage vfu = volumeUsages.get(i); - HddsVolume vol = vfu.getVolume(); - SpaceUsageSource.Fixed usage = vfu.getUsage(); - long usableSpace = vfu.computeUsableSpace(); - LOG.debug("Volume[{}] - disk={}, utilization={}, capacity={}, " + - "effectiveUsed={}, available={}, usableSpace={}, committedBytes={}, delta={}", - i, vol.getStorageID(), String.format("%.10f", vfu.getUtilization()), - usage.getCapacity(), vfu.getEffectiveUsed(), 
usage.getAvailable(), - usableSpace, vol.getCommittedBytes(), deltaMap.getOrDefault(vol, 0L)); - } - } -} diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/DefaultVolumeContainerChoosingPolicy.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/DefaultVolumeContainerChoosingPolicy.java new file mode 100644 index 000000000000..ea384b2c9e7f --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/DefaultVolumeContainerChoosingPolicy.java @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.ozone.container.diskbalancer.policy; + +import static java.util.concurrent.TimeUnit.HOURS; +import static org.apache.hadoop.ozone.container.diskbalancer.DiskBalancerVolumeCalculation.computeUtilization; +import static org.apache.hadoop.ozone.container.diskbalancer.DiskBalancerVolumeCalculation.getIdealUsage; +import static org.apache.hadoop.ozone.container.diskbalancer.DiskBalancerVolumeCalculation.newVolumeFixedUsage; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Collectors; +import org.apache.hadoop.hdds.fs.SpaceUsageSource; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.ozone.container.common.impl.ContainerData; +import org.apache.hadoop.ozone.container.common.interfaces.Container; +import org.apache.hadoop.ozone.container.common.volume.HddsVolume; +import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet; +import org.apache.hadoop.ozone.container.common.volume.StorageVolume; +import org.apache.hadoop.ozone.container.diskbalancer.DiskBalancerVolumeCalculation.VolumeFixedUsage; +import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * First chooses a source volume and destination volume pair based on ideal utilization and threshold, + * then chooses a container from the source volume that can be moved to the destination without + * exceeding the upper threshold. Space is reserved on the destination only when a container is + * chosen, using the actual container size. 
+ */ +public class DefaultVolumeContainerChoosingPolicy implements VolumeContainerChoosingPolicy { + public static final Logger LOG = LoggerFactory.getLogger( + DefaultVolumeContainerChoosingPolicy.class); + + private static final ThreadLocal>>> CACHE = + ThreadLocal.withInitial( + () -> CacheBuilder.newBuilder().recordStats().expireAfterAccess(1, HOURS).build()); + + // for test + private static boolean test = false; + + private final ReentrantLock lock; + + public DefaultVolumeContainerChoosingPolicy(ReentrantLock globalLock) { + this.lock = globalLock; + } + + @Override + public DiskBalancerVolumeContainerCandidate chooseVolumesAndContainer(OzoneContainer ozoneContainer, + MutableVolumeSet volumeSet, Map deltaMap, Set inProgressContainerIDs, + double thresholdPercentage) { + lock.lock(); + try { + // Create truly immutable snapshot of volumes to ensure consistency + final List allVolumes = volumeSet.getVolumesList(); + if (allVolumes.size() < 2) { + return null; // Can't balance with less than 2 volumes. 
+ } + + // Calculate usages and sort in ascending order of utilization (once) + // Use storage ID as secondary sort for deterministic ordering when utilizations are equal + final List volumeUsages = allVolumes.stream() + .map(v -> newVolumeFixedUsage(v, deltaMap)) + .sorted(Comparator.comparingDouble(VolumeFixedUsage::getUtilization) + .thenComparing(v -> v.getVolume().getStorageID())) + .collect(Collectors.toList()); + + // Calculate ideal usage and threshold range (once) + final double idealUsage = getIdealUsage(volumeUsages); + final double actualThreshold = thresholdPercentage / 100.0; + final double lowerThreshold = idealUsage - actualThreshold; + final double upperThreshold = idealUsage + actualThreshold; + + if (LOG.isDebugEnabled()) { + logVolumeBalancingState(volumeUsages, idealUsage, thresholdPercentage, + lowerThreshold, upperThreshold, deltaMap); + } + + // Get highest and lowest utilization volumes + final VolumeFixedUsage highestUsage = volumeUsages.get(volumeUsages.size() - 1); + final VolumeFixedUsage lowestUsage = volumeUsages.get(0); + + // Only return null if highest is below upper threshold AND lowest is above lower threshold + if (highestUsage.getUtilization() < upperThreshold && + lowestUsage.getUtilization() > lowerThreshold) { + return null; + } + + // Determine source volume: highest utilization volume + final VolumeFixedUsage srcUsage = highestUsage; + final HddsVolume src = srcUsage.getVolume(); + + // Find destination volume and container: try each dest with lower utilization than source + for (int i = 0; i < volumeUsages.size() - 1; i++) { + final VolumeFixedUsage dstUsage = volumeUsages.get(i); + final HddsVolume dst = dstUsage.getVolume(); + + // Check if destination has lower utilization than source and some usable space + if (dstUsage.getUtilization() < srcUsage.getUtilization() && + dstUsage.computeUsableSpace() > 0) { + ContainerData containerData = chooseContainer(ozoneContainer, + src, dst, dstUsage, inProgressContainerIDs, 
upperThreshold); + if (containerData != null) { + long containerSize = containerData.getBytesUsed(); + dst.incCommittedBytes(containerSize); + LOG.debug("Chosen volume pair for disk balancing: source={} (utilization={}), " + + "destination={} (utilization={})", + src.getStorageID(), srcUsage.getUtilization(), + dst.getStorageID(), dstUsage.getUtilization()); + return new DiskBalancerVolumeContainerCandidate(containerData, src, dst); + } + LOG.debug("No suitable container found for destination {}, trying next volume.", + dst.getStorageID()); + } else { + LOG.debug("Destination volume {} does not have enough space, trying next volume.", + dst.getStorageID()); + } + } + LOG.debug("Failed to find appropriate destination volume and container."); + return null; + } finally { + lock.unlock(); + } + } + + /** + * Choose a container from source volume that can be moved to destination + * without exceeding the upper threshold. Checks both space and utilization. + *

+ * This method is public to allow direct testing of container selection logic + * in integration tests without requiring full volume selection setup. + */ + public ContainerData chooseContainer(OzoneContainer ozoneContainer, + HddsVolume src, HddsVolume dst, VolumeFixedUsage dstUsage, + Set inProgressContainerIDs, double upperThreshold) { + final Iterator> itr; + try { + itr = CACHE.get().get(src, () -> ozoneContainer.getController().getContainers(src)); + } catch (ExecutionException e) { + LOG.warn("Failed to get container iterator for volume {}", src, e); + return null; + } + + final SpaceUsageSource.Fixed dstSpaceUsage = dstUsage.getUsage(); + final long dstCommittedBytes = dst.getCommittedBytes(); + final long usableSpace = dstUsage.computeUsableSpace(); + + while (itr.hasNext()) { + ContainerData containerData = itr.next().getContainerData(); + + // Skip containers removed from containerSet after iterator was cached + if (ozoneContainer.getContainerSet().getContainer(containerData.getContainerID()) == null) { + continue; + } + + if (containerData.getBytesUsed() > 0 && + !inProgressContainerIDs.contains(ContainerID.valueOf(containerData.getContainerID())) && + (containerData.isClosed() || (test && containerData.isQuasiClosed()))) { + + long containerSize = containerData.getBytesUsed(); + // Check if dst has enough space and can accept the container without exceeding threshold + if (containerSize <= usableSpace && + computeUtilization(dstSpaceUsage, dstCommittedBytes, containerSize) < upperThreshold) { + return containerData; + } + } + } + + CACHE.get().invalidate(src); + return null; + } + + /** + * Logs all volume information for disk balancing investigation. 
+ */ + private void logVolumeBalancingState(List volumeUsages, + double idealUsage, double thresholdPercentage, double lowerThreshold, + double upperThreshold, Map deltaMap) { + LOG.debug("Disk balancing state - idealUsage={}, thresholdPercentage={}%, " + + "thresholdRange=({}, {})", + String.format("%.10f", idealUsage), thresholdPercentage, + String.format("%.10f", lowerThreshold), String.format("%.10f", upperThreshold)); + for (int i = 0; i < volumeUsages.size(); i++) { + VolumeFixedUsage vfu = volumeUsages.get(i); + HddsVolume vol = vfu.getVolume(); + SpaceUsageSource.Fixed usage = vfu.getUsage(); + long usableSpace = vfu.computeUsableSpace(); + LOG.debug("Volume[{}] - disk={}, utilization={}, capacity={}, " + + "effectiveUsed={}, available={}, usableSpace={}, committedBytes={}, delta={}", + i, vol.getStorageID(), String.format("%.10f", vfu.getUtilization()), + usage.getCapacity(), vfu.getEffectiveUsed(), usage.getAvailable(), + usableSpace, vol.getCommittedBytes(), deltaMap.getOrDefault(vol, 0L)); + } + } + + @VisibleForTesting + public static void setTest(boolean isTest) { + test = isTest; + } + + @VisibleForTesting + public static boolean isTest() { + return test; + } +} diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/DiskBalancerVolumeChoosingPolicy.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/DiskBalancerVolumeContainerCandidate.java similarity index 52% rename from hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/DiskBalancerVolumeChoosingPolicy.java rename to hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/DiskBalancerVolumeContainerCandidate.java index c64733fc37ea..9e556e6934bb 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/DiskBalancerVolumeChoosingPolicy.java +++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/DiskBalancerVolumeContainerCandidate.java @@ -17,24 +17,34 @@ package org.apache.hadoop.ozone.container.diskbalancer.policy; -import java.util.Map; -import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.ozone.container.common.impl.ContainerData; import org.apache.hadoop.ozone.container.common.volume.HddsVolume; -import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet; /** - * This interface specifies the policy for choosing volumes to balance. + * Result of consolidated volume and container selection for disk balancing. + * Contains the container to move and its source and destination volumes. */ -public interface DiskBalancerVolumeChoosingPolicy { - /** - * Choose a pair of volumes for balancing. - * - * @param volumeSet - volumes to choose from. - * @param thresholdPercentage the threshold percentage in range (0, 100) to choose the source volume. - * @param deltaSizes - the sizes changes of inProgress balancing jobs. - * @param containerSize - the estimated size of container to be moved. - * @return Source volume and Dest volume. 
- */ - Pair chooseVolume(MutableVolumeSet volumeSet, - double thresholdPercentage, Map deltaSizes, long containerSize); +public final class DiskBalancerVolumeContainerCandidate { + private final ContainerData containerData; + private final HddsVolume sourceVolume; + private final HddsVolume destVolume; + + public DiskBalancerVolumeContainerCandidate(ContainerData containerData, + HddsVolume sourceVolume, HddsVolume destVolume) { + this.containerData = containerData; + this.sourceVolume = sourceVolume; + this.destVolume = destVolume; + } + + public ContainerData getContainerData() { + return containerData; + } + + public HddsVolume getSourceVolume() { + return sourceVolume; + } + + public HddsVolume getDestVolume() { + return destVolume; + } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/ContainerChoosingPolicy.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/VolumeContainerChoosingPolicy.java similarity index 55% rename from hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/ContainerChoosingPolicy.java rename to hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/VolumeContainerChoosingPolicy.java index d512de84bdf4..9a9e7d99102d 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/ContainerChoosingPolicy.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/VolumeContainerChoosingPolicy.java @@ -20,30 +20,33 @@ import java.util.Map; import java.util.Set; import org.apache.hadoop.hdds.scm.container.ContainerID; -import org.apache.hadoop.ozone.container.common.impl.ContainerData; import org.apache.hadoop.ozone.container.common.volume.HddsVolume; import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet; import 
org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; /** - * This interface specifies the policy for choosing containers to balance. + * This interface specifies the policy for choosing volumes and containers to balance. + * It provides consolidated volume selection (source/destination pair) and container selection + * into a single operation to avoid recalculating ideal utilization and disk usage. */ -public interface ContainerChoosingPolicy { +public interface VolumeContainerChoosingPolicy { /** - * Choose a container for balancing. - * @param ozoneContainer the OzoneContainer instance to get all containers of a particular volume. - * @param srcVolume the HddsVolume instance to choose containers from. - * @param destVolume the destination volume to which container is being moved. - * @param inProgressContainerIDs containerIDs present in this set should be - - avoided as these containers are already under move by diskBalancer. - * @param thresholdPercentage the threshold percentage in range (0, 100) + * Choose a container and its source/destination volumes for balancing. + * Performs both volume pair selection and container selection in one call, + * computing ideal usage and volume utilizations only once. + * Space is reserved on the destination only when a container is chosen, + * using the actual container size. 
+ * + * @param ozoneContainer the OzoneContainer instance to get all containers * @param volumeSet the volumeSet instance - * @param deltaMap the deltaMap instance of source volume - * @return a Container + * @param deltaMap the deltaMap for in-progress balancing jobs (negative = space to be freed) + * @param inProgressContainerIDs containerIDs to avoid (already under move) + * @param thresholdPercentage the threshold percentage in range (0, 100) + * @return a DiskBalancerVolumeContainerCandidate with container and volumes, or null if none found */ - ContainerData chooseContainer(OzoneContainer ozoneContainer, - HddsVolume srcVolume, HddsVolume destVolume, + DiskBalancerVolumeContainerCandidate chooseVolumesAndContainer(OzoneContainer ozoneContainer, + MutableVolumeSet volumeSet, + Map deltaMap, Set inProgressContainerIDs, - double thresholdPercentage, MutableVolumeSet volumeSet, - Map deltaMap); + double thresholdPercentage); } diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDefaultContainerChoosingPolicy.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerContainerChoosingLogic.java similarity index 82% rename from hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDefaultContainerChoosingPolicy.java rename to hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerContainerChoosingLogic.java index 67fa7dac39ed..73587c4bd617 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDefaultContainerChoosingPolicy.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerContainerChoosingLogic.java @@ -35,6 +35,7 @@ import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.locks.ReentrantLock; import 
org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.conf.StorageUnit; import org.apache.hadoop.hdds.fs.MockSpaceUsageCheckFactory; @@ -45,14 +46,13 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.container.ContainerID; -import org.apache.hadoop.ozone.container.common.impl.ContainerData; import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion; import org.apache.hadoop.ozone.container.common.impl.ContainerSet; import org.apache.hadoop.ozone.container.common.volume.HddsVolume; import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet; import org.apache.hadoop.ozone.container.common.volume.StorageVolume; -import org.apache.hadoop.ozone.container.diskbalancer.policy.ContainerChoosingPolicy; -import org.apache.hadoop.ozone.container.diskbalancer.policy.DefaultContainerChoosingPolicy; +import org.apache.hadoop.ozone.container.diskbalancer.policy.DefaultVolumeContainerChoosingPolicy; +import org.apache.hadoop.ozone.container.diskbalancer.policy.DiskBalancerVolumeContainerCandidate; import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer; import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData; import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController; @@ -64,7 +64,7 @@ /** * Unit tests for the DefaultContainerChoosingPolicy. 
 */ -public class TestDefaultContainerChoosingPolicy { +public class TestDiskBalancerContainerChoosingLogic { @TempDir private Path baseDir; @@ -74,7 +74,7 @@ public class TestDefaultContainerChoosingPolicy { private static final long VOLUME_CAPACITY = 2500L * MB; // 2500MB private static final double THRESHOLD = 10.0; - private ContainerChoosingPolicy policy; + private DefaultVolumeContainerChoosingPolicy policy; private OzoneContainer ozoneContainer; private MutableVolumeSet volumeSet; private ContainerSet containerSet; @@ -86,7 +86,7 @@ public class TestDefaultContainerChoosingPolicy { @BeforeEach public void setup() throws Exception { - policy = new DefaultContainerChoosingPolicy(); + policy = new DefaultVolumeContainerChoosingPolicy(new ReentrantLock()); setupVolumesAndContainer(); inProgressContainerIDs = new HashSet<>(); deltaMap = new HashMap<>(); @@ -174,29 +174,35 @@ public void testContainerChosenSuccessfully() { // - C1 (500MB) is productive: (250+500)/2500 = 30% <= 56.67% // The policy iterates by container ID, so it will find and return C1. - ContainerData chosenContainer = policy.chooseContainer(ozoneContainer, - sourceVolume, destVolume1, inProgressContainerIDs, THRESHOLD, volumeSet, deltaMap); + DiskBalancerVolumeContainerCandidate candidate = policy.chooseVolumesAndContainer(ozoneContainer, + volumeSet, deltaMap, inProgressContainerIDs, THRESHOLD); - // first container should be chosen - assertNotNull(chosenContainer); - assertEquals(1L, chosenContainer.getContainerID()); + // first container should be chosen + assertNotNull(candidate); + assertEquals(1L, candidate.getContainerData().getContainerID()); } @Test - public void testContainerNotChosen() { - // For destVolume2, no container move should be productive. - // Max Allowed Utilization is ~56.67%.
- // - // Evaluation for destVolume2 (50% used / 1250MB): - // Max productive size = (0.5667 * 2500) - 1250 = 166.75MB - // All containers on the source volume (smallest is 200MB) are larger - // than 166.75MB. Therefore, no container should be chosen. - - ContainerData chosenContainer = policy.chooseContainer(ozoneContainer, - sourceVolume, destVolume2, inProgressContainerIDs, THRESHOLD, volumeSet, deltaMap); + public void testContainerNotChosen() throws IOException { + // Test scenario where no container fits the destination volume. + // Block dest1 (lowest utilization) and mark small containers as in-progress. + // Only large containers remain (500MB, 450MB), but dest2 can accept max 166.75MB. + + // Block dest1 by marking all available space as committed + destVolume1.incCommittedBytes(destVolume1.getCurrentUsage().getAvailable()); + + // Mark small containers as in-progress (unavailable) + inProgressContainerIDs.add(ContainerID.valueOf(3L)); // 200MB + inProgressContainerIDs.add(ContainerID.valueOf(4L)); // 350MB + + DiskBalancerVolumeContainerCandidate candidate = policy.chooseVolumesAndContainer(ozoneContainer, + volumeSet, deltaMap, inProgressContainerIDs, THRESHOLD); - // No containers should not be chosen - assertNull(chosenContainer); + assertNull(candidate); + + // Cleanup + destVolume1.incCommittedBytes(-destVolume1.getCurrentUsage().getAvailable()); + inProgressContainerIDs.clear(); } @Test @@ -217,13 +223,13 @@ public void testSizeZeroContainersSkipped() throws IOException { when(testOzoneContainer.getController()).thenReturn(testController); // The policy should skip containers 10 and 11 (size 0) and choose container 12 - ContainerData chosenContainer = policy.chooseContainer(testOzoneContainer, - sourceVolume, destVolume1, inProgressContainerIDs, THRESHOLD, volumeSet, deltaMap); - + DiskBalancerVolumeContainerCandidate candidate = policy.chooseVolumesAndContainer(testOzoneContainer, + volumeSet, deltaMap, inProgressContainerIDs, THRESHOLD); + // 
Container 12 (non-zero size) should be chosen, skipping containers 10 and 11 (size 0) - assertNotNull(chosenContainer); - assertEquals(12L, chosenContainer.getContainerID()); - assertEquals(200L * MB, chosenContainer.getBytesUsed()); + assertNotNull(candidate); + assertEquals(12L, candidate.getContainerData().getContainerID()); + assertEquals(200L * MB, candidate.getContainerData().getBytesUsed()); } /** diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerService.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerService.java index fc1980e579ee..c81bd06a8121 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerService.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerService.java @@ -27,7 +27,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyDouble; -import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -39,7 +38,6 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Stream; import org.apache.commons.io.FileUtils; -import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.hdds.HddsConfigKeys; import org.apache.hadoop.hdds.conf.ConfigurationSource; import org.apache.hadoop.hdds.conf.OzoneConfiguration; @@ -56,10 +54,9 @@ import org.apache.hadoop.ozone.container.common.volume.StorageVolume; import org.apache.hadoop.ozone.container.common.volume.VolumeSet; import org.apache.hadoop.ozone.container.diskbalancer.DiskBalancerVolumeCalculation.VolumeFixedUsage; -import org.apache.hadoop.ozone.container.diskbalancer.policy.ContainerChoosingPolicy; -import 
org.apache.hadoop.ozone.container.diskbalancer.policy.DefaultContainerChoosingPolicy; -import org.apache.hadoop.ozone.container.diskbalancer.policy.DefaultVolumeChoosingPolicy; -import org.apache.hadoop.ozone.container.diskbalancer.policy.DiskBalancerVolumeChoosingPolicy; +import org.apache.hadoop.ozone.container.diskbalancer.policy.DefaultVolumeContainerChoosingPolicy; +import org.apache.hadoop.ozone.container.diskbalancer.policy.DiskBalancerVolumeContainerCandidate; +import org.apache.hadoop.ozone.container.diskbalancer.policy.VolumeContainerChoosingPolicy; import org.apache.hadoop.ozone.container.keyvalue.ContainerTestVersionInfo; import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler; import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils; @@ -210,10 +207,8 @@ public void testPolicyClassInitialization(ContainerTestVersionInfo versionInfo) DiskBalancerServiceTestImpl svc = getDiskBalancerService(containerSet, conf, keyValueHandler, null, 1); - assertTrue(svc.getContainerChoosingPolicy() - instanceof DefaultContainerChoosingPolicy); - assertTrue(svc.getVolumeChoosingPolicy() - instanceof DefaultVolumeChoosingPolicy); + assertTrue(svc.getVolumeContainerChoosingPolicy() + instanceof DefaultVolumeContainerChoosingPolicy); } private String generateVolumeLocation(String base, int volumeCount) { @@ -321,10 +316,8 @@ public void testConcurrentTasksNotExceedThreadLimit() throws Exception { false, DiskBalancerVersion.DEFAULT_VERSION); svc.refresh(info); - DiskBalancerVolumeChoosingPolicy volumePolicy = mock(DiskBalancerVolumeChoosingPolicy.class); - ContainerChoosingPolicy containerPolicy = mock(ContainerChoosingPolicy.class); - svc.setVolumeChoosingPolicy(volumePolicy); - svc.setContainerChoosingPolicy(containerPolicy); + VolumeContainerChoosingPolicy containerPolicy = mock(VolumeContainerChoosingPolicy.class); + svc.setVolumeContainerChoosingPolicy(containerPolicy); List volumes = volumeSet.getVolumesList(); HddsVolume source = (HddsVolume) 
volumes.get(0); @@ -335,9 +328,8 @@ public void testConcurrentTasksNotExceedThreadLimit() throws Exception { when(containerData.getContainerID()).thenAnswer(invocation -> System.nanoTime()); when(containerData.getBytesUsed()).thenReturn(100L); - when(volumePolicy.chooseVolume(any(), anyDouble(), any(), anyLong())).thenReturn(Pair.of(source, dest)); - when(containerPolicy.chooseContainer(any(), any(), any(), any(), anyDouble(), any(), any())) - .thenReturn(containerData); + when(containerPolicy.chooseVolumesAndContainer(any(), any(), any(), any(), anyDouble())) + .thenReturn(new DiskBalancerVolumeContainerCandidate(containerData, source, dest)); // Test when no tasks are in progress, it should schedule up to the limit BackgroundTaskQueue queue = svc.getTasks(); diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerTask.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerTask.java index af9efdf75049..85d0bf9219c1 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerTask.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerTask.java @@ -51,7 +51,6 @@ import org.apache.commons.io.FileUtils; import org.apache.hadoop.hdds.HddsConfigKeys; import org.apache.hadoop.hdds.conf.OzoneConfiguration; -import org.apache.hadoop.hdds.conf.StorageUnit; import org.apache.hadoop.hdds.fs.MockSpaceUsageCheckFactory; import org.apache.hadoop.hdds.fs.SpaceUsageCheckFactory; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; @@ -72,7 +71,7 @@ import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet; import org.apache.hadoop.ozone.container.common.volume.StorageVolume; import org.apache.hadoop.ozone.container.common.volume.VolumeSet; -import 
org.apache.hadoop.ozone.container.diskbalancer.policy.DefaultContainerChoosingPolicy; +import org.apache.hadoop.ozone.container.diskbalancer.policy.DefaultVolumeContainerChoosingPolicy; import org.apache.hadoop.ozone.container.keyvalue.ContainerTestVersionInfo; import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer; import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData; @@ -267,7 +266,7 @@ public void cleanup() throws IOException { kvFaultInjector.reset(); KeyValueContainer.setInjector(null); DiskBalancerService.setInjector(null); - DefaultContainerChoosingPolicy.setTest(false); + DefaultVolumeContainerChoosingPolicy.setTest(false); } @ParameterizedTest @@ -291,7 +290,7 @@ public void moveSuccess(State containerState) throws IOException { String oldContainerPath = container.getContainerData().getContainerPath(); if (containerState == State.QUASI_CLOSED) { - DefaultContainerChoosingPolicy.setTest(true); + DefaultVolumeContainerChoosingPolicy.setTest(true); } DiskBalancerService.DiskBalancerTask task = getTask(); task.call(); @@ -460,7 +459,7 @@ public void moveFailsDuringInMemoryUpdate(ContainerTestVersionInfo versionInfo) .when(spyContainerSet).updateContainer(any(Container.class)); when(ozoneContainer.getContainerSet()).thenReturn(spyContainerSet); - DefaultContainerChoosingPolicy.setTest(true); + DefaultVolumeContainerChoosingPolicy.setTest(true); DiskBalancerService.DiskBalancerTask task = getTask(); CompletableFuture completableFuture = CompletableFuture.runAsync(() -> task.call()); @@ -564,11 +563,8 @@ public void testDestVolumeCommittedSpaceReleased(ContainerTestVersionInfo versio GenericTestUtils.LogCapturer serviceLog = GenericTestUtils.LogCapturer.captureLogs(DiskBalancerService.class); DiskBalancerService.DiskBalancerTask task = getTask(); - long defaultContainerSize = (long) conf.getStorageSize( - ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE, - ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES); - // verify 
committed space is reserved for destination volume - assertEquals(defaultContainerSize, destVolume.getCommittedBytes() - initialDestCommitted); + // verify committed space is reserved for destination volume (uses actual container size) + assertEquals(CONTAINER_SIZE, destVolume.getCommittedBytes() - initialDestCommitted); // delete the container from containerSet to simulate a failure containerSet.removeContainer(CONTAINER_ID); @@ -626,7 +622,7 @@ public void testMoveSkippedWhenContainerStateChanged(State invalidState) throws IOException, InterruptedException, TimeoutException { LogCapturer serviceLog = LogCapturer.captureLogs(DiskBalancerService.class); - // Create a CLOSED container which will be selected by DefaultContainerChoosingPolicy + // Create a CLOSED container which will be selected by DefaultVolumeContainerChoosingPolicy Container container = createContainer(CONTAINER_ID, sourceVolume, State.CLOSED); long initialSourceUsed = sourceVolume.getCurrentUsage().getUsedSpace(); long initialDestUsed = destVolume.getCurrentUsage().getUsedSpace(); diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDefaultVolumeChoosingPolicy.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerVolumeChoosingLogic.java similarity index 78% rename from hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDefaultVolumeChoosingPolicy.java rename to hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerVolumeChoosingLogic.java index ff47a8281ecb..7ef3f0cd63a4 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDefaultVolumeChoosingPolicy.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerVolumeChoosingLogic.java @@ -18,11 +18,14 @@ package 
org.apache.hadoop.ozone.container.diskbalancer; import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_DATANODE_DIR_KEY; +import static org.apache.hadoop.ozone.container.common.impl.ContainerImplTestUtils.newContainerSet; import static org.apache.hadoop.ozone.container.diskbalancer.DiskBalancerVolumeCalculation.getVolumeUsages; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import java.io.IOException; import java.nio.file.Path; @@ -31,23 +34,36 @@ import java.util.Arrays; import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Stream; -import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.conf.StorageUnit; import org.apache.hadoop.hdds.fs.MockSpaceUsageCheckFactory; import org.apache.hadoop.hdds.fs.MockSpaceUsageSource; import org.apache.hadoop.hdds.fs.SpaceUsageCheckFactory; import org.apache.hadoop.hdds.fs.SpaceUsagePersistence; import org.apache.hadoop.hdds.fs.SpaceUsageSource; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion; +import org.apache.hadoop.ozone.container.common.impl.ContainerSet; import org.apache.hadoop.ozone.container.common.volume.HddsVolume; import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet; import org.apache.hadoop.ozone.container.common.volume.StorageVolume; 
import org.apache.hadoop.ozone.container.diskbalancer.DiskBalancerVolumeCalculation.VolumeFixedUsage; -import org.apache.hadoop.ozone.container.diskbalancer.policy.DefaultVolumeChoosingPolicy; +import org.apache.hadoop.ozone.container.diskbalancer.policy.DefaultVolumeContainerChoosingPolicy; +import org.apache.hadoop.ozone.container.diskbalancer.policy.DiskBalancerVolumeContainerCandidate; +import org.apache.hadoop.ozone.container.diskbalancer.policy.VolumeContainerChoosingPolicy; +import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer; +import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData; +import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController; +import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; @@ -55,26 +71,31 @@ import org.junit.jupiter.params.provider.MethodSource; /** - * Unit tests for DefaultVolumeChoosingPolicy. + * Unit tests for volume and container selection in DefaultVolumeContainerChoosingPolicy. 
*/ -public class TestDefaultVolumeChoosingPolicy { +public class TestDiskBalancerVolumeChoosingLogic { @TempDir private Path baseDir; + private static final OzoneConfiguration CONF = new OzoneConfiguration(); private static final long MB = 1024L * 1024L; private static final long VOLUME_CAPACITY = 2500L * MB; // 2500MB - same for all volumes private static final long DEFAULT_CONTAINER_SIZE = 100L * MB; // 100MB - private DefaultVolumeChoosingPolicy policy; + private VolumeContainerChoosingPolicy policy; private MutableVolumeSet volumeSet; private String datanodeUuid; private Map deltaMap; + private OzoneContainer ozoneContainer; + private ContainerSet containerSet; + private Set inProgressContainerIDs; @BeforeEach public void setup() { datanodeUuid = UUID.randomUUID().toString(); - policy = new DefaultVolumeChoosingPolicy(new ReentrantLock()); + policy = new DefaultVolumeContainerChoosingPolicy(new ReentrantLock()); deltaMap = new HashMap<>(); + inProgressContainerIDs = new HashSet<>(); } /** @@ -229,12 +250,15 @@ private List createVolumes(List configs) } /** - * Sets up volume set with given volumes. + * Sets up volume set with given volumes and OzoneContainer. + * For "should find pair" scenarios, adds a container on the source volume. 
* * @param volumes List of volumes to add to volume set + * @param sourceVolume The highest utilization volume (source) - add container here if non-null + * @param containerSize The size of the container to create on the source volume */ - private void setupVolumeSet(List volumes) throws IOException { - // Use a clean configuration to avoid loading default volumes + private void setupVolumeSetAndContainers(List volumes, + HddsVolume sourceVolume, long containerSize) throws IOException { OzoneConfiguration testConf = new OzoneConfiguration(); testConf.set(HDDS_DATANODE_DIR_KEY, baseDir.resolve("defaultVolume").toString()); volumeSet = new MutableVolumeSet(datanodeUuid, testConf, null, @@ -247,6 +271,30 @@ private void setupVolumeSet(List volumes) throws IOException { volumeMap.put(volume.getStorageDir().getAbsolutePath(), volume); } volumeSet.setVolumeMapForTesting(volumeMap); + + containerSet = newContainerSet(); + if (sourceVolume != null) { + createContainer(1L, containerSize, sourceVolume); + } + + ozoneContainer = mock(OzoneContainer.class); + ContainerController controller = new ContainerController(containerSet, null); + when(ozoneContainer.getController()).thenReturn(controller); + } + + private void createContainer(long id, long usedBytes, HddsVolume vol) + throws IOException { + long maxSize = usedBytes > 0 ? 
usedBytes : (long) CONF.getStorageSize( + ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE, + ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES); + KeyValueContainerData containerData = new KeyValueContainerData(id, + ContainerLayoutVersion.FILE_PER_BLOCK, maxSize, + UUID.randomUUID().toString(), UUID.randomUUID().toString()); + containerData.setState(ContainerDataProto.State.CLOSED); + containerData.setVolume(vol); + containerData.getStatistics().setBlockBytesForTesting(usedBytes); + KeyValueContainer container = new KeyValueContainer(containerData, CONF); + containerSet.addContainer(container); } /** @@ -260,7 +308,13 @@ public void testVolumeChoosingPolicy(TestScenario scenario) throws IOException { // Create volumes from configuration List volumes = createVolumes(scenario.getVolumes()); - setupVolumeSet(volumes); + // Source is the highest utilization volume (last in sorted order) + List sortedUsages = getVolumeUsages( + createVolumeSetForUsages(volumes), deltaMap); + sortedUsages.sort(Comparator.comparingDouble(VolumeFixedUsage::getUtilization)); + HddsVolume sourceVolume = scenario.shouldFindPair() + ? 
sortedUsages.get(sortedUsages.size() - 1).getVolume() : null; + setupVolumeSetAndContainers(volumes, sourceVolume, scenario.getContainerSize()); // Create a map of disk names to volumes for verification Map diskNameToVolume = new HashMap<>(); @@ -272,27 +326,27 @@ public void testVolumeChoosingPolicy(TestScenario scenario) // Get volume usages for verification List volumeUsages = getVolumeUsages(volumeSet, deltaMap); - // Try to find a valid source-destination pair - Pair result = policy.chooseVolume(volumeSet, - scenario.getThresholdPercentage(), deltaMap, scenario.getContainerSize()); + DiskBalancerVolumeContainerCandidate result = policy.chooseVolumesAndContainer(ozoneContainer, + volumeSet, deltaMap, inProgressContainerIDs, scenario.getThresholdPercentage()); if (scenario.shouldFindPair()) { assertNotNull(result); - assertNotNull(result.getLeft()); - assertNotNull(result.getRight()); + assertNotNull(result.getSourceVolume()); + assertNotNull(result.getDestVolume()); + assertNotNull(result.getContainerData()); // Verify source is the expected disk if (scenario.getExpectedSourceDisk() != null) { HddsVolume expectedSource = diskNameToVolume.get(scenario.getExpectedSourceDisk()); assertNotNull(expectedSource); - assertEquals(expectedSource, result.getLeft()); + assertEquals(expectedSource, result.getSourceVolume()); } // Verify destination is the expected disk (or one of the valid options) if (scenario.getExpectedDestinationDisk() != null) { HddsVolume expectedDest = diskNameToVolume.get(scenario.getExpectedDestinationDisk()); assertNotNull(expectedDest); - assertEquals(expectedDest, result.getRight()); + assertEquals(expectedDest, result.getDestVolume()); } // Filter volumeUsages to only include volumes from our test scenario @@ -313,10 +367,10 @@ public void testVolumeChoosingPolicy(TestScenario scenario) int sourceIndex = -1; int destIndex = -1; for (int i = 0; i < testVolumeUsages.size(); i++) { - if 
(testVolumeUsages.get(i).getVolume().equals(result.getLeft())) { + if (testVolumeUsages.get(i).getVolume().equals(result.getSourceVolume())) { sourceIndex = i; } - if (testVolumeUsages.get(i).getVolume().equals(result.getRight())) { + if (testVolumeUsages.get(i).getVolume().equals(result.getDestVolume())) { destIndex = i; } } @@ -331,8 +385,7 @@ public void testVolumeChoosingPolicy(TestScenario scenario) double sourceUtilization = testVolumeUsages.get(sourceIndex).getUtilization(); for (int i = 0; i < testVolumeUsages.size(); i++) { if (i != sourceIndex) { - double otherUtilization = testVolumeUsages.get(i).getUtilization(); - assertTrue(sourceUtilization >= otherUtilization); + assertTrue(sourceUtilization >= testVolumeUsages.get(i).getUtilization()); } } } @@ -361,6 +414,19 @@ public void testVolumeChoosingPolicy(TestScenario scenario) } } + private MutableVolumeSet createVolumeSetForUsages(List volumes) throws IOException { + OzoneConfiguration testConf = new OzoneConfiguration(); + testConf.set(HDDS_DATANODE_DIR_KEY, baseDir.resolve("defaultVolume").toString()); + MutableVolumeSet vs = new MutableVolumeSet(datanodeUuid, testConf, null, + StorageVolume.VolumeType.DATA_VOLUME, null); + Map volumeMap = new HashMap<>(); + for (HddsVolume volume : volumes) { + volumeMap.put(volume.getStorageDir().getAbsolutePath(), volume); + } + vs.setVolumeMapForTesting(volumeMap); + return vs; + } + /** * Provides test scenarios for parameterized testing. 
*/ @@ -368,23 +434,23 @@ public void testVolumeChoosingPolicy(TestScenario scenario) public static Stream testScenarios() { return Stream.of( // Scenario 1: One volume beyond threshold, no volumes under threshold - // Disk1: 30%, Disk2: 30%, Disk3: 40%, Threshold: 5% - // Ideal: 33.33%, Range: (28.33%, 38.33%), Out of range: Disk3 - // Expected source: Disk3 (highest) at index 2, Expected destination: Disk1 or Disk2 (lowest) at index 0 or 1 + // Disk1: 30%, Disk2: 30.1%, Disk3: 40%, Threshold: 5% + // Ideal: 33.37%, Range: (28.37%, 38.37%), Out of range: Disk3 + // Expected source: Disk3 (highest) at index 2, Expected destination: Disk1 (lowest) at index 0 Arguments.arguments(new TestScenario( "OneVolumeBeyondThresholdNoVolumesUnderThreshold", Arrays.asList( new VolumeTestConfig("disk1", 0.30), // Lowest utilization - index 0 - new VolumeTestConfig("disk2", 0.30), // Same as disk1 - index 1 + new VolumeTestConfig("disk2", 0.301), // Slightly higher - index 1 new VolumeTestConfig("disk3", 0.40) // Highest utilization - index 2 ), 5.0, DEFAULT_CONTAINER_SIZE, true, "disk3", // Expected source (highest) - null, // Destination can be disk1 or disk2 (both valid) so only check source + "disk1", // Destination is disk1 (lowest utilization) 2, // Expected source index (highest utilization) - null // Destination index can be 0 or 1 (both have same utilization) + 0 // Destination index is 0 (lowest utilization) )), // Scenario 2: Volumes both above and below threshold @@ -415,7 +481,7 @@ public static Stream testScenarios() { "AllVolumesWithinThreshold", Arrays.asList( new VolumeTestConfig("disk1", 0.30), // Lowest utilization - new VolumeTestConfig("disk2", 0.30), // Same as disk1 + new VolumeTestConfig("disk2", 0.301), // Slightly higher new VolumeTestConfig("disk3", 0.33) // Highest utilization ), 10.0, @@ -428,43 +494,43 @@ public static Stream testScenarios() { )), // Scenario 4: One volume under threshold, no volumes above threshold - // Disk1: 30%, Disk2: 30%, Disk3: 
20% - // Ideal: 26.67%, Range: (21.67%, 31.67%), Out of range: Disk3 - // Expected source: Disk1 or Disk2 (highest) at index 1 or 2, Expected destination: Disk3 (lowest) at index 0 + // Disk1: 30%, Disk2: 30.1%, Disk3: 20% + // Ideal: 26.70%, Range: (21.70%, 31.70%), Out of range: Disk3 + // Expected source: Disk2 (highest) at index 2, Expected destination: Disk3 (lowest) at index 0 Arguments.arguments(new TestScenario( "OneVolumeUnderThresholdNoVolumesAbove", Arrays.asList( new VolumeTestConfig("disk3", 0.20), // Lowest utilization - index 0 new VolumeTestConfig("disk1", 0.30), // Middle utilization - index 1 - new VolumeTestConfig("disk2", 0.30) // Highest utilization (tied with disk1) - index 2 + new VolumeTestConfig("disk2", 0.301) // Highest utilization - index 2 ), 5.0, DEFAULT_CONTAINER_SIZE, true, - null, // Source can be disk1 or disk2 (both valid, highest) + "disk2", // Source is disk2 (highest) "disk3", // Expected destination (lowest) - null, // Source index can be 1 or 2 (both have same utilization) + 2, // Source index is 2 (highest utilization) 0 // Expected destination index (lowest utilization) )), // Scenario 5: Extreme imbalance - one very high, others very low - // Disk1: 95%, Disk2: 5%, Disk3: 5%, Threshold: 10% - // Ideal: 35%, Range: (25%, 45%), Out of range: Disk1, Disk2, Disk3 - // Expected source: Disk1 (highest) at index 2, Expected destination: Disk2 or Disk3 (lowest) at index 0 or 1 + // Disk1: 95%, Disk2: 5%, Disk3: 5.1%, Threshold: 10% + // Ideal: 35.03%, Range: (25.03%, 45.03%), Out of range: Disk1, Disk2, Disk3 + // Expected source: Disk1 (highest) at index 2, Expected destination: Disk2 (lowest) at index 0 Arguments.arguments(new TestScenario( "ExtremeImbalance", Arrays.asList( new VolumeTestConfig("disk2", 0.05), // Lowest utilization - index 0 - new VolumeTestConfig("disk3", 0.05), // Same as disk2 - index 1 + new VolumeTestConfig("disk3", 0.051), // Slightly higher - index 1 new VolumeTestConfig("disk1", 0.95) // Highest 
utilization - index 2 ), 10.0, DEFAULT_CONTAINER_SIZE, true, "disk1", // Expected source (highest) - null, // Destination can be disk2 or disk3 (lowest) + "disk2", // Destination is disk2 (lowest) 2, // Expected source index (highest utilization) - null // Destination index can be 0 or 1 (both have same utilization) + 0 // Destination index is 0 (lowest utilization) )), // Scenario 6: Multiple volumes above threshold, one below @@ -508,23 +574,25 @@ public static Stream testScenarios() { )), // Scenario 8: Small threshold with moderate imbalance - // Disk1: 35%, Disk2: 30%, Disk3: 30%, Threshold: 2% - // Ideal: 31.67%, Range: (29.67%, 33.67%), Out of range: Disk1 - // Expected source: Disk1 (highest) at index 2, Expected destination: Disk2 or Disk3 (lowest) at index 0 or 1 + // Disk1: 35%, Disk2: 30%, Disk3: 30.1%, Threshold: 2% + // Ideal: 31.70%, Range: (29.70%, 33.70%), Out of range: Disk1 + // Container size must be small enough to fit on disk2 without exceeding threshold + // disk2 after: (750MB + 50MB) / 2500MB = 32% < 33.7% threshold ✓ + // Expected source: Disk1 (highest) at index 2, Expected destination: Disk2 (lowest) at index 0 Arguments.arguments(new TestScenario( "SmallThresholdModerateImbalance", Arrays.asList( new VolumeTestConfig("disk2", 0.30), // Lowest utilization - index 0 - new VolumeTestConfig("disk3", 0.30), // Same as disk2 - index 1 + new VolumeTestConfig("disk3", 0.301), // Slightly higher than disk2 - index 1 new VolumeTestConfig("disk1", 0.35) // Highest utilization - index 2 ), 2.0, - DEFAULT_CONTAINER_SIZE, + 50L * MB, // Small container to fit within tight threshold true, "disk1", // Expected source (highest) - null, // Destination can be disk2 or disk3 (lowest) + "disk2", // Destination is disk2 (lowest) 2, // Expected source index (highest utilization) - null // Destination index can be 0 or 1 (both have same utilization) + 0 // Destination index is 0 (lowest utilization) )), // Scenario 9: Best destination has low utilization but 
insufficient space diff --git a/hadoop-hdds/docs/content/design/diskbalancer.md b/hadoop-hdds/docs/content/design/diskbalancer.md index f546b5253d7a..c6a9b458f845 100644 --- a/hadoop-hdds/docs/content/design/diskbalancer.md +++ b/hadoop-hdds/docs/content/design/diskbalancer.md @@ -108,16 +108,11 @@ D2 ----> Temp C1-CLOSED --- (2) ---> Temp C1-RECOVERING --- (3) ---> C1-RE By default, the DiskBalancer uses specific policies to decide which disks to balance and which containers to move. These are configurable, but the default implementations provide robust and safe behavior. -* **`DefaultVolumeChoosingPolicy`**: This is the default policy for selecting the source and destination volumes. It -identifies the most over-utilized volume as the source and the most under-utilized volume as the destination by comparing -each volume's utilization against the Datanode's average. The calculation is smart enough to account for data that is -already in the process of being moved, ensuring it makes accurate decisions based on the future state of the volumes. - -* **`DefaultContainerChoosingPolicy`**: This is the default policy for selecting which container to move from a source -volume. It iterates through the containers on the source disk and picks the first one that is in a **CLOSED** state -and is not already being moved by another balancing operation. To optimize performance and avoid re-scanning the same -containers repeatedly, it caches the list of containers for each volume which auto expires after one hour of its last -used time or if the container iterator for that is invalidated on full utilisation. +* **`DefaultVolumeContainerChoosingPolicy`**: This is the default policy that consolidates both volume selection and container +selection into a single operation. 
It identifies the most over-utilized volume as the source and the most under-utilized +volume with sufficient space as the destination, then iterates through containers on the source to pick the first one +that is in a **CLOSED** state and is not already being moved. Consolidating both steps avoids recalculating ideal +utilization and disk usage. It caches the list of containers for each volume which auto expires after one hour. ## Security Design DiskBalancer follows the same security model as other services: diff --git a/hadoop-hdds/docs/content/feature/DiskBalancer.md b/hadoop-hdds/docs/content/feature/DiskBalancer.md index d8e7b501ae32..cbfb4a9a6e2a 100644 --- a/hadoop-hdds/docs/content/feature/DiskBalancer.md +++ b/hadoop-hdds/docs/content/feature/DiskBalancer.md @@ -246,8 +246,7 @@ The DiskBalancer's behavior can be controlled using the following configuration | `hdds.datanode.disk.balancer.parallel.thread` | `5` | The number of worker threads to use for moving containers in parallel. | | `hdds.datanode.disk.balancer.service.interval` | `60s` | The time interval at which the Datanode DiskBalancer service checks for imbalance and updates its configuration. | | `hdds.datanode.disk.balancer.stop.after.disk.even` | `true` | If true, the DiskBalancer will automatically stop its balancing activity once disks are considered balanced (i.e., all volume densities are within the threshold). | -| `hdds.datanode.disk.balancer.volume.choosing.policy` | `org.apache.hadoop.ozone.container.diskbalancer.policy.DefaultVolumeChoosingPolicy` | The policy class for selecting source and destination volumes for balancing. | -| `hdds.datanode.disk.balancer.container.choosing.policy` | `org.apache.hadoop.ozone.container.diskbalancer.policy.DefaultContainerChoosingPolicy` | The policy class for selecting which containers to move from a source volume to destination volume. 
| +| `hdds.datanode.disk.balancer.container.choosing.policy` | `org.apache.hadoop.ozone.container.diskbalancer.policy.DefaultVolumeContainerChoosingPolicy` | The policy for selecting source/destination volumes and which containers to move. | | `hdds.datanode.disk.balancer.service.timeout` | `300s` | Timeout for the Datanode DiskBalancer service operations. | | `hdds.datanode.disk.balancer.should.run.default` | `false` | If the balancer fails to read its persisted configuration, this value determines if the service should run by default. | diff --git a/hadoop-hdds/docs/content/feature/DiskBalancer.zh.md b/hadoop-hdds/docs/content/feature/DiskBalancer.zh.md index 65ba7ca3fa1e..483213a2abb9 100644 --- a/hadoop-hdds/docs/content/feature/DiskBalancer.zh.md +++ b/hadoop-hdds/docs/content/feature/DiskBalancer.zh.md @@ -238,8 +238,7 @@ The DiskBalancer's behavior can be controlled using the following configuration | `hdds.datanode.disk.balancer.parallel.thread` | `5` | 用于并行移动容器的工作线程数。 | | `hdds.datanode.disk.balancer.service.interval` | `60s` | Datanode DiskBalancer 服务检查不平衡并更新其配置的时间间隔。 | | `hdds.datanode.disk.balancer.stop.after.disk.even` | `true` | 如果为真,则一旦磁盘被视为平衡(即所有卷密度都在阈值内),DiskBalancer 将自动停止其平衡活动。 | -| `hdds.datanode.disk.balancer.volume.choosing.policy` | `org.apache.hadoop.ozone.container.diskbalancer.policy.DefaultVolumeChoosingPolicy` | 用于选择平衡的源卷和目标卷的策略类。 | -| `hdds.datanode.disk.balancer.container.choosing.policy` | `org.apache.hadoop.ozone.container.diskbalancer.policy.DefaultContainerChoosingPolicy` | 用于选择将哪些容器从源卷移动到目标卷的策略类。 | +| `hdds.datanode.disk.balancer.container.choosing.policy` | `org.apache.hadoop.ozone.container.diskbalancer.policy.DefaultVolumeContainerChoosingPolicy` | 用于选择源/目标卷以及要移动的容器的策略。 | | `hdds.datanode.disk.balancer.service.timeout` | `300s` | Datanode DiskBalancer 服务操作超时。 | | `hdds.datanode.disk.balancer.should.run.default` | `false` | 如果平衡器无法读取其持久配置,则该值决定服务是否应默认运行。 | diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestContainerChoosingPolicy.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestContainerChoosingPerformance.java similarity index 69% rename from hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestContainerChoosingPolicy.java rename to hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestContainerChoosingPerformance.java index b9d5989fcab8..5b32607d0d38 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestContainerChoosingPolicy.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestContainerChoosingPerformance.java @@ -18,6 +18,9 @@ package org.apache.hadoop.ozone.scm.node; import static org.apache.hadoop.ozone.container.common.impl.ContainerImplTestUtils.newContainerSet; +import static org.apache.hadoop.ozone.container.diskbalancer.DiskBalancerVolumeCalculation.getIdealUsage; +import static org.apache.hadoop.ozone.container.diskbalancer.DiskBalancerVolumeCalculation.getVolumeUsages; +import static org.apache.hadoop.ozone.container.diskbalancer.DiskBalancerVolumeCalculation.newVolumeFixedUsage; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -30,7 +33,9 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Collections; +import java.util.Comparator; import java.util.List; +import java.util.Map; import java.util.Random; import java.util.Set; import java.util.UUID; @@ -41,6 +46,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import 
org.apache.hadoop.hdds.fs.MockSpaceUsageCheckFactory; @@ -52,15 +58,14 @@ import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.ozone.container.ContainerTestHelper; import org.apache.hadoop.ozone.container.common.impl.ContainerData; -import org.apache.hadoop.ozone.container.common.impl.ContainerDataScanOrder; import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion; import org.apache.hadoop.ozone.container.common.impl.ContainerSet; import org.apache.hadoop.ozone.container.common.interfaces.Container; import org.apache.hadoop.ozone.container.common.volume.HddsVolume; import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet; import org.apache.hadoop.ozone.container.common.volume.StorageVolume; -import org.apache.hadoop.ozone.container.diskbalancer.policy.ContainerChoosingPolicy; -import org.apache.hadoop.ozone.container.diskbalancer.policy.DefaultContainerChoosingPolicy; +import org.apache.hadoop.ozone.container.diskbalancer.DiskBalancerVolumeCalculation.VolumeFixedUsage; +import org.apache.hadoop.ozone.container.diskbalancer.policy.DefaultVolumeContainerChoosingPolicy; import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer; import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData; import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController; @@ -73,9 +78,9 @@ import org.junit.jupiter.api.io.TempDir; /** - * This class tests the ContainerChoosingPolicy. + * This class tests the container selection logic performance. 
*/ -public class TestContainerChoosingPolicy { +public class TestContainerChoosingPerformance { private static final int NUM_VOLUMES = 20; private static final int NUM_CONTAINERS = 100000; @@ -92,15 +97,17 @@ public class TestContainerChoosingPolicy { private List volumes; private ContainerSet containerSet; private OzoneContainer ozoneContainer; - private ContainerChoosingPolicy containerChoosingPolicy; + private DefaultVolumeContainerChoosingPolicy volumeContainerChoosingPolicy; private ExecutorService executor; private MutableVolumeSet volumeSet; // Simulate containers currently being balanced (in progress) private Set inProgressContainerIDs = ConcurrentHashMap.newKeySet(); + private Map deltaMap = new ConcurrentHashMap<>(); @BeforeEach public void setup() throws Exception { + CONF.set("hdds.datanode.volume.min.free.space", "10MB"); containerSet = newContainerSet(); createVolumes(); createContainers(); @@ -108,7 +115,7 @@ public void setup() throws Exception { ContainerController containerController = new ContainerController(containerSet, null); when(ozoneContainer.getController()).thenReturn(containerController); when(ozoneContainer.getContainerSet()).thenReturn(containerSet); - containerChoosingPolicy = new DefaultContainerChoosingPolicy(); + volumeContainerChoosingPolicy = new DefaultVolumeContainerChoosingPolicy(new ReentrantLock()); executor = Executors.newFixedThreadPool(NUM_THREADS); // Create a spied MutableVolumeSet and inject the test volumes @@ -138,46 +145,52 @@ public void cleanUp() { @Test @Timeout(300) public void testConcurrentContainerChoosingPerformance() throws Exception { - testPolicyPerformance("ContainerChoosingPolicy", containerChoosingPolicy); + testContainerSelectionPerformance(); } @Test public void testContainerDeletionAfterIteratorGeneration() throws Exception { - HddsVolume volume = volumes.get(0); - HddsVolume destVolume = volumes.get(1); - - List> containerList = ozoneContainer.getContainerSet().getContainerMap().values().stream() - 
.filter(x -> volume.getStorageID().equals(x.getContainerData().getVolume().getStorageID())) - .filter(x -> x.getContainerData().isClosed()) - .sorted(ContainerDataScanOrder.INSTANCE) - .collect(Collectors.toList()); + // Test that chooseContainer skips in-progress containers and handles deleted containers. inProgressContainerIDs.clear(); - ContainerData container = containerChoosingPolicy.chooseContainer(ozoneContainer, volume, destVolume, - inProgressContainerIDs, THRESHOLD, volumeSet, null); - assertNotNull(container); - assertEquals(containerList.get(0).getContainerData().getContainerID(), container.getContainerID()); + ChooseContainerContext ctx = getChooseContainerContext(); + + // Get source containers in policy order + List> policyOrder = getSourceContainersInPolicyOrder(ctx.srcVolume); + + // choose first container + ContainerData first = volumeContainerChoosingPolicy.chooseContainer(ozoneContainer, ctx.srcVolume, ctx.dstVolume, + ctx.dstUsage, inProgressContainerIDs, ctx.upperThreshold); + assertNotNull(first, "Expected to choose a container on first call"); + assertEquals(policyOrder.get(0).getContainerData().getContainerID(), first.getContainerID()); + + // Mark first as in-progress and remove second from containerSet + inProgressContainerIDs.add(ContainerID.valueOf(first.getContainerID())); + ozoneContainer.getContainerSet().removeContainer(policyOrder.get(1).getContainerData().getContainerID()); - ozoneContainer.getContainerSet().removeContainer(containerList.get(1).getContainerData().getContainerID()); - inProgressContainerIDs.add(ContainerID.valueOf(container.getContainerID())); - container = containerChoosingPolicy.chooseContainer(ozoneContainer, volume, - destVolume, inProgressContainerIDs, THRESHOLD, volumeSet, null); - assertEquals(containerList.get(1).getContainerData().getContainerID(), container.getContainerID()); + // Second call: policy skips get(0) (in-progress) and get(1) (deleted), returns get(2) + ContainerData secondContainer = 
volumeContainerChoosingPolicy. + chooseContainer(ozoneContainer, ctx.srcVolume, ctx.dstVolume, + ctx.dstUsage, inProgressContainerIDs, ctx.upperThreshold); + assertNotNull(secondContainer, "Expected to choose a container on second call"); + assertEquals(policyOrder.get(2).getContainerData().getContainerID(), secondContainer.getContainerID()); } /** - * SuccessCount: Number of successful container choices from the policy. - * FailureCount: Failures due to any exceptions thrown during container choice. + * Tests container selection performance using chooseContainer directly. */ - private void testPolicyPerformance(String policyName, ContainerChoosingPolicy policy) throws Exception { + private void testContainerSelectionPerformance() throws Exception { + inProgressContainerIDs.clear(); + deltaMap.clear(); + + ChooseContainerContext ctx = getChooseContainerContext(); + CountDownLatch latch = new CountDownLatch(NUM_THREADS); AtomicInteger containerChosenCount = new AtomicInteger(0); AtomicInteger containerNotChosenCount = new AtomicInteger(0); AtomicInteger failureCount = new AtomicInteger(0); AtomicLong totalTimeNanos = new AtomicLong(0); - Random rand = new Random(); - for (int i = 0; i < NUM_THREADS; i++) { executor.submit(() -> { try { @@ -185,18 +198,11 @@ private void testPolicyPerformance(String policyName, ContainerChoosingPolicy po int containerChosen = 0; int containerNotChosen = 0; int failures = 0; - // Choose a random volume - HddsVolume srcVolume = volumes.get(rand.nextInt(NUM_VOLUMES)); - HddsVolume destVolume; - - do { - destVolume = volumes.get(rand.nextInt(NUM_VOLUMES)); - } while (srcVolume.equals(destVolume)); for (int j = 0; j < NUM_ITERATIONS; j++) { try { - ContainerData c = policy.chooseContainer(ozoneContainer, srcVolume, - destVolume, inProgressContainerIDs, THRESHOLD, volumeSet, null); + ContainerData c = volumeContainerChoosingPolicy.chooseContainer(ozoneContainer, + ctx.srcVolume, ctx.dstVolume, ctx.dstUsage, inProgressContainerIDs, 
ctx.upperThreshold); if (c == null) { containerNotChosen++; } else { @@ -228,7 +234,7 @@ private void testPolicyPerformance(String policyName, ContainerChoosingPolicy po double avgTimePerOp = (double) totalTimeNanos.get() / totalOperations; double opsPerSec = totalOperations / (totalTimeNanos.get() / 1_000_000_000.0); - System.out.println("Performance results for " + policyName); + System.out.println("Container selection performance results:"); System.out.println("Total volumes: " + NUM_VOLUMES); System.out.println("Total containers: " + NUM_CONTAINERS); System.out.println("Total threads: " + NUM_THREADS); @@ -239,6 +245,42 @@ private void testPolicyPerformance(String policyName, ContainerChoosingPolicy po System.out.println("Total time (ms): " + totalTimeNanos.get() / 1_000_000); System.out.println("Average time per operation (ns): " + avgTimePerOp); System.out.println("Operations per second: " + opsPerSec); + assertTrue(containerChosenCount.get() > 0, "Expected at least some containers to be chosen"); + } + + private ChooseContainerContext getChooseContainerContext() { + List volumeUsages = getVolumeUsages(volumeSet, deltaMap); + volumeUsages.sort(Comparator.comparingDouble(VolumeFixedUsage::getUtilization) + .thenComparing(v -> v.getVolume().getStorageID())); + double idealUsage = getIdealUsage(volumeUsages); + double upperThreshold = idealUsage + THRESHOLD / 100.0; + HddsVolume srcVolume = volumeUsages.get(volumeUsages.size() - 1).getVolume(); + HddsVolume dstVolume = volumeUsages.get(0).getVolume(); + VolumeFixedUsage dstUsage = newVolumeFixedUsage(dstVolume, deltaMap); + return new ChooseContainerContext(srcVolume, dstVolume, dstUsage, upperThreshold); + } + + private List> getSourceContainersInPolicyOrder(HddsVolume srcVolume) { + List> list = new ArrayList<>(); + ozoneContainer.getController().getContainers(srcVolume).forEachRemaining(list::add); + return list.stream() + .filter(c -> c.getContainerData().isClosed() && c.getContainerData().getBytesUsed() > 
0) + .collect(Collectors.toList()); + } + + private static final class ChooseContainerContext { + private final HddsVolume srcVolume; + private final HddsVolume dstVolume; + private final VolumeFixedUsage dstUsage; + private final double upperThreshold; + + ChooseContainerContext(HddsVolume srcVolume, HddsVolume dstVolume, + VolumeFixedUsage dstUsage, double upperThreshold) { + this.srcVolume = srcVolume; + this.dstVolume = dstVolume; + this.dstUsage = dstUsage; + this.upperThreshold = upperThreshold; + } } public void createVolumes() throws IOException { diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestVolumeChoosingPolicy.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestVolumeChoosingPerformance.java similarity index 76% rename from hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestVolumeChoosingPolicy.java rename to hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestVolumeChoosingPerformance.java index cdbe046635c3..176833e857e3 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestVolumeChoosingPolicy.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestVolumeChoosingPerformance.java @@ -17,10 +17,12 @@ package org.apache.hadoop.ozone.scm.node; +import static org.apache.hadoop.ozone.container.common.impl.ContainerImplTestUtils.newContainerSet; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import java.io.IOException; import java.nio.file.Files; @@ -30,6 +32,7 @@ import java.util.List; import java.util.Map; import java.util.Random; +import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import 
java.util.concurrent.CountDownLatch; @@ -39,20 +42,29 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; -import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.fs.MockSpaceUsageCheckFactory; import org.apache.hadoop.hdds.fs.MockSpaceUsageSource; import org.apache.hadoop.hdds.fs.SpaceUsageCheckFactory; import org.apache.hadoop.hdds.fs.SpaceUsagePersistence; import org.apache.hadoop.hdds.fs.SpaceUsageSource; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto; import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.ozone.container.ContainerTestHelper; +import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion; +import org.apache.hadoop.ozone.container.common.impl.ContainerSet; import org.apache.hadoop.ozone.container.common.statemachine.StateContext; import org.apache.hadoop.ozone.container.common.volume.HddsVolume; import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet; import org.apache.hadoop.ozone.container.common.volume.StorageVolume; -import org.apache.hadoop.ozone.container.diskbalancer.policy.DefaultVolumeChoosingPolicy; -import org.apache.hadoop.ozone.container.diskbalancer.policy.DiskBalancerVolumeChoosingPolicy; +import org.apache.hadoop.ozone.container.diskbalancer.policy.DefaultVolumeContainerChoosingPolicy; +import org.apache.hadoop.ozone.container.diskbalancer.policy.DiskBalancerVolumeContainerCandidate; +import org.apache.hadoop.ozone.container.diskbalancer.policy.VolumeContainerChoosingPolicy; +import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer; +import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData; +import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController; +import 
org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -62,13 +74,12 @@ /** * This class tests the VolumeChoosingPolicy. */ -public class TestVolumeChoosingPolicy { +public class TestVolumeChoosingPerformance { private static final int NUM_VOLUMES = 20; private static final int NUM_THREADS = 10; private static final int NUM_ITERATIONS = 10000; private static final double THRESHOLD = 10; // 10% threshold - private static final long DEFAULT_CONTAINER_SIZE = 100L * 1024 * 1024; // 100MB private OzoneConfiguration conf; @@ -77,19 +88,22 @@ public class TestVolumeChoosingPolicy { private MutableVolumeSet volumeSet; private List hddsVolumes; - private DiskBalancerVolumeChoosingPolicy volumeChoosingPolicy; + private VolumeContainerChoosingPolicy volumeContainerChoosingPolicy; + private OzoneContainer ozoneContainer; private ExecutorService executor; // delta sizes for source volumes private Map deltaSizes = new ConcurrentHashMap<>(); + private Set inProgressContainerIDs = ConcurrentHashMap.newKeySet(); + private ContainerSet containerSet; @BeforeEach public void setup() throws Exception { - // Create a fresh configuration for each test to avoid interference between tests conf = new OzoneConfiguration(); hddsVolumes = new ArrayList<>(); createVolumes(); - volumeChoosingPolicy = new DefaultVolumeChoosingPolicy(new ReentrantLock()); + createContainers(); + volumeContainerChoosingPolicy = new DefaultVolumeContainerChoosingPolicy(new ReentrantLock()); executor = Executors.newFixedThreadPool(NUM_THREADS); } @@ -124,21 +138,22 @@ public void testVolumeChoosingFailureDueToDiskFull() { for (StorageVolume volume: volumeSet.getVolumesList()) { volume.setConf(testConf); } - assertNull(volumeChoosingPolicy.chooseVolume(volumeSet, THRESHOLD, deltaSizes, DEFAULT_CONTAINER_SIZE)); + assertNull(volumeContainerChoosingPolicy.chooseVolumesAndContainer(ozoneContainer, 
volumeSet, + deltaSizes, inProgressContainerIDs, THRESHOLD)); assertEquals(NUM_VOLUMES, volumeSet.getVolumesList().size()); } @Test @Timeout(300) public void testConcurrentVolumeChoosingPerformance() throws Exception { - testPolicyPerformance("DefaultVolumeChoosingPolicy", volumeChoosingPolicy); + testPolicyPerformance("VolumeSelectionLogic", volumeContainerChoosingPolicy); } /** - * pairChosenCount: Number of successful volume pair choices from the policy. - * FailureCount: Failures due to any exceptions thrown during volume choice or null return. + * pairChosenCount: Number of successful container/volume choices from the policy. + * FailureCount: Failures due to any exceptions thrown during choice or null return. */ - private void testPolicyPerformance(String policyName, DiskBalancerVolumeChoosingPolicy policy) throws Exception { + private void testPolicyPerformance(String policyName, VolumeContainerChoosingPolicy policy) throws Exception { CountDownLatch latch = new CountDownLatch(NUM_THREADS); AtomicInteger pairChosenCount = new AtomicInteger(0); AtomicInteger pairNotChosenCount = new AtomicInteger(0); @@ -179,23 +194,22 @@ private void testPolicyPerformance(String policyName, DiskBalancerVolumeChoosing long threadStart = System.nanoTime(); try { - // Use a reasonable container size for the policy check - // The policy checks if containerSize < computeUsableSpace() - Pair pair = policy.chooseVolume( - volumeSet, THRESHOLD, deltaSizes, DEFAULT_CONTAINER_SIZE); + DiskBalancerVolumeContainerCandidate candidate = policy.chooseVolumesAndContainer(ozoneContainer, + volumeSet, deltaSizes, inProgressContainerIDs, THRESHOLD); - if (pair == null) { + if (candidate == null) { volumeNotChosen++; } else { volumeChosen++; - HddsVolume sourceVolume = pair.getLeft(); + HddsVolume sourceVolume = candidate.getSourceVolume(); // The policy automatically calls destVolume.incCommittedBytes(containerSize) // when a pair is chosen, so we need to simulate the corresponding // deltaMap 
update for the source volume (negative value = space to be freed) // This aligns with how DiskBalancerService updates deltaSizes after choosing - deltaSizes.compute(sourceVolume, (k, v) -> (v == null ? 0L : v) - DEFAULT_CONTAINER_SIZE); - + deltaSizes.compute(sourceVolume, (k, v) -> (v == null ? 0L : v) + - candidate.getContainerData().getBytesUsed()); + // Note: In a real DiskBalancerService, after a successful container move, // the committed bytes on the destination would be adjusted and the delta // on the source would be cleared. For this performance test, we simulate @@ -282,4 +296,30 @@ private void createVolumes() throws IOException { System.out.println("Created " + NUM_VOLUMES + " volumes in " + (System.currentTimeMillis() - startTime) + " ms"); } + + private void createContainers() { + containerSet = newContainerSet(); + Random random = new Random(); + for (int i = 0; i < NUM_VOLUMES * 10; i++) { + HddsVolume volume = hddsVolumes.get(i % NUM_VOLUMES); + try { + long id = 1000L + i; + long bytesUsed = (random.nextInt(50) + 1) * 1024 * 1024L; + KeyValueContainerData containerData = new KeyValueContainerData(id, + ContainerLayoutVersion.FILE_PER_BLOCK, ContainerTestHelper.CONTAINER_MAX_SIZE, + UUID.randomUUID().toString(), UUID.randomUUID().toString()); + containerData.setState(ContainerDataProto.State.CLOSED); + containerData.setVolume(volume); + containerData.getStatistics().setBlockBytesForTesting(bytesUsed); + KeyValueContainer container = new KeyValueContainer(containerData, conf); + containerSet.addContainer(container); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + ozoneContainer = mock(OzoneContainer.class); + ContainerController controller = new ContainerController(containerSet, null); + when(ozoneContainer.getController()).thenReturn(controller); + when(ozoneContainer.getContainerSet()).thenReturn(containerSet); + } } From 4740a4ec5d9a732b76e999483a7cd04dbe38ad76 Mon Sep 17 00:00:00 2001 From: Gargi Jaiswal Date: Mon, 2 Mar 
2026 12:45:11 +0530 Subject: [PATCH 2/2] fix tests failure and pmd issues --- .../diskbalancer/TestDiskBalancerContainerChoosingLogic.java | 5 +++-- .../diskbalancer/TestDiskBalancerVolumeChoosingLogic.java | 1 + .../hadoop/ozone/scm/node/TestVolumeChoosingPerformance.java | 3 +-- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerContainerChoosingLogic.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerContainerChoosingLogic.java index 73587c4bd617..8c2bb6c6e964 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerContainerChoosingLogic.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerContainerChoosingLogic.java @@ -80,7 +80,6 @@ public class TestDiskBalancerContainerChoosingLogic { private ContainerSet containerSet; private HddsVolume sourceVolume; private HddsVolume destVolume1; - private HddsVolume destVolume2; private Set inProgressContainerIDs; private Map deltaMap; @@ -105,7 +104,7 @@ private void setupVolumesAndContainer() throws IOException { // Create volumes with specific utilization sourceVolume = createVolume("source-volume", 0.80); destVolume1 = createVolume("dest-volume1", 0.10); - destVolume2 = createVolume("dest-volume2", 0.50); + HddsVolume destVolume2 = createVolume("dest-volume2", 0.50); CONF.set(HDDS_DATANODE_DIR_KEY, baseDir.resolve("defaultVolume").toString()); @@ -133,6 +132,7 @@ private void setupVolumesAndContainer() throws IOException { ozoneContainer = mock(OzoneContainer.class); ContainerController controller = new ContainerController(containerSet, null); when(ozoneContainer.getController()).thenReturn(controller); + when(ozoneContainer.getContainerSet()).thenReturn(containerSet); } /** @@ -221,6 +221,7 @@ public void 
testSizeZeroContainersSkipped() throws IOException { OzoneContainer testOzoneContainer = mock(OzoneContainer.class); ContainerController testController = new ContainerController(testContainerSet, null); when(testOzoneContainer.getController()).thenReturn(testController); + when(testOzoneContainer.getContainerSet()).thenReturn(testContainerSet); // The policy should skip containers 10 and 11 (size 0) and choose container 12 DiskBalancerVolumeContainerCandidate candidate = policy.chooseVolumesAndContainer(testOzoneContainer, diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerVolumeChoosingLogic.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerVolumeChoosingLogic.java index 7ef3f0cd63a4..f8097f0c0918 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerVolumeChoosingLogic.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerVolumeChoosingLogic.java @@ -280,6 +280,7 @@ private void setupVolumeSetAndContainers(List volumes, ozoneContainer = mock(OzoneContainer.class); ContainerController controller = new ContainerController(containerSet, null); when(ozoneContainer.getController()).thenReturn(controller); + when(ozoneContainer.getContainerSet()).thenReturn(containerSet); } private void createContainer(long id, long usedBytes, HddsVolume vol) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestVolumeChoosingPerformance.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestVolumeChoosingPerformance.java index 176833e857e3..ee501da18625 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestVolumeChoosingPerformance.java +++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestVolumeChoosingPerformance.java @@ -95,7 +95,6 @@ public class TestVolumeChoosingPerformance { // delta sizes for source volumes private Map deltaSizes = new ConcurrentHashMap<>(); private Set inProgressContainerIDs = ConcurrentHashMap.newKeySet(); - private ContainerSet containerSet; @BeforeEach public void setup() throws Exception { @@ -298,7 +297,7 @@ private void createVolumes() throws IOException { } private void createContainers() { - containerSet = newContainerSet(); + ContainerSet containerSet = newContainerSet(); Random random = new Random(); for (int i = 0; i < NUM_VOLUMES * 10; i++) { HddsVolume volume = hddsVolumes.get(i % NUM_VOLUMES);