Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.stream.Collectors;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.cli.ContainerOperationClient;
Expand All @@ -42,6 +43,9 @@ public abstract class AbstractDiskBalancerSubCommand implements Callable<Void> {
// Track if we're in batch mode to run commands on all in-service datanodes
private boolean isBatchMode = false;

// Pre-fetched datanode address names for batch mode (address -> "hostname (ip:port)"); null in non-batch
private Map<String, String> datanodeDisplayNames = null;

@Override
public Void call() throws Exception {
// Check if DiskBalancer is enabled in configuration
Expand Down Expand Up @@ -74,6 +78,11 @@ public Void call() throws Exception {
return null;
}

// Remove duplicates while preserving order
List<String> deduplicatedDatanodes = targetDatanodes.stream()
.distinct()
.collect(Collectors.toList());

// Track if we're using batch mode for display
isBatchMode = options.isInServiceDatanodes();

Expand All @@ -83,7 +92,7 @@ public Void call() throws Exception {
List<Object> jsonResults = new ArrayList<>();

// Execute commands and collect results
for (String dn : targetDatanodes) {
for (String dn : deduplicatedDatanodes) {
try {
Object result = executeCommand(dn);
successNodes.add(dn);
Expand Down Expand Up @@ -145,6 +154,7 @@ private List<String> getTargetDatanodes() {
if (options.isInServiceDatanodes()) {
return getAllInServiceDatanodes();
} else {
datanodeDisplayNames = null; // Non-batch: use user input as-is, no SCM for formatting
return options.getDatanodes();
}
}
Expand All @@ -154,9 +164,10 @@ private List<String> getTargetDatanodes() {
*/
private List<String> getAllInServiceDatanodes() {
try (ScmClient scmClient = new ContainerOperationClient(new OzoneConfiguration())) {
return DiskBalancerSubCommandUtil.getAllOperableNodesClientRpcAddress(scmClient);
datanodeDisplayNames = DiskBalancerSubCommandUtil.getAllOperableNodesClientRpcAddress(scmClient);
return new ArrayList<>(datanodeDisplayNames.keySet());
} catch (IOException e) {
System.err.println("Error querying SCM for in-service datanodes: %n" + e.getMessage());
System.err.printf("Error querying SCM for in-service datanodes. %n%s%n", e.getMessage());
return null;
}
}
Expand Down Expand Up @@ -220,7 +231,9 @@ protected Map<String, Object> getConfigurationMap() {
*/
private Map<String, Object> createErrorResult(String datanode, String errorMsg) {
Map<String, Object> errorResult = new LinkedHashMap<>();
errorResult.put("datanode", datanode);
// Format datanode string with hostname if available
String formattedDatanode = formatDatanodeDisplayName(datanode);
errorResult.put("datanode", formattedDatanode);
errorResult.put("action", getActionName());
errorResult.put("status", "failure");
errorResult.put("errorMsg", errorMsg);
Expand All @@ -233,5 +246,20 @@ private Map<String, Object> createErrorResult(String datanode, String errorMsg)

return errorResult;
}

/**
* Format a datanode address for display.
* In batch mode, uses pre-fetched display names from the SCM query.
* In non-batch mode, returns the user's input as-is (no SCM call).
*
* @param address the datanode address in "ip:port" format
* @return formatted string "hostname (ip:port)" in batch mode, or address as-is in non-batch
*/
protected String formatDatanodeDisplayName(String address) {
if (datanodeDisplayNames != null) {
return datanodeDisplayNames.getOrDefault(address, address);
}
return address;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.hadoop.hdds.scm.cli.datanode;

import static java.util.stream.Collectors.toList;

import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
Expand Down Expand Up @@ -72,8 +74,10 @@ protected void displayResults(List<String> successNodes, List<String> failedNode

// Display error messages for failed nodes
if (!failedNodes.isEmpty()) {
System.err.printf("Failed to get DiskBalancer report from nodes: [%s]%n",
String.join(", ", failedNodes));
System.err.printf("Failed to get DiskBalancer report from nodes: [%s]%n",
String.join(", ", failedNodes.stream()
.map(this::formatDatanodeDisplayName)
.collect(toList())));
}

// Display consolidated report for successful nodes
Expand All @@ -91,15 +95,20 @@ private String generateReport(List<DatanodeDiskBalancerInfoProto> protos) {
Double.compare(b.getCurrentVolumeDensitySum(), a.getCurrentVolumeDensitySum()));

StringBuilder formatBuilder = new StringBuilder("Report result:%n" +
"%-50s %s%n");
"%-60s %s%n");

List<String> contentList = new ArrayList<>();
contentList.add("Datanode");
contentList.add("VolumeDensity");

for (DatanodeDiskBalancerInfoProto proto : sortedProtos) {
formatBuilder.append("%-50s %s%n");
contentList.add(proto.getNode().getHostName());
formatBuilder.append("%-60s %s%n");
// Format datanode string with hostname and IP address
String[] hostnameIpPort = DiskBalancerSubCommandUtil.extractHostIpAndPort(
proto.getNode());
String formattedDatanode = DiskBalancerSubCommandUtil.getDatanodeHostAndIp(
hostnameIpPort[0], hostnameIpPort[1], Integer.parseInt(hostnameIpPort[2]));
contentList.add(formattedDatanode);
contentList.add(String.valueOf(proto.getCurrentVolumeDensitySum()));
}

Expand All @@ -121,7 +130,12 @@ protected String getActionName() {
private Map<String, Object> createReportResult(
HddsProtos.DatanodeDiskBalancerInfoProto report) {
Map<String, Object> result = new LinkedHashMap<>();
result.put("datanode", report.getNode().getHostName());
// Format datanode string with hostname and IP address
String[] hostnameIpPort = DiskBalancerSubCommandUtil.extractHostIpAndPort(
report.getNode());
String formattedDatanode = DiskBalancerSubCommandUtil.getDatanodeHostAndIp(
hostnameIpPort[0], hostnameIpPort[1], Integer.parseInt(hostnameIpPort[2]));
result.put("datanode", formattedDatanode);
result.put("action", "report");
result.put("status", "success");
result.put("volumeDensity", report.getCurrentVolumeDensitySum());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.hadoop.hdds.scm.cli.datanode;

import static java.util.stream.Collectors.toList;

import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.List;
Expand Down Expand Up @@ -65,7 +67,9 @@ protected Object executeCommand(String hostName) throws IOException {
diskBalancerProxy.startDiskBalancer(config);

Map<String, Object> result = new LinkedHashMap<>();
result.put("datanode", hostName);
// Format datanode string with hostname if available
String formattedDatanode = formatDatanodeDisplayName(hostName);
result.put("datanode", formattedDatanode);
result.put("action", "start");
result.put("status", "success");
Map<String, Object> configMap = getConfigurationMap();
Expand Down Expand Up @@ -107,19 +111,25 @@ protected void displayResults(List<String> successNodes,
if (isBatchMode()) {
if (!failedNodes.isEmpty()) {
System.err.printf("Failed to start DiskBalancer on nodes: [%s]%n",
String.join(", ", failedNodes));
String.join(", ", failedNodes.stream()
.map(this::formatDatanodeDisplayName)
.collect(toList())));
} else {
System.out.println("Started DiskBalancer on all IN_SERVICE nodes.");
}
} else {
// Detailed message for specific nodes
if (!successNodes.isEmpty()) {
System.out.printf("Started DiskBalancer on nodes: [%s]%n",
String.join(", ", successNodes));
String.join(", ", successNodes.stream()
.map(this::formatDatanodeDisplayName)
.collect(toList())));
}
if (!failedNodes.isEmpty()) {
System.err.printf("Failed to start DiskBalancer on nodes: [%s]%n",
String.join(", ", failedNodes));
String.join(", ", failedNodes.stream()
.map(this::formatDatanodeDisplayName)
.collect(toList())));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.hadoop.hdds.scm.cli.datanode;

import static java.util.stream.Collectors.toList;

import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
Expand Down Expand Up @@ -72,8 +74,10 @@ protected void displayResults(List<String> successNodes, List<String> failedNode

// Display error messages for failed nodes
if (!failedNodes.isEmpty()) {
System.err.printf("Failed to get DiskBalancer status from nodes: [%s]%n",
String.join(", ", failedNodes));
System.err.printf("Failed to get DiskBalancer status from nodes: [%s]%n",
String.join(", ", failedNodes.stream()
.map(this::formatDatanodeDisplayName)
.collect(toList())));
}

// Display consolidated status for successful nodes
Expand All @@ -86,7 +90,7 @@ protected void displayResults(List<String> successNodes, List<String> failedNode

private String generateStatus(List<DatanodeDiskBalancerInfoProto> protos) {
StringBuilder formatBuilder = new StringBuilder("Status result:%n" +
"%-35s %-15s %-15s %-15s %-12s %-20s %-12s %-12s %-15s %-18s %-20s%n");
"%-60s %-12s %-15s %-15s %-12s %-20s %-12s %-12s %-15s %-18s %-20s%n");

List<String> contentList = new ArrayList<>();
contentList.add("Datanode");
Expand All @@ -102,12 +106,17 @@ private String generateStatus(List<DatanodeDiskBalancerInfoProto> protos) {
contentList.add("EstTimeLeft(min)");

for (HddsProtos.DatanodeDiskBalancerInfoProto proto : protos) {
formatBuilder.append("%-35s %-15s %-15s %-15s %-12s %-20s %-12s %-12s %-15s %-18s %-20s%n");
formatBuilder.append("%-60s %-12s %-15s %-15s %-12s %-20s %-12s %-12s %-15s %-18s %-20s%n");
long estimatedTimeLeft = calculateEstimatedTimeLeft(proto);
long bytesMovedMB = (long) Math.ceil(proto.getBytesMoved() / (1024.0 * 1024.0));
long bytesToMoveMB = (long) Math.ceil(proto.getBytesToMove() / (1024.0 * 1024.0));

contentList.add(proto.getNode().getHostName());
// Format datanode string with hostname and IP address
String[] hostnameIpPort = DiskBalancerSubCommandUtil.extractHostIpAndPort(
proto.getNode());
String formattedDatanode = DiskBalancerSubCommandUtil.getDatanodeHostAndIp(
hostnameIpPort[0], hostnameIpPort[1], Integer.parseInt(hostnameIpPort[2]));
contentList.add(formattedDatanode);
contentList.add(proto.getRunningStatus().name());
contentList.add(
String.format("%.4f", proto.getDiskBalancerConf().getThreshold()));
Expand Down Expand Up @@ -149,7 +158,12 @@ protected String getActionName() {
*/
private Map<String, Object> createStatusResult(DatanodeDiskBalancerInfoProto status) {
Map<String, Object> result = new LinkedHashMap<>();
result.put("datanode", status.getNode().getHostName());
// Format datanode string with hostname and IP address
String[] hostnameIpPort = DiskBalancerSubCommandUtil.extractHostIpAndPort(
status.getNode());
String formattedDatanode = DiskBalancerSubCommandUtil.getDatanodeHostAndIp(
hostnameIpPort[0], hostnameIpPort[1], Integer.parseInt(hostnameIpPort[2]));
result.put("datanode", formattedDatanode);
result.put("action", "status");
result.put("status", "success");
result.put("serviceStatus", status.getRunningStatus().name());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.hadoop.hdds.scm.cli.datanode;

import static java.util.stream.Collectors.toList;

import java.io.IOException;
import java.util.List;
import java.util.Map;
Expand All @@ -41,7 +43,9 @@ protected Object executeCommand(String hostName) throws IOException {
try {
diskBalancerProxy.stopDiskBalancer();
Map<String, Object> result = new java.util.LinkedHashMap<>();
result.put("datanode", hostName);
// Format datanode string with hostname if available
String formattedDatanode = formatDatanodeDisplayName(hostName);
result.put("datanode", formattedDatanode);
result.put("action", "stop");
result.put("status", "success");
return result;
Expand All @@ -61,19 +65,25 @@ protected void displayResults(List<String> successNodes, List<String> failedNode
// Simpler message for batch mode
if (!failedNodes.isEmpty()) {
System.err.printf("Failed to stop DiskBalancer on nodes: [%s]%n",
String.join(", ", failedNodes));
String.join(", ", failedNodes.stream()
.map(this::formatDatanodeDisplayName)
.collect(toList())));
} else {
System.out.println("Stopped DiskBalancer on all IN_SERVICE nodes.");
}
} else {
// Detailed message for specific nodes
if (!successNodes.isEmpty()) {
System.out.printf("Stopped DiskBalancer on nodes: [%s]%n",
String.join(", ", successNodes));
String.join(", ", successNodes.stream()
.map(this::formatDatanodeDisplayName)
.collect(toList())));
}
if (!failedNodes.isEmpty()) {
System.err.printf("Failed to stop DiskBalancer on nodes: [%s]%n",
String.join(", ", failedNodes));
String.join(", ", failedNodes.stream()
.map(this::formatDatanodeDisplayName)
.collect(toList())));
}
}
}
Expand Down
Loading