diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/VolumeChoosingPolicyFactory.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/VolumeChoosingPolicyFactory.java index e4c8aa3b514f..409b4d95c428 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/VolumeChoosingPolicyFactory.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/VolumeChoosingPolicyFactory.java @@ -24,7 +24,7 @@ import org.apache.hadoop.hdds.conf.ConfigurationSource; import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy; import org.apache.hadoop.ozone.container.diskbalancer.DiskBalancerConfiguration; -import org.apache.hadoop.ozone.container.diskbalancer.policy.DiskBalancerVolumeChoosingPolicy; +import org.apache.hadoop.ozone.container.diskbalancer.policy.VolumeContainerChoosingPolicy; import org.apache.ratis.util.ReflectionUtils; /** @@ -48,9 +48,9 @@ public static VolumeChoosingPolicy getPolicy(ConfigurationSource conf) { return ReflectionUtils.newInstance(policyClass, new Class[] {ReentrantLock.class}, LOCK); } - public static DiskBalancerVolumeChoosingPolicy getDiskBalancerPolicy(ConfigurationSource conf) { - Class policyClass = conf.getObject(DiskBalancerConfiguration.class).getVolumeChoosingPolicyClass(); - return (DiskBalancerVolumeChoosingPolicy) ReflectionUtils.newInstance( + public static VolumeContainerChoosingPolicy getDiskBalancerContainerPolicy(ConfigurationSource conf) { + Class policyClass = conf.getObject(DiskBalancerConfiguration.class).getVolumeContainerChoosingPolicyClass(); + return (VolumeContainerChoosingPolicy) ReflectionUtils.newInstance( policyClass, new Class[]{ReentrantLock.class}, LOCK); } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerConfiguration.java 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerConfiguration.java index 447691c0a5bd..53b8b3f8f03b 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerConfiguration.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerConfiguration.java @@ -94,20 +94,13 @@ public final class DiskBalancerConfiguration { ) private long diskBalancerTimeout = Duration.ofSeconds(300).toMillis(); - @Config(key = "hdds.datanode.disk.balancer.volume.choosing.policy", type = ConfigType.CLASS, + @Config(key = "hdds.datanode.disk.balancer.volume.container.choosing.policy", type = ConfigType.CLASS, defaultValue = "org.apache.hadoop.ozone.container.diskbalancer.policy" + - ".DefaultVolumeChoosingPolicy", + ".DefaultVolumeContainerChoosingPolicy", tags = {ConfigTag.DISKBALANCER}, - description = "The volume choosing policy of the disk balancer service.") - private Class volumeChoosingPolicyClass; - - @Config(key = "hdds.datanode.disk.balancer.container.choosing.policy", type = ConfigType.CLASS, - defaultValue = "org.apache.hadoop.ozone.container.diskbalancer.policy" + - ".DefaultContainerChoosingPolicy", - tags = {ConfigTag.DISKBALANCER}, - description = "The container choosing policy of the disk balancer " + - "service.") - private Class containerChoosingPolicyClass; + description = "The policy for selecting source/destination volumes and " + + "containers to move for disk balancing.") + private Class volumeContainerChoosingPolicyClass; @Config(key = "hdds.datanode.disk.balancer.stop.after.disk.even", type = ConfigType.BOOLEAN, @@ -174,12 +167,8 @@ public void setDiskBalancerTimeout(Duration duration) { this.diskBalancerTimeout = duration.toMillis(); } - public Class getVolumeChoosingPolicyClass() { - return volumeChoosingPolicyClass; - } - - public Class getContainerChoosingPolicyClass() { - return 
containerChoosingPolicyClass; + public Class getVolumeContainerChoosingPolicyClass() { + return volumeContainerChoosingPolicyClass; } public boolean isStopAfterDiskEven() { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java index 9503c2e3c1f8..3e607c892fa7 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java @@ -41,15 +41,12 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.io.FileUtils; -import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.hdds.conf.ConfigurationSource; -import org.apache.hadoop.hdds.conf.StorageUnit; import org.apache.hadoop.hdds.fs.SpaceUsageSource; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DiskBalancerRunningStatus; -import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; import org.apache.hadoop.hdds.server.ServerUtils; @@ -69,9 +66,9 @@ import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet; import org.apache.hadoop.ozone.container.common.volume.VolumeChoosingPolicyFactory; import org.apache.hadoop.ozone.container.diskbalancer.DiskBalancerVolumeCalculation.VolumeFixedUsage; -import org.apache.hadoop.ozone.container.diskbalancer.policy.ContainerChoosingPolicy; -import 
org.apache.hadoop.ozone.container.diskbalancer.policy.DefaultContainerChoosingPolicy; -import org.apache.hadoop.ozone.container.diskbalancer.policy.DiskBalancerVolumeChoosingPolicy; +import org.apache.hadoop.ozone.container.diskbalancer.policy.DefaultVolumeContainerChoosingPolicy; +import org.apache.hadoop.ozone.container.diskbalancer.policy.DiskBalancerVolumeContainerCandidate; +import org.apache.hadoop.ozone.container.diskbalancer.policy.VolumeContainerChoosingPolicy; import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData; import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerLocationUtil; import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil; @@ -126,12 +123,10 @@ public class DiskBalancerService extends BackgroundService { private Map deltaSizes; private MutableVolumeSet volumeSet; - private DiskBalancerVolumeChoosingPolicy volumeChoosingPolicy; - private ContainerChoosingPolicy containerChoosingPolicy; + private VolumeContainerChoosingPolicy volumeContainerChoosingPolicy; private final File diskBalancerInfoFile; private DiskBalancerServiceMetrics metrics; - private long containerDefaultSize; public DiskBalancerService(OzoneContainer ozoneContainer, long serviceCheckInterval, long serviceCheckTimeout, TimeUnit timeUnit, @@ -148,15 +143,9 @@ public DiskBalancerService(OzoneContainer ozoneContainer, inProgressContainers = ConcurrentHashMap.newKeySet(); deltaSizes = new ConcurrentHashMap<>(); volumeSet = ozoneContainer.getVolumeSet(); - containerDefaultSize = (long) conf.getStorageSize( - ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE, - ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES); try { - volumeChoosingPolicy = VolumeChoosingPolicyFactory.getDiskBalancerPolicy(conf); - containerChoosingPolicy = (ContainerChoosingPolicy) - conf.getObject(DiskBalancerConfiguration.class) - .getContainerChoosingPolicyClass().newInstance(); + volumeContainerChoosingPolicy = 
VolumeChoosingPolicyFactory.getDiskBalancerContainerPolicy(conf); } catch (Exception e) { LOG.error("Got exception when initializing DiskBalancerService", e); throw new IOException(e); @@ -405,25 +394,20 @@ public BackgroundTaskQueue getTasks() { } for (int i = 0; i < availableTaskCount; i++) { - Pair pair = volumeChoosingPolicy - .chooseVolume(volumeSet, threshold, deltaSizes, containerDefaultSize); - if (pair == null) { - continue; - } - HddsVolume sourceVolume = pair.getLeft(), destVolume = pair.getRight(); - ContainerData toBalanceContainer = containerChoosingPolicy - .chooseContainer(ozoneContainer, sourceVolume, destVolume, - inProgressContainers, threshold, volumeSet, deltaSizes); - if (toBalanceContainer != null) { - DiskBalancerTask task = new DiskBalancerTask(toBalanceContainer, sourceVolume, - destVolume); - queue.add(task); - inProgressContainers.add(ContainerID.valueOf(toBalanceContainer.getContainerID())); - deltaSizes.put(sourceVolume, deltaSizes.getOrDefault(sourceVolume, 0L) - - toBalanceContainer.getBytesUsed()); - } else { - // release destVolume committed bytes - destVolume.incCommittedBytes(0 - containerDefaultSize); + DiskBalancerVolumeContainerCandidate candidate = volumeContainerChoosingPolicy.chooseVolumesAndContainer( + ozoneContainer, volumeSet, deltaSizes, inProgressContainers, threshold); + if (candidate != null) { + HddsVolume sourceVolume = candidate.getSourceVolume(); + HddsVolume destVolume = candidate.getDestVolume(); + ContainerData toBalanceContainer = candidate.getContainerData(); + if (toBalanceContainer != null) { + DiskBalancerTask task = new DiskBalancerTask(toBalanceContainer, sourceVolume, + destVolume); + queue.add(task); + inProgressContainers.add(ContainerID.valueOf(toBalanceContainer.getContainerID())); + deltaSizes.put(sourceVolume, deltaSizes.getOrDefault(sourceVolume, 0L) + - toBalanceContainer.getBytesUsed()); + } } } @@ -503,7 +487,7 @@ public BackgroundTaskResult call() { // QUASI_CLOSED is allowed when test mode 
is enabled, this is done to test in production // these containers are rejected. State containerState = container.getContainerData().getState(); - boolean isTestMode = DefaultContainerChoosingPolicy.isTest(); + boolean isTestMode = DefaultVolumeContainerChoosingPolicy.isTest(); if (containerState != State.CLOSED && !(isTestMode && containerState == State.QUASI_CLOSED)) { LOG.warn("Container {} is in {} state, skipping move process. Only CLOSED containers can be moved.", containerId, containerState); @@ -640,7 +624,7 @@ private void postCall(boolean success, long startTime) { inProgressContainers.remove(ContainerID.valueOf(containerData.getContainerID())); deltaSizes.put(sourceVolume, deltaSizes.get(sourceVolume) + containerData.getBytesUsed()); - destVolume.incCommittedBytes(0 - containerDefaultSize); + destVolume.incCommittedBytes(0 - containerData.getBytesUsed()); long endTime = Time.monotonicNow(); if (success) { metrics.incrSuccessCount(1); @@ -749,22 +733,13 @@ public void setBalancedBytesInLastWindow(long bytes) { this.balancedBytesInLastWindow.set(bytes); } - public ContainerChoosingPolicy getContainerChoosingPolicy() { - return containerChoosingPolicy; - } - - public DiskBalancerVolumeChoosingPolicy getVolumeChoosingPolicy() { - return volumeChoosingPolicy; - } - - @VisibleForTesting - public void setVolumeChoosingPolicy(DiskBalancerVolumeChoosingPolicy volumeChoosingPolicy) { - this.volumeChoosingPolicy = volumeChoosingPolicy; + public VolumeContainerChoosingPolicy getVolumeContainerChoosingPolicy() { + return volumeContainerChoosingPolicy; } @VisibleForTesting - public void setContainerChoosingPolicy(ContainerChoosingPolicy containerChoosingPolicy) { - this.containerChoosingPolicy = containerChoosingPolicy; + public void setVolumeContainerChoosingPolicy(VolumeContainerChoosingPolicy volumeContainerChoosingPolicy) { + this.volumeContainerChoosingPolicy = volumeContainerChoosingPolicy; } @VisibleForTesting diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/DefaultContainerChoosingPolicy.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/DefaultContainerChoosingPolicy.java deleted file mode 100644 index f964f6f519e3..000000000000 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/DefaultContainerChoosingPolicy.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.ozone.container.diskbalancer.policy; - -import static java.util.concurrent.TimeUnit.HOURS; -import static org.apache.hadoop.ozone.container.diskbalancer.DiskBalancerVolumeCalculation.computeUtilization; -import static org.apache.hadoop.ozone.container.diskbalancer.DiskBalancerVolumeCalculation.getIdealUsage; -import static org.apache.hadoop.ozone.container.diskbalancer.DiskBalancerVolumeCalculation.getVolumeUsages; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ExecutionException; -import org.apache.hadoop.hdds.fs.SpaceUsageSource; -import org.apache.hadoop.hdds.scm.container.ContainerID; -import org.apache.hadoop.ozone.container.common.impl.ContainerData; -import org.apache.hadoop.ozone.container.common.interfaces.Container; -import org.apache.hadoop.ozone.container.common.volume.HddsVolume; -import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet; -import org.apache.hadoop.ozone.container.diskbalancer.DiskBalancerVolumeCalculation.VolumeFixedUsage; -import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Choose a container from specified volume, make sure it's not being balancing. 
- */ -public class DefaultContainerChoosingPolicy implements ContainerChoosingPolicy { - public static final Logger LOG = LoggerFactory.getLogger( - DefaultContainerChoosingPolicy.class); - - private static final ThreadLocal>>> CACHE = - ThreadLocal.withInitial( - () -> CacheBuilder.newBuilder().recordStats().expireAfterAccess(1, HOURS).build()); - - // for test - private static boolean test = false; - - @Override - public ContainerData chooseContainer(OzoneContainer ozoneContainer, - HddsVolume src, HddsVolume dst, - Set inProgressContainerIDs, - double thresholdPercentage, MutableVolumeSet volumeSet, - Map deltaMap) { - final Iterator> itr; - try { - itr = CACHE.get().get(src, () -> ozoneContainer.getController().getContainers(src)); - } catch (ExecutionException e) { - LOG.warn("Failed to get container iterator for volume {}", src, e); - return null; - } - - // Calculate the actual threshold - final List volumeUsages = getVolumeUsages(volumeSet, deltaMap); - final double actualThreshold = getIdealUsage(volumeUsages) + thresholdPercentage / 100.0; - - // Find container - final SpaceUsageSource.Fixed dstUsage = dst.getCurrentUsage(); - final long dstCommittedBytes = dst.getCommittedBytes(); - while (itr.hasNext()) { - ContainerData containerData = itr.next().getContainerData(); - if (containerData.getBytesUsed() > 0 && - !inProgressContainerIDs.contains(ContainerID.valueOf(containerData.getContainerID())) && - (containerData.isClosed() || (test && containerData.isQuasiClosed()))) { - - // Check if dst can accept the candidate container. 
- if (computeUtilization(dstUsage, dstCommittedBytes, containerData.getBytesUsed()) < actualThreshold) { - return containerData; - } - } - } - - CACHE.get().invalidate(src); - return null; - } - - @VisibleForTesting - public static void setTest(boolean isTest) { - test = isTest; - } - - @VisibleForTesting - public static boolean isTest() { - return test; - } -} diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/DefaultVolumeChoosingPolicy.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/DefaultVolumeChoosingPolicy.java deleted file mode 100644 index 240c5a9f9f00..000000000000 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/DefaultVolumeChoosingPolicy.java +++ /dev/null @@ -1,156 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.ozone.container.diskbalancer.policy; - -import static org.apache.hadoop.ozone.container.diskbalancer.DiskBalancerVolumeCalculation.getIdealUsage; -import static org.apache.hadoop.ozone.container.diskbalancer.DiskBalancerVolumeCalculation.newVolumeFixedUsage; - -import java.util.Comparator; -import java.util.List; -import java.util.Map; -import java.util.concurrent.locks.ReentrantLock; -import java.util.stream.Collectors; -import org.apache.commons.lang3.tuple.Pair; -import org.apache.hadoop.hdds.fs.SpaceUsageSource; -import org.apache.hadoop.ozone.container.common.volume.HddsVolume; -import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet; -import org.apache.hadoop.ozone.container.common.volume.StorageVolume; -import org.apache.hadoop.ozone.container.diskbalancer.DiskBalancerVolumeCalculation.VolumeFixedUsage; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Choose a random volume for disk balancing. - * - * Source volumes use deltaMap to simulate space that will be freed (pre-deleted). - * Destination volumes use committedBytes to account for space already reserved. - * Both deltaMap and committedBytes are considered to calculate usage. - */ -public class DefaultVolumeChoosingPolicy implements DiskBalancerVolumeChoosingPolicy { - - public static final Logger LOG = LoggerFactory.getLogger( - DefaultVolumeChoosingPolicy.class); - private final ReentrantLock lock; - - public DefaultVolumeChoosingPolicy(ReentrantLock globalLock) { - lock = globalLock; - } - - @Override - public Pair chooseVolume(MutableVolumeSet volumeSet, - double thresholdPercentage, Map deltaMap, long containerSize) { - lock.lock(); - try { - // Create truly immutable snapshot of volumes to ensure consistency - final List allVolumes = volumeSet.getVolumesList(); - if (allVolumes.size() < 2) { - return null; // Can't balance with less than 2 volumes. 
- } - - // Calculate usages and sort in ascending order of utilization - final List volumeUsages = allVolumes.stream() - .map(v -> newVolumeFixedUsage(v, deltaMap)) - .sorted(Comparator.comparingDouble(VolumeFixedUsage::getUtilization)) - .collect(Collectors.toList()); - - // Calculate ideal usage and threshold range - final double idealUsage = getIdealUsage(volumeUsages); - final double actualThreshold = thresholdPercentage / 100.0; - final double lowerThreshold = idealUsage - actualThreshold; - final double upperThreshold = idealUsage + actualThreshold; - - // Log all volume information for investigation - if (LOG.isDebugEnabled()) { - logVolumeBalancingState(volumeUsages, idealUsage, thresholdPercentage, - lowerThreshold, upperThreshold, containerSize, deltaMap); - } - - // Get highest and lowest utilization volumes - final VolumeFixedUsage highestUsage = volumeUsages.get(volumeUsages.size() - 1); - final VolumeFixedUsage lowestUsage = volumeUsages.get(0); - - // Only return null if highest is below upper threshold AND lowest is above lower threshold - // This means all volumes are strictly within the range (not at boundaries) - if (highestUsage.getUtilization() < upperThreshold && - lowestUsage.getUtilization() > lowerThreshold) { - // All volumes are strictly within threshold range, no balancing needed - return null; - } - - // Determine source volume: highest utilization volume (if above threshold) - final VolumeFixedUsage src = highestUsage; - - // Find destination volume: lowest utilization volume that has enough space - // Prefer volumes below threshold, but accept any volume with lower utilization than source - for (int i = 0; i < volumeUsages.size() - 1; i++) { - final VolumeFixedUsage dstUsage = volumeUsages.get(i); - final HddsVolume dst = dstUsage.getVolume(); - - // Check if destination has enough space and has lower utilization than source - if (dstUsage.getUtilization() < src.getUtilization() && - containerSize < dstUsage.computeUsableSpace()) { - 
// Found dst, reserve space and return - dst.incCommittedBytes(containerSize); - LOG.debug("Chosen volume pair for disk balancing: source={} (utilization={}), " + - "destination={} (utilization={})", - src.getVolume().getStorageID(), src.getUtilization(), - dst.getStorageID(), dstUsage.getUtilization()); - return Pair.of(src.getVolume(), dst); - } - LOG.debug("Destination volume {} does not have enough space, trying next volume.", - dst.getStorageID()); - } - LOG.debug("Failed to find appropriate destination volume."); - return null; - } finally { - lock.unlock(); - } - } - - /** - * Logs all volume information for disk balancing investigation. - * - * @param volumeUsages List of volume usages (sorted by utilization ascending) - * @param idealUsage Calculated ideal usage - * @param thresholdPercentage Threshold percentage - * @param lowerThreshold Lower threshold bound - * @param upperThreshold Upper threshold bound - * @param containerSize Container size to be moved - * @param deltaMap Map of volume deltas - */ - private void logVolumeBalancingState(List volumeUsages, - double idealUsage, double thresholdPercentage, double lowerThreshold, - double upperThreshold, long containerSize, Map deltaMap) { - LOG.debug("Disk balancing state - idealUsage={}, thresholdPercentage={}%, " + - "thresholdRange=({}, {}), containerSize={}", - String.format("%.10f", idealUsage), thresholdPercentage, - String.format("%.10f", lowerThreshold), String.format("%.10f", upperThreshold), - containerSize); - for (int i = 0; i < volumeUsages.size(); i++) { - VolumeFixedUsage vfu = volumeUsages.get(i); - HddsVolume vol = vfu.getVolume(); - SpaceUsageSource.Fixed usage = vfu.getUsage(); - long usableSpace = vfu.computeUsableSpace(); - LOG.debug("Volume[{}] - disk={}, utilization={}, capacity={}, " + - "effectiveUsed={}, available={}, usableSpace={}, committedBytes={}, delta={}", - i, vol.getStorageID(), String.format("%.10f", vfu.getUtilization()), - usage.getCapacity(), vfu.getEffectiveUsed(), 
usage.getAvailable(), - usableSpace, vol.getCommittedBytes(), deltaMap.getOrDefault(vol, 0L)); - } - } -} diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/DefaultVolumeContainerChoosingPolicy.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/DefaultVolumeContainerChoosingPolicy.java new file mode 100644 index 000000000000..ea384b2c9e7f --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/DefaultVolumeContainerChoosingPolicy.java @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.ozone.container.diskbalancer.policy; + +import static java.util.concurrent.TimeUnit.HOURS; +import static org.apache.hadoop.ozone.container.diskbalancer.DiskBalancerVolumeCalculation.computeUtilization; +import static org.apache.hadoop.ozone.container.diskbalancer.DiskBalancerVolumeCalculation.getIdealUsage; +import static org.apache.hadoop.ozone.container.diskbalancer.DiskBalancerVolumeCalculation.newVolumeFixedUsage; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Collectors; +import org.apache.hadoop.hdds.fs.SpaceUsageSource; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.ozone.container.common.impl.ContainerData; +import org.apache.hadoop.ozone.container.common.interfaces.Container; +import org.apache.hadoop.ozone.container.common.volume.HddsVolume; +import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet; +import org.apache.hadoop.ozone.container.common.volume.StorageVolume; +import org.apache.hadoop.ozone.container.diskbalancer.DiskBalancerVolumeCalculation.VolumeFixedUsage; +import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * First chooses a source volume and destination volume pair based on ideal utilization and threshold, + * then chooses a container from the source volume that can be moved to the destination without + * exceeding the upper threshold. Space is reserved on the destination only when a container is + * chosen, using the actual container size. 
+ */ +public class DefaultVolumeContainerChoosingPolicy implements VolumeContainerChoosingPolicy { + public static final Logger LOG = LoggerFactory.getLogger( + DefaultVolumeContainerChoosingPolicy.class); + + private static final ThreadLocal>>> CACHE = + ThreadLocal.withInitial( + () -> CacheBuilder.newBuilder().recordStats().expireAfterAccess(1, HOURS).build()); + + // for test + private static boolean test = false; + + private final ReentrantLock lock; + + public DefaultVolumeContainerChoosingPolicy(ReentrantLock globalLock) { + this.lock = globalLock; + } + + @Override + public DiskBalancerVolumeContainerCandidate chooseVolumesAndContainer(OzoneContainer ozoneContainer, + MutableVolumeSet volumeSet, Map deltaMap, Set inProgressContainerIDs, + double thresholdPercentage) { + lock.lock(); + try { + // Create truly immutable snapshot of volumes to ensure consistency + final List allVolumes = volumeSet.getVolumesList(); + if (allVolumes.size() < 2) { + return null; // Can't balance with less than 2 volumes. 
+ } + + // Calculate usages and sort in ascending order of utilization (once) + // Use storage ID as secondary sort for deterministic ordering when utilizations are equal + final List volumeUsages = allVolumes.stream() + .map(v -> newVolumeFixedUsage(v, deltaMap)) + .sorted(Comparator.comparingDouble(VolumeFixedUsage::getUtilization) + .thenComparing(v -> v.getVolume().getStorageID())) + .collect(Collectors.toList()); + + // Calculate ideal usage and threshold range (once) + final double idealUsage = getIdealUsage(volumeUsages); + final double actualThreshold = thresholdPercentage / 100.0; + final double lowerThreshold = idealUsage - actualThreshold; + final double upperThreshold = idealUsage + actualThreshold; + + if (LOG.isDebugEnabled()) { + logVolumeBalancingState(volumeUsages, idealUsage, thresholdPercentage, + lowerThreshold, upperThreshold, deltaMap); + } + + // Get highest and lowest utilization volumes + final VolumeFixedUsage highestUsage = volumeUsages.get(volumeUsages.size() - 1); + final VolumeFixedUsage lowestUsage = volumeUsages.get(0); + + // Only return null if highest is below upper threshold AND lowest is above lower threshold + if (highestUsage.getUtilization() < upperThreshold && + lowestUsage.getUtilization() > lowerThreshold) { + return null; + } + + // Determine source volume: highest utilization volume + final VolumeFixedUsage srcUsage = highestUsage; + final HddsVolume src = srcUsage.getVolume(); + + // Find destination volume and container: try each dest with lower utilization than source + for (int i = 0; i < volumeUsages.size() - 1; i++) { + final VolumeFixedUsage dstUsage = volumeUsages.get(i); + final HddsVolume dst = dstUsage.getVolume(); + + // Check if destination has lower utilization than source and some usable space + if (dstUsage.getUtilization() < srcUsage.getUtilization() && + dstUsage.computeUsableSpace() > 0) { + ContainerData containerData = chooseContainer(ozoneContainer, + src, dst, dstUsage, inProgressContainerIDs, 
upperThreshold); + if (containerData != null) { + long containerSize = containerData.getBytesUsed(); + dst.incCommittedBytes(containerSize); + LOG.debug("Chosen volume pair for disk balancing: source={} (utilization={}), " + + "destination={} (utilization={})", + src.getStorageID(), srcUsage.getUtilization(), + dst.getStorageID(), dstUsage.getUtilization()); + return new DiskBalancerVolumeContainerCandidate(containerData, src, dst); + } + LOG.debug("No suitable container found for destination {}, trying next volume.", + dst.getStorageID()); + } else { + LOG.debug("Destination volume {} does not have enough space, trying next volume.", + dst.getStorageID()); + } + } + LOG.debug("Failed to find appropriate destination volume and container."); + return null; + } finally { + lock.unlock(); + } + } + + /** + * Choose a container from source volume that can be moved to destination + * without exceeding the upper threshold. Checks both space and utilization. + *

+ * This method is public to allow direct testing of container selection logic + * in integration tests without requiring full volume selection setup. + */ + public ContainerData chooseContainer(OzoneContainer ozoneContainer, + HddsVolume src, HddsVolume dst, VolumeFixedUsage dstUsage, + Set inProgressContainerIDs, double upperThreshold) { + final Iterator> itr; + try { + itr = CACHE.get().get(src, () -> ozoneContainer.getController().getContainers(src)); + } catch (ExecutionException e) { + LOG.warn("Failed to get container iterator for volume {}", src, e); + return null; + } + + final SpaceUsageSource.Fixed dstSpaceUsage = dstUsage.getUsage(); + final long dstCommittedBytes = dst.getCommittedBytes(); + final long usableSpace = dstUsage.computeUsableSpace(); + + while (itr.hasNext()) { + ContainerData containerData = itr.next().getContainerData(); + + // Skip containers removed from containerSet after iterator was cached + if (ozoneContainer.getContainerSet().getContainer(containerData.getContainerID()) == null) { + continue; + } + + if (containerData.getBytesUsed() > 0 && + !inProgressContainerIDs.contains(ContainerID.valueOf(containerData.getContainerID())) && + (containerData.isClosed() || (test && containerData.isQuasiClosed()))) { + + long containerSize = containerData.getBytesUsed(); + // Check if dst has enough space and can accept the container without exceeding threshold + if (containerSize <= usableSpace && + computeUtilization(dstSpaceUsage, dstCommittedBytes, containerSize) < upperThreshold) { + return containerData; + } + } + } + + CACHE.get().invalidate(src); + return null; + } + + /** + * Logs all volume information for disk balancing investigation. 
+ */ + private void logVolumeBalancingState(List volumeUsages, + double idealUsage, double thresholdPercentage, double lowerThreshold, + double upperThreshold, Map deltaMap) { + LOG.debug("Disk balancing state - idealUsage={}, thresholdPercentage={}%, " + + "thresholdRange=({}, {})", + String.format("%.10f", idealUsage), thresholdPercentage, + String.format("%.10f", lowerThreshold), String.format("%.10f", upperThreshold)); + for (int i = 0; i < volumeUsages.size(); i++) { + VolumeFixedUsage vfu = volumeUsages.get(i); + HddsVolume vol = vfu.getVolume(); + SpaceUsageSource.Fixed usage = vfu.getUsage(); + long usableSpace = vfu.computeUsableSpace(); + LOG.debug("Volume[{}] - disk={}, utilization={}, capacity={}, " + + "effectiveUsed={}, available={}, usableSpace={}, committedBytes={}, delta={}", + i, vol.getStorageID(), String.format("%.10f", vfu.getUtilization()), + usage.getCapacity(), vfu.getEffectiveUsed(), usage.getAvailable(), + usableSpace, vol.getCommittedBytes(), deltaMap.getOrDefault(vol, 0L)); + } + } + + @VisibleForTesting + public static void setTest(boolean isTest) { + test = isTest; + } + + @VisibleForTesting + public static boolean isTest() { + return test; + } +} diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/DiskBalancerVolumeChoosingPolicy.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/DiskBalancerVolumeContainerCandidate.java similarity index 52% rename from hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/DiskBalancerVolumeChoosingPolicy.java rename to hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/DiskBalancerVolumeContainerCandidate.java index c64733fc37ea..9e556e6934bb 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/DiskBalancerVolumeChoosingPolicy.java +++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/DiskBalancerVolumeContainerCandidate.java @@ -17,24 +17,34 @@ package org.apache.hadoop.ozone.container.diskbalancer.policy; -import java.util.Map; -import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.ozone.container.common.impl.ContainerData; import org.apache.hadoop.ozone.container.common.volume.HddsVolume; -import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet; /** - * This interface specifies the policy for choosing volumes to balance. + * Result of consolidated volume and container selection for disk balancing. + * Contains the container to move and its source and destination volumes. */ -public interface DiskBalancerVolumeChoosingPolicy { - /** - * Choose a pair of volumes for balancing. - * - * @param volumeSet - volumes to choose from. - * @param thresholdPercentage the threshold percentage in range (0, 100) to choose the source volume. - * @param deltaSizes - the sizes changes of inProgress balancing jobs. - * @param containerSize - the estimated size of container to be moved. - * @return Source volume and Dest volume. 
- */ - Pair chooseVolume(MutableVolumeSet volumeSet, - double thresholdPercentage, Map deltaSizes, long containerSize); +public final class DiskBalancerVolumeContainerCandidate { + private final ContainerData containerData; + private final HddsVolume sourceVolume; + private final HddsVolume destVolume; + + public DiskBalancerVolumeContainerCandidate(ContainerData containerData, + HddsVolume sourceVolume, HddsVolume destVolume) { + this.containerData = containerData; + this.sourceVolume = sourceVolume; + this.destVolume = destVolume; + } + + public ContainerData getContainerData() { + return containerData; + } + + public HddsVolume getSourceVolume() { + return sourceVolume; + } + + public HddsVolume getDestVolume() { + return destVolume; + } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/ContainerChoosingPolicy.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/VolumeContainerChoosingPolicy.java similarity index 55% rename from hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/ContainerChoosingPolicy.java rename to hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/VolumeContainerChoosingPolicy.java index d512de84bdf4..9a9e7d99102d 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/ContainerChoosingPolicy.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/VolumeContainerChoosingPolicy.java @@ -20,30 +20,33 @@ import java.util.Map; import java.util.Set; import org.apache.hadoop.hdds.scm.container.ContainerID; -import org.apache.hadoop.ozone.container.common.impl.ContainerData; import org.apache.hadoop.ozone.container.common.volume.HddsVolume; import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet; import 
org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; /** - * This interface specifies the policy for choosing containers to balance. + * This interface specifies the policy for choosing volumes and containers to balance. + * It provides consolidated volume selection (source/destination pair) and container selection + * into a single operation to avoid recalculating ideal utilization and disk usage. */ -public interface ContainerChoosingPolicy { +public interface VolumeContainerChoosingPolicy { /** - * Choose a container for balancing. - * @param ozoneContainer the OzoneContainer instance to get all containers of a particular volume. - * @param srcVolume the HddsVolume instance to choose containers from. - * @param destVolume the destination volume to which container is being moved. - * @param inProgressContainerIDs containerIDs present in this set should be - - avoided as these containers are already under move by diskBalancer. - * @param thresholdPercentage the threshold percentage in range (0, 100) + * Choose a container and its source/destination volumes for balancing. + * Performs both volume pair selection and container selection in one call, + * computing ideal usage and volume utilizations only once. + * Space is reserved on the destination only when a container is chosen, + * using the actual container size. 
+ * + * @param ozoneContainer the OzoneContainer instance to get all containers * @param volumeSet the volumeSet instance - * @param deltaMap the deltaMap instance of source volume - * @return a Container + * @param deltaMap the deltaMap for in-progress balancing jobs (negative = space to be freed) + * @param inProgressContainerIDs containerIDs to avoid (already under move) + * @param thresholdPercentage the threshold percentage in range (0, 100) + * @return a DiskBalancerVolumeContainerCandidate with container and volumes, or null if none found */ - ContainerData chooseContainer(OzoneContainer ozoneContainer, - HddsVolume srcVolume, HddsVolume destVolume, + DiskBalancerVolumeContainerCandidate chooseVolumesAndContainer(OzoneContainer ozoneContainer, + MutableVolumeSet volumeSet, + Map deltaMap, Set inProgressContainerIDs, - double thresholdPercentage, MutableVolumeSet volumeSet, - Map deltaMap); + double thresholdPercentage); } diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDefaultContainerChoosingPolicy.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerContainerChoosingLogic.java similarity index 80% rename from hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDefaultContainerChoosingPolicy.java rename to hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerContainerChoosingLogic.java index 67fa7dac39ed..8c2bb6c6e964 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDefaultContainerChoosingPolicy.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerContainerChoosingLogic.java @@ -35,6 +35,7 @@ import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.locks.ReentrantLock; import 
org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.conf.StorageUnit; import org.apache.hadoop.hdds.fs.MockSpaceUsageCheckFactory; @@ -45,14 +46,13 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.container.ContainerID; -import org.apache.hadoop.ozone.container.common.impl.ContainerData; import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion; import org.apache.hadoop.ozone.container.common.impl.ContainerSet; import org.apache.hadoop.ozone.container.common.volume.HddsVolume; import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet; import org.apache.hadoop.ozone.container.common.volume.StorageVolume; -import org.apache.hadoop.ozone.container.diskbalancer.policy.ContainerChoosingPolicy; -import org.apache.hadoop.ozone.container.diskbalancer.policy.DefaultContainerChoosingPolicy; +import org.apache.hadoop.ozone.container.diskbalancer.policy.DefaultVolumeContainerChoosingPolicy; +import org.apache.hadoop.ozone.container.diskbalancer.policy.DiskBalancerVolumeContainerCandidate; import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer; import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData; import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController; @@ -64,7 +64,7 @@ /** * Unit tests for the DefaultContainerChoosingPolicy. 
*/ -public class TestDefaultContainerChoosingPolicy { +public class TestDiskBalancerContainerChoosingLogic { @TempDir private Path baseDir; @@ -74,19 +74,18 @@ public class TestDefaultContainerChoosingPolicy { private static final long VOLUME_CAPACITY = 2500L * MB; // 2500MB private static final double THRESHOLD = 10.0; - private ContainerChoosingPolicy policy; + private DefaultVolumeContainerChoosingPolicy policy; private OzoneContainer ozoneContainer; private MutableVolumeSet volumeSet; private ContainerSet containerSet; private HddsVolume sourceVolume; private HddsVolume destVolume1; - private HddsVolume destVolume2; private Set inProgressContainerIDs; private Map deltaMap; @BeforeEach public void setup() throws Exception { - policy = new DefaultContainerChoosingPolicy(); + policy = new DefaultVolumeContainerChoosingPolicy(new ReentrantLock()); setupVolumesAndContainer(); inProgressContainerIDs = new HashSet<>(); deltaMap = new HashMap<>(); @@ -105,7 +104,7 @@ private void setupVolumesAndContainer() throws IOException { // Create volumes with specific utilization sourceVolume = createVolume("source-volume", 0.80); destVolume1 = createVolume("dest-volume1", 0.10); - destVolume2 = createVolume("dest-volume2", 0.50); + HddsVolume destVolume2 = createVolume("dest-volume2", 0.50); CONF.set(HDDS_DATANODE_DIR_KEY, baseDir.resolve("defaultVolume").toString()); @@ -133,6 +132,7 @@ private void setupVolumesAndContainer() throws IOException { ozoneContainer = mock(OzoneContainer.class); ContainerController controller = new ContainerController(containerSet, null); when(ozoneContainer.getController()).thenReturn(controller); + when(ozoneContainer.getContainerSet()).thenReturn(containerSet); } /** @@ -174,29 +174,35 @@ public void testContainerChosenSuccessfully() { // - C1 (500MB) is productive: (250+500)/2500 = 30% <= 56.67% // The policy iterates by container ID, so it will find and return C1. 
- ContainerData chosenContainer = policy.chooseContainer(ozoneContainer, - sourceVolume, destVolume1, inProgressContainerIDs, THRESHOLD, volumeSet, deltaMap); + DiskBalancerVolumeContainerCandidate candidate = policy.chooseVolumesAndContainer(ozoneContainer, + volumeSet, deltaMap, inProgressContainerIDs, THRESHOLD); - // first container should be chosen - assertNotNull(chosenContainer); - assertEquals(1L, chosenContainer.getContainerID()); + // first container should be chosen + assertNotNull(candidate); + assertEquals(1L, candidate.getContainerData().getContainerID()); } @Test - public void testContainerNotChosen() { - // For destVolume2, no container move should be productive. - // Max Allowed Utilization is ~56.67%. - // - // Evaluation for destVolume2 (50% used / 1250MB): - // Max productive size = (0.5667 * 2500) - 1250 = 166.75MB - // All containers on the source volume (smallest is 200MB) are larger - // than 166.75MB. Therefore, no container should be chosen. - - ContainerData chosenContainer = policy.chooseContainer(ozoneContainer, - sourceVolume, destVolume2, inProgressContainerIDs, THRESHOLD, volumeSet, deltaMap); + public void testContainerNotChosen() throws IOException { + // Test scenario where no container fits the destination volume. + // Block dest1 (lowest utilization) and mark small containers as in-progress. + // Only large containers remain (500MB, 450MB), but dest2 can accept max 166.75MB.
+ + // Block dest1 by marking all available space as committed + destVolume1.incCommittedBytes(destVolume1.getCurrentUsage().getAvailable()); + + // Mark small containers as in-progress (unavailable) + inProgressContainerIDs.add(ContainerID.valueOf(3L)); // 200MB + inProgressContainerIDs.add(ContainerID.valueOf(4L)); // 350MB + + DiskBalancerVolumeContainerCandidate candidate = policy.chooseVolumesAndContainer(ozoneContainer, + volumeSet, deltaMap, inProgressContainerIDs, THRESHOLD); - // No containers should not be chosen - assertNull(chosenContainer); + assertNull(candidate); + + // Cleanup + destVolume1.incCommittedBytes(-destVolume1.getCurrentUsage().getAvailable()); + inProgressContainerIDs.clear(); } @Test @@ -215,15 +221,16 @@ public void testSizeZeroContainersSkipped() throws IOException { OzoneContainer testOzoneContainer = mock(OzoneContainer.class); ContainerController testController = new ContainerController(testContainerSet, null); when(testOzoneContainer.getController()).thenReturn(testController); + when(testOzoneContainer.getContainerSet()).thenReturn(testContainerSet); // The policy should skip containers 10 and 11 (size 0) and choose container 12 - ContainerData chosenContainer = policy.chooseContainer(testOzoneContainer, - sourceVolume, destVolume1, inProgressContainerIDs, THRESHOLD, volumeSet, deltaMap); - + DiskBalancerVolumeContainerCandidate candidate = policy.chooseVolumesAndContainer(testOzoneContainer, + volumeSet, deltaMap, inProgressContainerIDs, THRESHOLD); + // Container 12 (non-zero size) should be chosen, skipping containers 10 and 11 (size 0) - assertNotNull(chosenContainer); - assertEquals(12L, chosenContainer.getContainerID()); - assertEquals(200L * MB, chosenContainer.getBytesUsed()); + assertNotNull(candidate); + assertEquals(12L, candidate.getContainerData().getContainerID()); + assertEquals(200L * MB, candidate.getContainerData().getBytesUsed()); } /** diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerService.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerService.java index fc1980e579ee..c81bd06a8121 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerService.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerService.java @@ -27,7 +27,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyDouble; -import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -39,7 +38,6 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Stream; import org.apache.commons.io.FileUtils; -import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.hdds.HddsConfigKeys; import org.apache.hadoop.hdds.conf.ConfigurationSource; import org.apache.hadoop.hdds.conf.OzoneConfiguration; @@ -56,10 +54,9 @@ import org.apache.hadoop.ozone.container.common.volume.StorageVolume; import org.apache.hadoop.ozone.container.common.volume.VolumeSet; import org.apache.hadoop.ozone.container.diskbalancer.DiskBalancerVolumeCalculation.VolumeFixedUsage; -import org.apache.hadoop.ozone.container.diskbalancer.policy.ContainerChoosingPolicy; -import org.apache.hadoop.ozone.container.diskbalancer.policy.DefaultContainerChoosingPolicy; -import org.apache.hadoop.ozone.container.diskbalancer.policy.DefaultVolumeChoosingPolicy; -import org.apache.hadoop.ozone.container.diskbalancer.policy.DiskBalancerVolumeChoosingPolicy; +import org.apache.hadoop.ozone.container.diskbalancer.policy.DefaultVolumeContainerChoosingPolicy; +import 
org.apache.hadoop.ozone.container.diskbalancer.policy.DiskBalancerVolumeContainerCandidate; +import org.apache.hadoop.ozone.container.diskbalancer.policy.VolumeContainerChoosingPolicy; import org.apache.hadoop.ozone.container.keyvalue.ContainerTestVersionInfo; import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler; import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils; @@ -210,10 +207,8 @@ public void testPolicyClassInitialization(ContainerTestVersionInfo versionInfo) DiskBalancerServiceTestImpl svc = getDiskBalancerService(containerSet, conf, keyValueHandler, null, 1); - assertTrue(svc.getContainerChoosingPolicy() - instanceof DefaultContainerChoosingPolicy); - assertTrue(svc.getVolumeChoosingPolicy() - instanceof DefaultVolumeChoosingPolicy); + assertTrue(svc.getVolumeContainerChoosingPolicy() + instanceof DefaultVolumeContainerChoosingPolicy); } private String generateVolumeLocation(String base, int volumeCount) { @@ -321,10 +316,8 @@ public void testConcurrentTasksNotExceedThreadLimit() throws Exception { false, DiskBalancerVersion.DEFAULT_VERSION); svc.refresh(info); - DiskBalancerVolumeChoosingPolicy volumePolicy = mock(DiskBalancerVolumeChoosingPolicy.class); - ContainerChoosingPolicy containerPolicy = mock(ContainerChoosingPolicy.class); - svc.setVolumeChoosingPolicy(volumePolicy); - svc.setContainerChoosingPolicy(containerPolicy); + VolumeContainerChoosingPolicy containerPolicy = mock(VolumeContainerChoosingPolicy.class); + svc.setVolumeContainerChoosingPolicy(containerPolicy); List volumes = volumeSet.getVolumesList(); HddsVolume source = (HddsVolume) volumes.get(0); @@ -335,9 +328,8 @@ public void testConcurrentTasksNotExceedThreadLimit() throws Exception { when(containerData.getContainerID()).thenAnswer(invocation -> System.nanoTime()); when(containerData.getBytesUsed()).thenReturn(100L); - when(volumePolicy.chooseVolume(any(), anyDouble(), any(), anyLong())).thenReturn(Pair.of(source, dest)); - 
when(containerPolicy.chooseContainer(any(), any(), any(), any(), anyDouble(), any(), any())) - .thenReturn(containerData); + when(containerPolicy.chooseVolumesAndContainer(any(), any(), any(), any(), anyDouble())) + .thenReturn(new DiskBalancerVolumeContainerCandidate(containerData, source, dest)); // Test when no tasks are in progress, it should schedule up to the limit BackgroundTaskQueue queue = svc.getTasks(); diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerTask.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerTask.java index cd53404a670d..7e46a07ab0ea 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerTask.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerTask.java @@ -51,7 +51,6 @@ import org.apache.commons.io.FileUtils; import org.apache.hadoop.hdds.HddsConfigKeys; import org.apache.hadoop.hdds.conf.OzoneConfiguration; -import org.apache.hadoop.hdds.conf.StorageUnit; import org.apache.hadoop.hdds.fs.MockSpaceUsageCheckFactory; import org.apache.hadoop.hdds.fs.SpaceUsageCheckFactory; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; @@ -72,7 +71,7 @@ import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet; import org.apache.hadoop.ozone.container.common.volume.StorageVolume; import org.apache.hadoop.ozone.container.common.volume.VolumeSet; -import org.apache.hadoop.ozone.container.diskbalancer.policy.DefaultContainerChoosingPolicy; +import org.apache.hadoop.ozone.container.diskbalancer.policy.DefaultVolumeContainerChoosingPolicy; import org.apache.hadoop.ozone.container.keyvalue.ContainerTestVersionInfo; import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer; import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData; @@ -267,7 
+266,7 @@ public void cleanup() throws IOException { kvFaultInjector.reset(); KeyValueContainer.setInjector(null); DiskBalancerService.setInjector(null); - DefaultContainerChoosingPolicy.setTest(false); + DefaultVolumeContainerChoosingPolicy.setTest(false); } @ParameterizedTest @@ -291,7 +290,7 @@ public void moveSuccess(State containerState) throws IOException { String oldContainerPath = container.getContainerData().getContainerPath(); if (containerState == State.QUASI_CLOSED) { - DefaultContainerChoosingPolicy.setTest(true); + DefaultVolumeContainerChoosingPolicy.setTest(true); } DiskBalancerService.DiskBalancerTask task = getTask(); task.call(); @@ -460,7 +459,7 @@ public void moveFailsDuringInMemoryUpdate(ContainerTestVersionInfo versionInfo) .when(spyContainerSet).updateContainer(any(Container.class)); when(ozoneContainer.getContainerSet()).thenReturn(spyContainerSet); - DefaultContainerChoosingPolicy.setTest(true); + DefaultVolumeContainerChoosingPolicy.setTest(true); DiskBalancerService.DiskBalancerTask task = getTask(); CompletableFuture completableFuture = CompletableFuture.runAsync(() -> task.call()); @@ -564,11 +563,8 @@ public void testDestVolumeCommittedSpaceReleased(ContainerTestVersionInfo versio GenericTestUtils.LogCapturer serviceLog = GenericTestUtils.LogCapturer.captureLogs(DiskBalancerService.class); DiskBalancerService.DiskBalancerTask task = getTask(); - long defaultContainerSize = (long) conf.getStorageSize( - ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE, - ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES); - // verify committed space is reserved for destination volume - assertEquals(defaultContainerSize, destVolume.getCommittedBytes() - initialDestCommitted); + // verify committed space is reserved for destination volume (uses actual container size) + assertEquals(CONTAINER_SIZE, destVolume.getCommittedBytes() - initialDestCommitted); // delete the container from containerSet to simulate a failure 
containerSet.removeContainer(CONTAINER_ID); @@ -626,7 +622,7 @@ public void testMoveSkippedWhenContainerStateChanged(State invalidState) throws IOException, InterruptedException, TimeoutException { LogCapturer serviceLog = LogCapturer.captureLogs(DiskBalancerService.class); - // Create a CLOSED container which will be selected by DefaultContainerChoosingPolicy + // Create a CLOSED container which will be selected by DefaultVolumeContainerChoosingPolicy Container container = createContainer(CONTAINER_ID, sourceVolume, State.CLOSED); long initialSourceUsed = sourceVolume.getCurrentUsage().getUsedSpace(); long initialDestUsed = destVolume.getCurrentUsage().getUsedSpace(); diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDefaultVolumeChoosingPolicy.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerVolumeChoosingLogic.java similarity index 78% rename from hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDefaultVolumeChoosingPolicy.java rename to hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerVolumeChoosingLogic.java index ff47a8281ecb..f8097f0c0918 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDefaultVolumeChoosingPolicy.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerVolumeChoosingLogic.java @@ -18,11 +18,14 @@ package org.apache.hadoop.ozone.container.diskbalancer; import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_DATANODE_DIR_KEY; +import static org.apache.hadoop.ozone.container.common.impl.ContainerImplTestUtils.newContainerSet; import static org.apache.hadoop.ozone.container.diskbalancer.DiskBalancerVolumeCalculation.getVolumeUsages; import static org.junit.jupiter.api.Assertions.assertEquals; import 
static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import java.io.IOException; import java.nio.file.Path; @@ -31,23 +34,36 @@ import java.util.Arrays; import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Stream; -import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.conf.StorageUnit; import org.apache.hadoop.hdds.fs.MockSpaceUsageCheckFactory; import org.apache.hadoop.hdds.fs.MockSpaceUsageSource; import org.apache.hadoop.hdds.fs.SpaceUsageCheckFactory; import org.apache.hadoop.hdds.fs.SpaceUsagePersistence; import org.apache.hadoop.hdds.fs.SpaceUsageSource; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion; +import org.apache.hadoop.ozone.container.common.impl.ContainerSet; import org.apache.hadoop.ozone.container.common.volume.HddsVolume; import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet; import org.apache.hadoop.ozone.container.common.volume.StorageVolume; import org.apache.hadoop.ozone.container.diskbalancer.DiskBalancerVolumeCalculation.VolumeFixedUsage; -import org.apache.hadoop.ozone.container.diskbalancer.policy.DefaultVolumeChoosingPolicy; +import org.apache.hadoop.ozone.container.diskbalancer.policy.DefaultVolumeContainerChoosingPolicy; +import org.apache.hadoop.ozone.container.diskbalancer.policy.DiskBalancerVolumeContainerCandidate; +import 
org.apache.hadoop.ozone.container.diskbalancer.policy.VolumeContainerChoosingPolicy; +import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer; +import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData; +import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController; +import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; @@ -55,26 +71,31 @@ import org.junit.jupiter.params.provider.MethodSource; /** - * Unit tests for DefaultVolumeChoosingPolicy. + * Unit tests for volume and container selection in DefaultVolumeContainerChoosingPolicy. */ -public class TestDefaultVolumeChoosingPolicy { +public class TestDiskBalancerVolumeChoosingLogic { @TempDir private Path baseDir; + private static final OzoneConfiguration CONF = new OzoneConfiguration(); private static final long MB = 1024L * 1024L; private static final long VOLUME_CAPACITY = 2500L * MB; // 2500MB - same for all volumes private static final long DEFAULT_CONTAINER_SIZE = 100L * MB; // 100MB - private DefaultVolumeChoosingPolicy policy; + private VolumeContainerChoosingPolicy policy; private MutableVolumeSet volumeSet; private String datanodeUuid; private Map deltaMap; + private OzoneContainer ozoneContainer; + private ContainerSet containerSet; + private Set inProgressContainerIDs; @BeforeEach public void setup() { datanodeUuid = UUID.randomUUID().toString(); - policy = new DefaultVolumeChoosingPolicy(new ReentrantLock()); + policy = new DefaultVolumeContainerChoosingPolicy(new ReentrantLock()); deltaMap = new HashMap<>(); + inProgressContainerIDs = new HashSet<>(); } /** @@ -229,12 +250,15 @@ private List createVolumes(List configs) } /** - * Sets up volume set with given volumes. + * Sets up volume set with given volumes and OzoneContainer. + * For "should find pair" scenarios, adds a container on the source volume. 
* * @param volumes List of volumes to add to volume set + * @param sourceVolume The highest utilization volume (source) - add container here if non-null + * @param containerSize The size of the container to create on the source volume */ - private void setupVolumeSet(List volumes) throws IOException { - // Use a clean configuration to avoid loading default volumes + private void setupVolumeSetAndContainers(List volumes, + HddsVolume sourceVolume, long containerSize) throws IOException { OzoneConfiguration testConf = new OzoneConfiguration(); testConf.set(HDDS_DATANODE_DIR_KEY, baseDir.resolve("defaultVolume").toString()); volumeSet = new MutableVolumeSet(datanodeUuid, testConf, null, @@ -247,6 +271,31 @@ private void setupVolumeSet(List volumes) throws IOException { volumeMap.put(volume.getStorageDir().getAbsolutePath(), volume); } volumeSet.setVolumeMapForTesting(volumeMap); + + containerSet = newContainerSet(); + if (sourceVolume != null) { + createContainer(1L, containerSize, sourceVolume); + } + + ozoneContainer = mock(OzoneContainer.class); + ContainerController controller = new ContainerController(containerSet, null); + when(ozoneContainer.getController()).thenReturn(controller); + when(ozoneContainer.getContainerSet()).thenReturn(containerSet); + } + + private void createContainer(long id, long usedBytes, HddsVolume vol) + throws IOException { + long maxSize = usedBytes > 0 ? 
usedBytes : (long) CONF.getStorageSize( + ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE, + ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES); + KeyValueContainerData containerData = new KeyValueContainerData(id, + ContainerLayoutVersion.FILE_PER_BLOCK, maxSize, + UUID.randomUUID().toString(), UUID.randomUUID().toString()); + containerData.setState(ContainerDataProto.State.CLOSED); + containerData.setVolume(vol); + containerData.getStatistics().setBlockBytesForTesting(usedBytes); + KeyValueContainer container = new KeyValueContainer(containerData, CONF); + containerSet.addContainer(container); } /** @@ -260,7 +309,13 @@ public void testVolumeChoosingPolicy(TestScenario scenario) throws IOException { // Create volumes from configuration List volumes = createVolumes(scenario.getVolumes()); - setupVolumeSet(volumes); + // Source is the highest utilization volume (last in sorted order) + List sortedUsages = getVolumeUsages( + createVolumeSetForUsages(volumes), deltaMap); + sortedUsages.sort(Comparator.comparingDouble(VolumeFixedUsage::getUtilization)); + HddsVolume sourceVolume = scenario.shouldFindPair() + ? 
sortedUsages.get(sortedUsages.size() - 1).getVolume() : null; + setupVolumeSetAndContainers(volumes, sourceVolume, scenario.getContainerSize()); // Create a map of disk names to volumes for verification Map diskNameToVolume = new HashMap<>(); @@ -272,27 +327,27 @@ public void testVolumeChoosingPolicy(TestScenario scenario) // Get volume usages for verification List volumeUsages = getVolumeUsages(volumeSet, deltaMap); - // Try to find a valid source-destination pair - Pair result = policy.chooseVolume(volumeSet, - scenario.getThresholdPercentage(), deltaMap, scenario.getContainerSize()); + DiskBalancerVolumeContainerCandidate result = policy.chooseVolumesAndContainer(ozoneContainer, + volumeSet, deltaMap, inProgressContainerIDs, scenario.getThresholdPercentage()); if (scenario.shouldFindPair()) { assertNotNull(result); - assertNotNull(result.getLeft()); - assertNotNull(result.getRight()); + assertNotNull(result.getSourceVolume()); + assertNotNull(result.getDestVolume()); + assertNotNull(result.getContainerData()); // Verify source is the expected disk if (scenario.getExpectedSourceDisk() != null) { HddsVolume expectedSource = diskNameToVolume.get(scenario.getExpectedSourceDisk()); assertNotNull(expectedSource); - assertEquals(expectedSource, result.getLeft()); + assertEquals(expectedSource, result.getSourceVolume()); } // Verify destination is the expected disk (or one of the valid options) if (scenario.getExpectedDestinationDisk() != null) { HddsVolume expectedDest = diskNameToVolume.get(scenario.getExpectedDestinationDisk()); assertNotNull(expectedDest); - assertEquals(expectedDest, result.getRight()); + assertEquals(expectedDest, result.getDestVolume()); } // Filter volumeUsages to only include volumes from our test scenario @@ -313,10 +368,10 @@ public void testVolumeChoosingPolicy(TestScenario scenario) int sourceIndex = -1; int destIndex = -1; for (int i = 0; i < testVolumeUsages.size(); i++) { - if 
(testVolumeUsages.get(i).getVolume().equals(result.getLeft())) { + if (testVolumeUsages.get(i).getVolume().equals(result.getSourceVolume())) { sourceIndex = i; } - if (testVolumeUsages.get(i).getVolume().equals(result.getRight())) { + if (testVolumeUsages.get(i).getVolume().equals(result.getDestVolume())) { destIndex = i; } } @@ -331,8 +386,7 @@ public void testVolumeChoosingPolicy(TestScenario scenario) double sourceUtilization = testVolumeUsages.get(sourceIndex).getUtilization(); for (int i = 0; i < testVolumeUsages.size(); i++) { if (i != sourceIndex) { - double otherUtilization = testVolumeUsages.get(i).getUtilization(); - assertTrue(sourceUtilization >= otherUtilization); + assertTrue(sourceUtilization >= testVolumeUsages.get(i).getUtilization()); } } } @@ -361,6 +415,19 @@ public void testVolumeChoosingPolicy(TestScenario scenario) } } + private MutableVolumeSet createVolumeSetForUsages(List volumes) throws IOException { + OzoneConfiguration testConf = new OzoneConfiguration(); + testConf.set(HDDS_DATANODE_DIR_KEY, baseDir.resolve("defaultVolume").toString()); + MutableVolumeSet vs = new MutableVolumeSet(datanodeUuid, testConf, null, + StorageVolume.VolumeType.DATA_VOLUME, null); + Map volumeMap = new HashMap<>(); + for (HddsVolume volume : volumes) { + volumeMap.put(volume.getStorageDir().getAbsolutePath(), volume); + } + vs.setVolumeMapForTesting(volumeMap); + return vs; + } + /** * Provides test scenarios for parameterized testing. 
*/ @@ -368,23 +435,23 @@ public void testVolumeChoosingPolicy(TestScenario scenario) public static Stream testScenarios() { return Stream.of( // Scenario 1: One volume beyond threshold, no volumes under threshold - // Disk1: 30%, Disk2: 30%, Disk3: 40%, Threshold: 5% - // Ideal: 33.33%, Range: (28.33%, 38.33%), Out of range: Disk3 - // Expected source: Disk3 (highest) at index 2, Expected destination: Disk1 or Disk2 (lowest) at index 0 or 1 + // Disk1: 30%, Disk2: 30.1%, Disk3: 40%, Threshold: 5% + // Ideal: 33.37%, Range: (28.37%, 38.37%), Out of range: Disk3 + // Expected source: Disk3 (highest) at index 2, Expected destination: Disk1 (lowest) at index 0 Arguments.arguments(new TestScenario( "OneVolumeBeyondThresholdNoVolumesUnderThreshold", Arrays.asList( new VolumeTestConfig("disk1", 0.30), // Lowest utilization - index 0 - new VolumeTestConfig("disk2", 0.30), // Same as disk1 - index 1 + new VolumeTestConfig("disk2", 0.301), // Slightly higher - index 1 new VolumeTestConfig("disk3", 0.40) // Highest utilization - index 2 ), 5.0, DEFAULT_CONTAINER_SIZE, true, "disk3", // Expected source (highest) - null, // Destination can be disk1 or disk2 (both valid) so only check source + "disk1", // Destination is disk1 (lowest utilization) 2, // Expected source index (highest utilization) - null // Destination index can be 0 or 1 (both have same utilization) + 0 // Destination index is 0 (lowest utilization) )), // Scenario 2: Volumes both above and below threshold @@ -415,7 +482,7 @@ public static Stream testScenarios() { "AllVolumesWithinThreshold", Arrays.asList( new VolumeTestConfig("disk1", 0.30), // Lowest utilization - new VolumeTestConfig("disk2", 0.30), // Same as disk1 + new VolumeTestConfig("disk2", 0.301), // Slightly higher new VolumeTestConfig("disk3", 0.33) // Highest utilization ), 10.0, @@ -428,43 +495,43 @@ public static Stream testScenarios() { )), // Scenario 4: One volume under threshold, no volumes above threshold - // Disk1: 30%, Disk2: 30%, Disk3: 
20% - // Ideal: 26.67%, Range: (21.67%, 31.67%), Out of range: Disk3 - // Expected source: Disk1 or Disk2 (highest) at index 1 or 2, Expected destination: Disk3 (lowest) at index 0 + // Disk1: 30%, Disk2: 30.1%, Disk3: 20% + // Ideal: 26.70%, Range: (21.70%, 31.70%), Out of range: Disk3 + // Expected source: Disk2 (highest) at index 2, Expected destination: Disk3 (lowest) at index 0 Arguments.arguments(new TestScenario( "OneVolumeUnderThresholdNoVolumesAbove", Arrays.asList( new VolumeTestConfig("disk3", 0.20), // Lowest utilization - index 0 new VolumeTestConfig("disk1", 0.30), // Middle utilization - index 1 - new VolumeTestConfig("disk2", 0.30) // Highest utilization (tied with disk1) - index 2 + new VolumeTestConfig("disk2", 0.301) // Highest utilization - index 2 ), 5.0, DEFAULT_CONTAINER_SIZE, true, - null, // Source can be disk1 or disk2 (both valid, highest) + "disk2", // Source is disk2 (highest) "disk3", // Expected destination (lowest) - null, // Source index can be 1 or 2 (both have same utilization) + 2, // Source index is 2 (highest utilization) 0 // Expected destination index (lowest utilization) )), // Scenario 5: Extreme imbalance - one very high, others very low - // Disk1: 95%, Disk2: 5%, Disk3: 5%, Threshold: 10% - // Ideal: 35%, Range: (25%, 45%), Out of range: Disk1, Disk2, Disk3 - // Expected source: Disk1 (highest) at index 2, Expected destination: Disk2 or Disk3 (lowest) at index 0 or 1 + // Disk1: 95%, Disk2: 5%, Disk3: 5.1%, Threshold: 10% + // Ideal: 35.03%, Range: (25.03%, 45.03%), Out of range: Disk1, Disk2, Disk3 + // Expected source: Disk1 (highest) at index 2, Expected destination: Disk2 (lowest) at index 0 Arguments.arguments(new TestScenario( "ExtremeImbalance", Arrays.asList( new VolumeTestConfig("disk2", 0.05), // Lowest utilization - index 0 - new VolumeTestConfig("disk3", 0.05), // Same as disk2 - index 1 + new VolumeTestConfig("disk3", 0.051), // Slightly higher - index 1 new VolumeTestConfig("disk1", 0.95) // Highest 
utilization - index 2 ), 10.0, DEFAULT_CONTAINER_SIZE, true, "disk1", // Expected source (highest) - null, // Destination can be disk2 or disk3 (lowest) + "disk2", // Destination is disk2 (lowest) 2, // Expected source index (highest utilization) - null // Destination index can be 0 or 1 (both have same utilization) + 0 // Destination index is 0 (lowest utilization) )), // Scenario 6: Multiple volumes above threshold, one below @@ -508,23 +575,25 @@ public static Stream testScenarios() { )), // Scenario 8: Small threshold with moderate imbalance - // Disk1: 35%, Disk2: 30%, Disk3: 30%, Threshold: 2% - // Ideal: 31.67%, Range: (29.67%, 33.67%), Out of range: Disk1 - // Expected source: Disk1 (highest) at index 2, Expected destination: Disk2 or Disk3 (lowest) at index 0 or 1 + // Disk1: 35%, Disk2: 30%, Disk3: 30.1%, Threshold: 2% + // Ideal: 31.70%, Range: (29.70%, 33.70%), Out of range: Disk1 + // Container size must be small enough to fit on disk2 without exceeding threshold + // disk2 after: (750MB + 50MB) / 2500MB = 32% < 33.7% threshold ✓ + // Expected source: Disk1 (highest) at index 2, Expected destination: Disk2 (lowest) at index 0 Arguments.arguments(new TestScenario( "SmallThresholdModerateImbalance", Arrays.asList( new VolumeTestConfig("disk2", 0.30), // Lowest utilization - index 0 - new VolumeTestConfig("disk3", 0.30), // Same as disk2 - index 1 + new VolumeTestConfig("disk3", 0.301), // Slightly higher than disk2 - index 1 new VolumeTestConfig("disk1", 0.35) // Highest utilization - index 2 ), 2.0, - DEFAULT_CONTAINER_SIZE, + 50L * MB, // Small container to fit within tight threshold true, "disk1", // Expected source (highest) - null, // Destination can be disk2 or disk3 (lowest) + "disk2", // Destination is disk2 (lowest) 2, // Expected source index (highest utilization) - null // Destination index can be 0 or 1 (both have same utilization) + 0 // Destination index is 0 (lowest utilization) )), // Scenario 9: Best destination has low utilization but 
insufficient space diff --git a/hadoop-hdds/docs/content/design/diskbalancer.md b/hadoop-hdds/docs/content/design/diskbalancer.md index aab8af2ff345..d83e99cb2c99 100644 --- a/hadoop-hdds/docs/content/design/diskbalancer.md +++ b/hadoop-hdds/docs/content/design/diskbalancer.md @@ -120,16 +120,11 @@ and balanced state will be visible only after the configured delay, when the sou By default, the DiskBalancer uses specific policies to decide which disks to balance and which containers to move. These are configurable, but the default implementations provide robust and safe behavior. -* **`DefaultVolumeChoosingPolicy`**: This is the default policy for selecting the source and destination volumes. It -identifies the most over-utilized volume as the source and the most under-utilized volume as the destination by comparing -each volume's utilization against the Datanode's average. The calculation is smart enough to account for data that is -already in the process of being moved, ensuring it makes accurate decisions based on the future state of the volumes. - -* **`DefaultContainerChoosingPolicy`**: This is the default policy for selecting which container to move from a source -volume. It iterates through the containers on the source disk and picks the first one that is in a **CLOSED** state -and is not already being moved by another balancing operation. To optimize performance and avoid re-scanning the same -containers repeatedly, it caches the list of containers for each volume which auto expires after one hour of its last -used time or if the container iterator for that is invalidated on full utilisation. +* **`DefaultVolumeContainerChoosingPolicy`**: This is the default policy that consolidates both volume selection and container +selection into a single operation. 
It identifies the most over-utilized volume as the source and the most under-utilized +volume with sufficient space as the destination, then iterates through containers on the source to pick the first one +that is in a **CLOSED** state and is not already being moved. Consolidating both steps avoids recalculating ideal +utilization and disk usage. It caches the list of containers for each volume which auto expires after one hour. ## Security Design DiskBalancer follows the same security model as other services: diff --git a/hadoop-hdds/docs/content/feature/DiskBalancer.md b/hadoop-hdds/docs/content/feature/DiskBalancer.md index 317d3f023723..acba72ac9509 100644 --- a/hadoop-hdds/docs/content/feature/DiskBalancer.md +++ b/hadoop-hdds/docs/content/feature/DiskBalancer.md @@ -238,17 +238,16 @@ ozone admin datanode diskbalancer report --in-service-datanodes --json The DiskBalancer's behavior can be controlled using the following configuration properties in `ozone-site.xml`. -| Property | Default Value | Description | -|-------------------------------------------------------------|----------------------------------------------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| `hdds.datanode.disk.balancer.enabled` | `false` | If false, the DiskBalancer service on the Datanode is disabled. Configure it to true for diskBalancer to be enabled. | -| `hdds.datanode.disk.balancer.volume.density.threshold.percent` | `10.0` | A percentage (0-100). A datanode is considered balanced if for each volume, its utilization differs from the average datanode utilization by no more than this threshold. 
| -| `hdds.datanode.disk.balancer.max.disk.throughputInMBPerSec` | `10` | The maximum bandwidth (in MB/s) that the balancer can use for moving data, to avoid impacting client I/O. | -| `hdds.datanode.disk.balancer.parallel.thread` | `5` | The number of worker threads to use for moving containers in parallel. | -| `hdds.datanode.disk.balancer.service.interval` | `60s` | The time interval at which the Datanode DiskBalancer service checks for imbalance and updates its configuration. | -| `hdds.datanode.disk.balancer.stop.after.disk.even` | `true` | If true, the DiskBalancer will automatically stop its balancing activity once disks are considered balanced (i.e., all volume densities are within the threshold). | +| Property | Default Value | Description | +|-------------------------------------------------------------|----------------------------------------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `hdds.datanode.disk.balancer.enabled` | `false` | If false, the DiskBalancer service on the Datanode is disabled. Configure it to true for diskBalancer to be enabled. | +| `hdds.datanode.disk.balancer.volume.density.threshold.percent` | `10.0` | A percentage (0-100). A datanode is considered balanced if for each volume, its utilization differs from the average datanode utilization by no more than this threshold. | +| `hdds.datanode.disk.balancer.max.disk.throughputInMBPerSec` | `10` | The maximum bandwidth (in MB/s) that the balancer can use for moving data, to avoid impacting client I/O. | +| `hdds.datanode.disk.balancer.parallel.thread` | `5` | The number of worker threads to use for moving containers in parallel. | +| `hdds.datanode.disk.balancer.service.interval` | `60s` | The time interval at which the Datanode DiskBalancer service checks for imbalance and updates its configuration. 
| +| `hdds.datanode.disk.balancer.stop.after.disk.even` | `true` | If true, the DiskBalancer will automatically stop its balancing activity once disks are considered balanced (i.e., all volume densities are within the threshold). | | `hdds.datanode.disk.balancer.replica.deletion.delay` | `5m` | The delay after a container is successfully moved from source volume to destination volume before the source container replica is deleted. This lazy deletion provides a grace period before failing the read thread holding the old container replica. Unit: ns, ms, s, m, h, d. | -| `hdds.datanode.disk.balancer.volume.choosing.policy` | `org.apache.hadoop.ozone.container.diskbalancer.policy.DefaultVolumeChoosingPolicy` | The policy class for selecting source and destination volumes for balancing. | -| `hdds.datanode.disk.balancer.container.choosing.policy` | `org.apache.hadoop.ozone.container.diskbalancer.policy.DefaultContainerChoosingPolicy` | The policy class for selecting which containers to move from a source volume to destination volume. | -| `hdds.datanode.disk.balancer.service.timeout` | `300s` | Timeout for the Datanode DiskBalancer service operations. | -| `hdds.datanode.disk.balancer.should.run.default` | `false` | If the balancer fails to read its persisted configuration, this value determines if the service should run by default. | +| `hdds.datanode.disk.balancer.volume.container.choosing.policy` | `org.apache.hadoop.ozone.container.diskbalancer.policy.DefaultVolumeContainerChoosingPolicy` | The policy for selecting source/destination volumes and which containers to move. | +| `hdds.datanode.disk.balancer.service.timeout` | `300s` | Timeout for the Datanode DiskBalancer service operations. | +| `hdds.datanode.disk.balancer.should.run.default` | `false` | If the balancer fails to read its persisted configuration, this value determines if the service should run by default. 
| diff --git a/hadoop-hdds/docs/content/feature/DiskBalancer.zh.md b/hadoop-hdds/docs/content/feature/DiskBalancer.zh.md index e892ee47e7c6..815df78b2e02 100644 --- a/hadoop-hdds/docs/content/feature/DiskBalancer.zh.md +++ b/hadoop-hdds/docs/content/feature/DiskBalancer.zh.md @@ -230,17 +230,16 @@ ozone admin datanode diskbalancer report --in-service-datanodes --json The DiskBalancer's behavior can be controlled using the following configuration properties in `ozone-site.xml`. -| Property | Default Value | Description | -|-------------------------------------------------------------|----------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| `hdds.datanode.disk.balancer.enabled` | `false` | 如果为 false,则 Datanode 上的 DiskBalancer 服务将被禁用。将其配置为 true 可启用 DiskBalancer。 | | | | -| `hdds.datanode.disk.balancer.volume.density.threshold.percent` | `10.0` | 百分比(0-100)。如果对于每个卷,其利用率与平均数据节点利用率之差不超过此阈值,则认为数据节点处于平衡状态。 | -| `hdds.datanode.disk.balancer.max.disk.throughputInMBPerSec` | `10` | 平衡器可用于移动数据的最大带宽(以 MB/s 为单位),以避免影响客户端 I/O。 | -| `hdds.datanode.disk.balancer.parallel.thread` | `5` | 用于并行移动容器的工作线程数。 | -| `hdds.datanode.disk.balancer.service.interval` | `60s` | Datanode DiskBalancer 服务检查不平衡并更新其配置的时间间隔。 | -| `hdds.datanode.disk.balancer.stop.after.disk.even` | `true` | 如果为真,则一旦磁盘被视为平衡(即所有卷密度都在阈值内),DiskBalancer 将自动停止其平衡活动。 | +| Property | Default Value | Description | +|-------------------------------------------------------------|----------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `hdds.datanode.disk.balancer.enabled` | `false` | 如果为 false,则 Datanode 上的 DiskBalancer 服务将被禁用。将其配置为 true 可启用 DiskBalancer。 | | | | +| 
`hdds.datanode.disk.balancer.volume.density.threshold.percent` | `10.0` | 百分比(0-100)。如果对于每个卷,其利用率与平均数据节点利用率之差不超过此阈值,则认为数据节点处于平衡状态。 | +| `hdds.datanode.disk.balancer.max.disk.throughputInMBPerSec` | `10` | 平衡器可用于移动数据的最大带宽(以 MB/s 为单位),以避免影响客户端 I/O。 | +| `hdds.datanode.disk.balancer.parallel.thread` | `5` | 用于并行移动容器的工作线程数。 | +| `hdds.datanode.disk.balancer.service.interval` | `60s` | Datanode DiskBalancer 服务检查不平衡并更新其配置的时间间隔。 | +| `hdds.datanode.disk.balancer.stop.after.disk.even` | `true` | 如果为真,则一旦磁盘被视为平衡(即所有卷密度都在阈值内),DiskBalancer 将自动停止其平衡活动。 | | `hdds.datanode.disk.balancer.replica.deletion.delay` | `5m` | 容器成功从源卷移动到目标卷后,源容器副本被删除前的延迟时间。这种延迟删除机制旨在避免旧副本的即时删除导致持有旧容器副本的线程数据读取失败。单位:ns、ms、s、m、h、d。| -| `hdds.datanode.disk.balancer.volume.choosing.policy` | `org.apache.hadoop.ozone.container.diskbalancer.policy.DefaultVolumeChoosingPolicy` | 用于选择平衡的源卷和目标卷的策略类。 | -| `hdds.datanode.disk.balancer.container.choosing.policy` | `org.apache.hadoop.ozone.container.diskbalancer.policy.DefaultContainerChoosingPolicy` | 用于选择将哪些容器从源卷移动到目标卷的策略类。 | -| `hdds.datanode.disk.balancer.service.timeout` | `300s` | Datanode DiskBalancer 服务操作超时。 | -| `hdds.datanode.disk.balancer.should.run.default` | `false` | 如果平衡器无法读取其持久配置,则该值决定服务是否应默认运行。 | +| `hdds.datanode.disk.balancer.volume.container.choosing.policy` | `org.apache.hadoop.ozone.container.diskbalancer.policy.DefaultVolumeContainerChoosingPolicy` | 用于选择源/目标卷以及要移动的容器的策略。 | +| `hdds.datanode.disk.balancer.service.timeout` | `300s` | Datanode DiskBalancer 服务操作超时。 | +| `hdds.datanode.disk.balancer.should.run.default` | `false` | 如果平衡器无法读取其持久配置,则该值决定服务是否应默认运行。 | diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestContainerChoosingPolicy.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestContainerChoosingPerformance.java similarity index 69% rename from hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestContainerChoosingPolicy.java rename to 
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestContainerChoosingPerformance.java index b9d5989fcab8..5b32607d0d38 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestContainerChoosingPolicy.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestContainerChoosingPerformance.java @@ -18,6 +18,9 @@ package org.apache.hadoop.ozone.scm.node; import static org.apache.hadoop.ozone.container.common.impl.ContainerImplTestUtils.newContainerSet; +import static org.apache.hadoop.ozone.container.diskbalancer.DiskBalancerVolumeCalculation.getIdealUsage; +import static org.apache.hadoop.ozone.container.diskbalancer.DiskBalancerVolumeCalculation.getVolumeUsages; +import static org.apache.hadoop.ozone.container.diskbalancer.DiskBalancerVolumeCalculation.newVolumeFixedUsage; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -30,7 +33,9 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Collections; +import java.util.Comparator; import java.util.List; +import java.util.Map; import java.util.Random; import java.util.Set; import java.util.UUID; @@ -41,6 +46,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.fs.MockSpaceUsageCheckFactory; @@ -52,15 +58,14 @@ import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.ozone.container.ContainerTestHelper; import org.apache.hadoop.ozone.container.common.impl.ContainerData; -import org.apache.hadoop.ozone.container.common.impl.ContainerDataScanOrder; import 
org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion; import org.apache.hadoop.ozone.container.common.impl.ContainerSet; import org.apache.hadoop.ozone.container.common.interfaces.Container; import org.apache.hadoop.ozone.container.common.volume.HddsVolume; import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet; import org.apache.hadoop.ozone.container.common.volume.StorageVolume; -import org.apache.hadoop.ozone.container.diskbalancer.policy.ContainerChoosingPolicy; -import org.apache.hadoop.ozone.container.diskbalancer.policy.DefaultContainerChoosingPolicy; +import org.apache.hadoop.ozone.container.diskbalancer.DiskBalancerVolumeCalculation.VolumeFixedUsage; +import org.apache.hadoop.ozone.container.diskbalancer.policy.DefaultVolumeContainerChoosingPolicy; import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer; import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData; import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController; @@ -73,9 +78,9 @@ import org.junit.jupiter.api.io.TempDir; /** - * This class tests the ContainerChoosingPolicy. + * This class tests the container selection logic performance. 
*/ -public class TestContainerChoosingPolicy { +public class TestContainerChoosingPerformance { private static final int NUM_VOLUMES = 20; private static final int NUM_CONTAINERS = 100000; @@ -92,15 +97,17 @@ public class TestContainerChoosingPolicy { private List volumes; private ContainerSet containerSet; private OzoneContainer ozoneContainer; - private ContainerChoosingPolicy containerChoosingPolicy; + private DefaultVolumeContainerChoosingPolicy volumeContainerChoosingPolicy; private ExecutorService executor; private MutableVolumeSet volumeSet; // Simulate containers currently being balanced (in progress) private Set inProgressContainerIDs = ConcurrentHashMap.newKeySet(); + private Map deltaMap = new ConcurrentHashMap<>(); @BeforeEach public void setup() throws Exception { + CONF.set("hdds.datanode.volume.min.free.space", "10MB"); containerSet = newContainerSet(); createVolumes(); createContainers(); @@ -108,7 +115,7 @@ public void setup() throws Exception { ContainerController containerController = new ContainerController(containerSet, null); when(ozoneContainer.getController()).thenReturn(containerController); when(ozoneContainer.getContainerSet()).thenReturn(containerSet); - containerChoosingPolicy = new DefaultContainerChoosingPolicy(); + volumeContainerChoosingPolicy = new DefaultVolumeContainerChoosingPolicy(new ReentrantLock()); executor = Executors.newFixedThreadPool(NUM_THREADS); // Create a spied MutableVolumeSet and inject the test volumes @@ -138,46 +145,52 @@ public void cleanUp() { @Test @Timeout(300) public void testConcurrentContainerChoosingPerformance() throws Exception { - testPolicyPerformance("ContainerChoosingPolicy", containerChoosingPolicy); + testContainerSelectionPerformance(); } @Test public void testContainerDeletionAfterIteratorGeneration() throws Exception { - HddsVolume volume = volumes.get(0); - HddsVolume destVolume = volumes.get(1); - - List> containerList = ozoneContainer.getContainerSet().getContainerMap().values().stream() - 
.filter(x -> volume.getStorageID().equals(x.getContainerData().getVolume().getStorageID())) - .filter(x -> x.getContainerData().isClosed()) - .sorted(ContainerDataScanOrder.INSTANCE) - .collect(Collectors.toList()); + // Test that chooseContainer skips in-progress containers and handles deleted containers. inProgressContainerIDs.clear(); - ContainerData container = containerChoosingPolicy.chooseContainer(ozoneContainer, volume, destVolume, - inProgressContainerIDs, THRESHOLD, volumeSet, null); - assertNotNull(container); - assertEquals(containerList.get(0).getContainerData().getContainerID(), container.getContainerID()); + ChooseContainerContext ctx = getChooseContainerContext(); + + // Get source containers in policy order + List> policyOrder = getSourceContainersInPolicyOrder(ctx.srcVolume); + + // choose first container + ContainerData first = volumeContainerChoosingPolicy.chooseContainer(ozoneContainer, ctx.srcVolume, ctx.dstVolume, + ctx.dstUsage, inProgressContainerIDs, ctx.upperThreshold); + assertNotNull(first, "Expected to choose a container on first call"); + assertEquals(policyOrder.get(0).getContainerData().getContainerID(), first.getContainerID()); + + // Mark first as in-progress and remove second from containerSet + inProgressContainerIDs.add(ContainerID.valueOf(first.getContainerID())); + ozoneContainer.getContainerSet().removeContainer(policyOrder.get(1).getContainerData().getContainerID()); - ozoneContainer.getContainerSet().removeContainer(containerList.get(1).getContainerData().getContainerID()); - inProgressContainerIDs.add(ContainerID.valueOf(container.getContainerID())); - container = containerChoosingPolicy.chooseContainer(ozoneContainer, volume, - destVolume, inProgressContainerIDs, THRESHOLD, volumeSet, null); - assertEquals(containerList.get(1).getContainerData().getContainerID(), container.getContainerID()); + // Second call: policy skips get(0) (in-progress) and get(1) (deleted), returns get(2) + ContainerData secondContainer = 
volumeContainerChoosingPolicy. + chooseContainer(ozoneContainer, ctx.srcVolume, ctx.dstVolume, + ctx.dstUsage, inProgressContainerIDs, ctx.upperThreshold); + assertNotNull(secondContainer, "Expected to choose a container on second call"); + assertEquals(policyOrder.get(2).getContainerData().getContainerID(), secondContainer.getContainerID()); } /** - * SuccessCount: Number of successful container choices from the policy. - * FailureCount: Failures due to any exceptions thrown during container choice. + * Tests container selection performance using chooseContainer directly. */ - private void testPolicyPerformance(String policyName, ContainerChoosingPolicy policy) throws Exception { + private void testContainerSelectionPerformance() throws Exception { + inProgressContainerIDs.clear(); + deltaMap.clear(); + + ChooseContainerContext ctx = getChooseContainerContext(); + CountDownLatch latch = new CountDownLatch(NUM_THREADS); AtomicInteger containerChosenCount = new AtomicInteger(0); AtomicInteger containerNotChosenCount = new AtomicInteger(0); AtomicInteger failureCount = new AtomicInteger(0); AtomicLong totalTimeNanos = new AtomicLong(0); - Random rand = new Random(); - for (int i = 0; i < NUM_THREADS; i++) { executor.submit(() -> { try { @@ -185,18 +198,11 @@ private void testPolicyPerformance(String policyName, ContainerChoosingPolicy po int containerChosen = 0; int containerNotChosen = 0; int failures = 0; - // Choose a random volume - HddsVolume srcVolume = volumes.get(rand.nextInt(NUM_VOLUMES)); - HddsVolume destVolume; - - do { - destVolume = volumes.get(rand.nextInt(NUM_VOLUMES)); - } while (srcVolume.equals(destVolume)); for (int j = 0; j < NUM_ITERATIONS; j++) { try { - ContainerData c = policy.chooseContainer(ozoneContainer, srcVolume, - destVolume, inProgressContainerIDs, THRESHOLD, volumeSet, null); + ContainerData c = volumeContainerChoosingPolicy.chooseContainer(ozoneContainer, + ctx.srcVolume, ctx.dstVolume, ctx.dstUsage, inProgressContainerIDs, 
ctx.upperThreshold); if (c == null) { containerNotChosen++; } else { @@ -228,7 +234,7 @@ private void testPolicyPerformance(String policyName, ContainerChoosingPolicy po double avgTimePerOp = (double) totalTimeNanos.get() / totalOperations; double opsPerSec = totalOperations / (totalTimeNanos.get() / 1_000_000_000.0); - System.out.println("Performance results for " + policyName); + System.out.println("Container selection performance results:"); System.out.println("Total volumes: " + NUM_VOLUMES); System.out.println("Total containers: " + NUM_CONTAINERS); System.out.println("Total threads: " + NUM_THREADS); @@ -239,6 +245,42 @@ private void testPolicyPerformance(String policyName, ContainerChoosingPolicy po System.out.println("Total time (ms): " + totalTimeNanos.get() / 1_000_000); System.out.println("Average time per operation (ns): " + avgTimePerOp); System.out.println("Operations per second: " + opsPerSec); + assertTrue(containerChosenCount.get() > 0, "Expected at least some containers to be chosen"); + } + + private ChooseContainerContext getChooseContainerContext() { + List volumeUsages = getVolumeUsages(volumeSet, deltaMap); + volumeUsages.sort(Comparator.comparingDouble(VolumeFixedUsage::getUtilization) + .thenComparing(v -> v.getVolume().getStorageID())); + double idealUsage = getIdealUsage(volumeUsages); + double upperThreshold = idealUsage + THRESHOLD / 100.0; + HddsVolume srcVolume = volumeUsages.get(volumeUsages.size() - 1).getVolume(); + HddsVolume dstVolume = volumeUsages.get(0).getVolume(); + VolumeFixedUsage dstUsage = newVolumeFixedUsage(dstVolume, deltaMap); + return new ChooseContainerContext(srcVolume, dstVolume, dstUsage, upperThreshold); + } + + private List> getSourceContainersInPolicyOrder(HddsVolume srcVolume) { + List> list = new ArrayList<>(); + ozoneContainer.getController().getContainers(srcVolume).forEachRemaining(list::add); + return list.stream() + .filter(c -> c.getContainerData().isClosed() && c.getContainerData().getBytesUsed() > 
0) + .collect(Collectors.toList()); + } + + private static final class ChooseContainerContext { + private final HddsVolume srcVolume; + private final HddsVolume dstVolume; + private final VolumeFixedUsage dstUsage; + private final double upperThreshold; + + ChooseContainerContext(HddsVolume srcVolume, HddsVolume dstVolume, + VolumeFixedUsage dstUsage, double upperThreshold) { + this.srcVolume = srcVolume; + this.dstVolume = dstVolume; + this.dstUsage = dstUsage; + this.upperThreshold = upperThreshold; + } } public void createVolumes() throws IOException { diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestVolumeChoosingPolicy.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestVolumeChoosingPerformance.java similarity index 76% rename from hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestVolumeChoosingPolicy.java rename to hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestVolumeChoosingPerformance.java index cdbe046635c3..ee501da18625 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestVolumeChoosingPolicy.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestVolumeChoosingPerformance.java @@ -17,10 +17,12 @@ package org.apache.hadoop.ozone.scm.node; +import static org.apache.hadoop.ozone.container.common.impl.ContainerImplTestUtils.newContainerSet; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import java.io.IOException; import java.nio.file.Files; @@ -30,6 +32,7 @@ import java.util.List; import java.util.Map; import java.util.Random; +import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import 
java.util.concurrent.CountDownLatch; @@ -39,20 +42,29 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; -import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.fs.MockSpaceUsageCheckFactory; import org.apache.hadoop.hdds.fs.MockSpaceUsageSource; import org.apache.hadoop.hdds.fs.SpaceUsageCheckFactory; import org.apache.hadoop.hdds.fs.SpaceUsagePersistence; import org.apache.hadoop.hdds.fs.SpaceUsageSource; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto; import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.ozone.container.ContainerTestHelper; +import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion; +import org.apache.hadoop.ozone.container.common.impl.ContainerSet; import org.apache.hadoop.ozone.container.common.statemachine.StateContext; import org.apache.hadoop.ozone.container.common.volume.HddsVolume; import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet; import org.apache.hadoop.ozone.container.common.volume.StorageVolume; -import org.apache.hadoop.ozone.container.diskbalancer.policy.DefaultVolumeChoosingPolicy; -import org.apache.hadoop.ozone.container.diskbalancer.policy.DiskBalancerVolumeChoosingPolicy; +import org.apache.hadoop.ozone.container.diskbalancer.policy.DefaultVolumeContainerChoosingPolicy; +import org.apache.hadoop.ozone.container.diskbalancer.policy.DiskBalancerVolumeContainerCandidate; +import org.apache.hadoop.ozone.container.diskbalancer.policy.VolumeContainerChoosingPolicy; +import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer; +import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData; +import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController; +import 
org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -62,13 +74,12 @@ /** * This class tests the VolumeChoosingPolicy. */ -public class TestVolumeChoosingPolicy { +public class TestVolumeChoosingPerformance { private static final int NUM_VOLUMES = 20; private static final int NUM_THREADS = 10; private static final int NUM_ITERATIONS = 10000; private static final double THRESHOLD = 10; // 10% threshold - private static final long DEFAULT_CONTAINER_SIZE = 100L * 1024 * 1024; // 100MB private OzoneConfiguration conf; @@ -77,19 +88,21 @@ public class TestVolumeChoosingPolicy { private MutableVolumeSet volumeSet; private List hddsVolumes; - private DiskBalancerVolumeChoosingPolicy volumeChoosingPolicy; + private VolumeContainerChoosingPolicy volumeContainerChoosingPolicy; + private OzoneContainer ozoneContainer; private ExecutorService executor; // delta sizes for source volumes private Map deltaSizes = new ConcurrentHashMap<>(); + private Set inProgressContainerIDs = ConcurrentHashMap.newKeySet(); @BeforeEach public void setup() throws Exception { - // Create a fresh configuration for each test to avoid interference between tests conf = new OzoneConfiguration(); hddsVolumes = new ArrayList<>(); createVolumes(); - volumeChoosingPolicy = new DefaultVolumeChoosingPolicy(new ReentrantLock()); + createContainers(); + volumeContainerChoosingPolicy = new DefaultVolumeContainerChoosingPolicy(new ReentrantLock()); executor = Executors.newFixedThreadPool(NUM_THREADS); } @@ -124,21 +137,22 @@ public void testVolumeChoosingFailureDueToDiskFull() { for (StorageVolume volume: volumeSet.getVolumesList()) { volume.setConf(testConf); } - assertNull(volumeChoosingPolicy.chooseVolume(volumeSet, THRESHOLD, deltaSizes, DEFAULT_CONTAINER_SIZE)); + assertNull(volumeContainerChoosingPolicy.chooseVolumesAndContainer(ozoneContainer, volumeSet, + deltaSizes, 
inProgressContainerIDs, THRESHOLD)); assertEquals(NUM_VOLUMES, volumeSet.getVolumesList().size()); } @Test @Timeout(300) public void testConcurrentVolumeChoosingPerformance() throws Exception { - testPolicyPerformance("DefaultVolumeChoosingPolicy", volumeChoosingPolicy); + testPolicyPerformance("VolumeSelectionLogic", volumeContainerChoosingPolicy); } /** - * pairChosenCount: Number of successful volume pair choices from the policy. - * FailureCount: Failures due to any exceptions thrown during volume choice or null return. + * pairChosenCount: Number of successful container/volume choices from the policy. + * FailureCount: Failures due to any exceptions thrown during choice or null return. */ - private void testPolicyPerformance(String policyName, DiskBalancerVolumeChoosingPolicy policy) throws Exception { + private void testPolicyPerformance(String policyName, VolumeContainerChoosingPolicy policy) throws Exception { CountDownLatch latch = new CountDownLatch(NUM_THREADS); AtomicInteger pairChosenCount = new AtomicInteger(0); AtomicInteger pairNotChosenCount = new AtomicInteger(0); @@ -179,23 +193,22 @@ private void testPolicyPerformance(String policyName, DiskBalancerVolumeChoosing long threadStart = System.nanoTime(); try { - // Use a reasonable container size for the policy check - // The policy checks if containerSize < computeUsableSpace() - Pair pair = policy.chooseVolume( - volumeSet, THRESHOLD, deltaSizes, DEFAULT_CONTAINER_SIZE); + DiskBalancerVolumeContainerCandidate candidate = policy.chooseVolumesAndContainer(ozoneContainer, + volumeSet, deltaSizes, inProgressContainerIDs, THRESHOLD); - if (pair == null) { + if (candidate == null) { volumeNotChosen++; } else { volumeChosen++; - HddsVolume sourceVolume = pair.getLeft(); + HddsVolume sourceVolume = candidate.getSourceVolume(); // The policy automatically calls destVolume.incCommittedBytes(containerSize) // when a pair is chosen, so we need to simulate the corresponding // deltaMap update for the source 
volume (negative value = space to be freed) // This aligns with how DiskBalancerService updates deltaSizes after choosing - deltaSizes.compute(sourceVolume, (k, v) -> (v == null ? 0L : v) - DEFAULT_CONTAINER_SIZE); - + deltaSizes.compute(sourceVolume, (k, v) -> (v == null ? 0L : v) + - candidate.getContainerData().getBytesUsed()); + // Note: In a real DiskBalancerService, after a successful container move, // the committed bytes on the destination would be adjusted and the delta // on the source would be cleared. For this performance test, we simulate @@ -282,4 +295,30 @@ private void createVolumes() throws IOException { System.out.println("Created " + NUM_VOLUMES + " volumes in " + (System.currentTimeMillis() - startTime) + " ms"); } + + private void createContainers() { + ContainerSet containerSet = newContainerSet(); + Random random = new Random(); + for (int i = 0; i < NUM_VOLUMES * 10; i++) { + HddsVolume volume = hddsVolumes.get(i % NUM_VOLUMES); + try { + long id = 1000L + i; + long bytesUsed = (random.nextInt(50) + 1) * 1024 * 1024L; + KeyValueContainerData containerData = new KeyValueContainerData(id, + ContainerLayoutVersion.FILE_PER_BLOCK, ContainerTestHelper.CONTAINER_MAX_SIZE, + UUID.randomUUID().toString(), UUID.randomUUID().toString()); + containerData.setState(ContainerDataProto.State.CLOSED); + containerData.setVolume(volume); + containerData.getStatistics().setBlockBytesForTesting(bytesUsed); + KeyValueContainer container = new KeyValueContainer(containerData, conf); + containerSet.addContainer(container); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + ozoneContainer = mock(OzoneContainer.class); + ContainerController controller = new ContainerController(containerSet, null); + when(ozoneContainer.getController()).thenReturn(controller); + when(ozoneContainer.getContainerSet()).thenReturn(containerSet); + } }