Merge remote-tracking branch 'origin/HDDS-3630'
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java
index 9fc0784..5bec415 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java
@@ -52,6 +52,12 @@
* <p>{@literal ../hdds/<<clusterUuid>>/current/<<containerDir>>/<<containerID
* >>/<<dataDir>>}
* <p>
+ * Each hdds volume has its own VERSION file. The hdds volume will have one
+ * clusterUuid directory for each SCM it is a part of (currently only one SCM is
+ * supported).
+ * <p>
+ * During DN startup, if the VERSION file exists, we verify that the
+ * clusterID in the version file matches the clusterID from SCM.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
@@ -63,9 +69,12 @@
public static final String HDDS_VOLUME_DIR = "hdds";
private final VolumeIOStats volumeIOStats;
+ private final VolumeInfoMetrics volumeInfoMetrics;
private final AtomicLong committedBytes; // till Open containers become full
+ // Type of this volume; an HddsVolume is always a DATA_VOLUME.
+ private final VolumeType type = VolumeType.DATA_VOLUME;
// The dedicated DbVolume that the db instance of this HddsVolume resides.
// This is optional, if null then the db instance resides on this HddsVolume.
private DbVolume dbVolume;
@@ -98,7 +107,10 @@
super(b);
if (!b.getFailedVolume()) {
+ this.setState(VolumeState.NOT_INITIALIZED);
this.volumeIOStats = new VolumeIOStats(b.getVolumeRootStr());
+ this.volumeInfoMetrics =
+ new VolumeInfoMetrics(b.getVolumeRootStr(), this);
this.committedBytes = new AtomicLong(0);
LOG.info("Creating HddsVolume: {} of storage type : {} capacity : {}",
@@ -108,7 +120,9 @@
} else {
// Builder is called with failedVolume set, so create a failed volume
// HddsVolume Object.
+ this.setState(VolumeState.FAILED);
volumeIOStats = null;
+ volumeInfoMetrics = null;
committedBytes = null;
}
@@ -130,16 +144,27 @@
return super.getStorageDir();
}
+ public VolumeType getType() {
+ return type;
+ }
+
public VolumeIOStats getVolumeIOStats() {
return volumeIOStats;
}
+ public VolumeInfoMetrics getVolumeInfoStats() {
+ return volumeInfoMetrics;
+ }
+
@Override
public void failVolume() {
super.failVolume();
if (volumeIOStats != null) {
volumeIOStats.unregister();
}
+ if (volumeInfoMetrics != null) {
+ volumeInfoMetrics.unregister();
+ }
closeDbStore();
}
@@ -149,11 +174,15 @@
if (volumeIOStats != null) {
volumeIOStats.unregister();
}
+ if (volumeInfoMetrics != null) {
+ volumeInfoMetrics.unregister();
+ }
closeDbStore();
}
/**
* add "delta" bytes to committed space in the volume.
+ *
* @param delta bytes to add to committed space counter
* @return bytes of committed space
*/
@@ -163,6 +192,7 @@
/**
* return the committed space in the volume.
+ *
* @return bytes of committed space
*/
public long getCommittedBytes() {
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/MetadataVolume.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/MetadataVolume.java
index 360c2c7..c5b399b 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/MetadataVolume.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/MetadataVolume.java
@@ -27,10 +27,16 @@
*/
public class MetadataVolume extends StorageVolume {
+ private final VolumeType type = VolumeType.META_VOLUME;
+
protected MetadataVolume(Builder b) throws IOException {
super(b);
}
+ public VolumeType getType() {
+ return type;
+ }
+
/**
* Builder class for MetadataVolume.
*/
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/VolumeInfoMetrics.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/VolumeInfoMetrics.java
new file mode 100644
index 0000000..21b9d2a
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/VolumeInfoMetrics.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.volume;
+
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.ozone.OzoneConsts;
+
+/**
+ * Exposes information and space usage of a single HDDS volume as metrics.
+ */
+@Metrics(about = "Ozone Volume Information Metrics",
+    context = OzoneConsts.OZONE)
+public class VolumeInfoMetrics {
+
+  private final String metricsSourceName;
+  private final HddsVolume volume;
+
+  /**
+   * @param identifier Typically, path to volume root. e.g. /data/hdds
+   * @param ref volume whose attributes and usage the metrics read
+   */
+  public VolumeInfoMetrics(String identifier, HddsVolume ref) {
+    this.metricsSourceName =
+        VolumeInfoMetrics.class.getSimpleName() + '-' + identifier;
+    this.volume = ref;
+    init();
+  }
+
+  public void init() {
+    MetricsSystem ms = DefaultMetricsSystem.instance();
+    ms.register(metricsSourceName, "Volume Info Statistics", this);
+  }
+
+  public void unregister() {
+    MetricsSystem ms = DefaultMetricsSystem.instance();
+    ms.unregisterSource(metricsSourceName);
+  }
+
+  @Metric("Metric to return the Storage Type")
+  public String getStorageType() {
+    return volume.getStorageType().toString();
+  }
+
+  @Metric("Returns the Directory name for the volume")
+  public String getStorageDirectory() {
+    return volume.getStorageDir().toString();
+  }
+
+  @Metric("Return the DataNode UID for the respective volume")
+  public String getDatanodeUuid() {
+    return volume.getDatanodeUuid();
+  }
+
+  @Metric("Return the Layout Version for the volume")
+  public int getLayoutVersion() {
+    return volume.getLayoutVersion();
+  }
+
+  @Metric("Returns the Volume Type")
+  public String getVolumeType() {
+    return volume.getType().name();
+  }
+
+  public String getMetricsSourceName() {
+    return metricsSourceName;
+  }
+
+  /*
+   * Space accounting used by the space metrics below:
+   * |----used----|   (avail)   |++++++++reserved++++++++|
+   * |<-------  capacity  ------->|
+   * |<------------------- Total capacity -------------->|
+   * A) avail = capacity - used
+   * B) capacity = used + avail
+   * C) Total capacity = used + avail + reserved
+   */
+
+  /**
+   * Return the used space of the Volume, as reported by its VolumeInfo.
+   */
+  @Metric("Returns the Used space")
+  public long getUsed() {
+    return volume.getVolumeInfo().getScmUsed();
+  }
+
+  /**
+   * Return the available space of the Volume.
+   */
+  @Metric("Returns the Available space")
+  public long getAvailable() {
+    return volume.getVolumeInfo().getAvailable();
+  }
+
+  /**
+   * Return the reserved space of the Volume, in bytes.
+   */
+  @Metric("Fetches the Reserved Space")
+  public long getReserved() {
+    return volume.getVolumeInfo().getReservedInBytes();
+  }
+
+  /**
+   * Return the capacity of the Volume (used + available).
+   */
+  @Metric("Returns the Capacity of the Volume")
+  public long getCapacity() {
+    return getUsed() + getAvailable();
+  }
+
+  /**
+   * Return the total capacity of the Volume (used + available + reserved).
+   */
+  @Metric("Returns the Total Capacity of the Volume")
+  public long getTotalCapacity() {
+    return (getUsed() + getAvailable() + getReserved());
+  }
+
+}
\ No newline at end of file
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBSstFileLoader.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBSstFileLoader.java
index 8c3b62b..e98a29f 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBSstFileLoader.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBSstFileLoader.java
@@ -33,14 +33,10 @@
private final RocksDatabase db;
private final ColumnFamily family;
- private IngestExternalFileOptions ingestOptions;
-
public RDBSstFileLoader(RocksDatabase db, ColumnFamily cf) {
this.db = db;
this.family = cf;
- this.ingestOptions = new IngestExternalFileOptions()
- .setIngestBehind(false);
}
@Override
@@ -49,16 +45,18 @@
if (externalFile.length() == 0) {
return;
}
- db.ingestExternalFile(family,
- Collections.singletonList(externalFile.getAbsolutePath()),
- ingestOptions);
+ IngestExternalFileOptions ingestOptions = new IngestExternalFileOptions()
+ .setIngestBehind(false);
+ try {
+ db.ingestExternalFile(family,
+ Collections.singletonList(externalFile.getAbsolutePath()),
+ ingestOptions);
+ } finally {
+ ingestOptions.close();
+ }
}
@Override
public void close() {
- if (ingestOptions != null) {
- ingestOptions.close();
- ingestOptions = null;
- }
}
}
\ No newline at end of file
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBSstFileWriter.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBSstFileWriter.java
index 8614bab..367c235 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBSstFileWriter.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBSstFileWriter.java
@@ -40,8 +40,7 @@
public RDBSstFileWriter() {
EnvOptions envOptions = new EnvOptions();
- Options options = new Options();
- this.sstFileWriter = new SstFileWriter(envOptions, options);
+ this.sstFileWriter = new SstFileWriter(envOptions, new Options());
this.keyCounter = new AtomicLong(0);
}
diff --git a/hadoop-hdds/interface-client/src/main/proto/hdds.proto b/hadoop-hdds/interface-client/src/main/proto/hdds.proto
index c99274d..01597a7 100644
--- a/hadoop-hdds/interface-client/src/main/proto/hdds.proto
+++ b/hadoop-hdds/interface-client/src/main/proto/hdds.proto
@@ -431,3 +431,22 @@
repeated KeyIntValue stat = 2;
repeated KeyContainerIDList statSample = 3;
}
+
+message ContainerBalancerConfigurationProto {
+ optional string utilizationThreshold = 5;
+ optional int32 datanodesInvolvedMaxPercentagePerIteration = 6;
+ optional int64 sizeMovedMaxPerIteration = 7;
+ optional int64 sizeEnteringTargetMax = 8;
+ optional int64 sizeLeavingSourceMax = 9;
+ optional int32 iterations = 10;
+ optional string excludeContainers = 11;
+ optional int64 moveTimeout = 12;
+ optional int64 balancingIterationInterval = 13;
+ optional string includeDatanodes = 14;
+ optional string excludeDatanodes = 15;
+ optional bool moveNetworkTopologyEnable = 16;
+ optional bool triggerDuBeforeMoveEnable = 17;
+
+ required bool shouldRun = 18;
+ optional int32 nextIterationIndex = 19;
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
index c2d96ff..77d9b5c 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
@@ -23,6 +23,7 @@
import org.apache.hadoop.hdds.conf.StorageUnit;
import org.apache.hadoop.hdds.fs.DUFactory;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ContainerBalancerConfigurationProto;
import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.ContainerID;
@@ -33,7 +34,7 @@
import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
-import org.apache.hadoop.hdds.scm.ha.SCMService;
+import org.apache.hadoop.hdds.scm.ha.StatefulService;
import org.apache.hadoop.hdds.scm.net.NetworkTopology;
import org.apache.hadoop.hdds.scm.node.DatanodeUsageInfo;
import org.apache.hadoop.hdds.scm.node.NodeManager;
@@ -43,6 +44,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -65,7 +67,7 @@
* Container balancer is a service in SCM to move containers between over- and
* under-utilized datanodes.
*/
-public class ContainerBalancer implements SCMService {
+public class ContainerBalancer extends StatefulService {
public static final Logger LOG =
LoggerFactory.getLogger(ContainerBalancer.class);
@@ -98,7 +100,6 @@
private NetworkTopology networkTopology;
private double upperLimit;
private double lowerLimit;
- private volatile boolean balancerRunning;
private volatile Thread currentBalancingThread;
private Lock lock;
private ContainerBalancerSelectionCriteria selectionCriteria;
@@ -110,15 +111,17 @@
CompletableFuture<LegacyReplicationManager.MoveResult>>
moveSelectionToFutureMap;
private IterationResult iterationResult;
+ private int nextIterationIndex;
/**
* Constructs ContainerBalancer with the specified arguments. Initializes
- * new ContainerBalancerConfiguration and ContainerBalancerMetrics.
- * Container Balancer does not start on construction.
+ * ContainerBalancerMetrics. Container Balancer does not start on
+ * construction.
*
* @param scm the storage container manager
*/
public ContainerBalancer(StorageContainerManager scm) {
+ super(scm.getStatefulServiceStateManager());
this.nodeManager = scm.getScmNodeManager();
this.containerManager = scm.getContainerManager();
this.replicationManager = scm.getReplicationManager();
@@ -134,13 +137,16 @@
this.unBalancedNodes = new ArrayList<>();
this.placementPolicy = scm.getContainerPlacementPolicy();
this.networkTopology = scm.getClusterMap();
+ this.nextIterationIndex = 0;
this.lock = new ReentrantLock();
findSourceStrategy = new FindSourceGreedy(nodeManager);
+ scm.getSCMServiceManager().register(this);
}
/**
- * Balances the cluster.
+ * Balances the cluster in iterations. Regularly checks if balancing has
+ * been stopped.
*/
private void balance() {
this.iterations = config.getIterations();
@@ -149,7 +155,11 @@
this.iterations = Integer.MAX_VALUE;
}
- for (int i = 0; i < iterations && balancerRunning; i++) {
+ // nextIterationIndex is the iteration that balancer should start from on
+ // leader change or restart
+ int i = nextIterationIndex;
+ resetState();
+ for (; i < iterations && isBalancerRunning(); i++) {
if (config.getTriggerDuEnable()) {
// before starting a new iteration, we trigger all the datanode
// to run `du`. this is an aggressive action, with which we can
@@ -179,19 +189,37 @@
}
}
- // stop balancing if iteration is not initialized
+ // initialize this iteration. stop balancing on initialization failure
if (!initializeIteration()) {
- stopBalancer();
+ // just return if the reason for initialization failure is that
+ // balancer has been stopped in another thread
+ if (!isBalancerRunning()) {
+ return;
+ }
+ // otherwise, try to stop balancer
+ tryStopBalancer("Could not initialize ContainerBalancer's " +
+ "iteration number " + i);
return;
}
- //if no new move option is generated, it means the cluster can
- //not be balanced any more , so just stop
IterationResult iR = doIteration();
metrics.incrementNumIterations(1);
LOG.info("Result of this iteration of Container Balancer: {}", iR);
+
+ // persist next iteration index
+ if (iR == IterationResult.ITERATION_COMPLETED) {
+ try {
+ saveConfiguration(config, true, i + 1);
+ } catch (IOException e) {
+ LOG.warn("Could not persist next iteration index value for " +
+ "ContainerBalancer after completing an iteration", e);
+ }
+ }
+
+ // if no new move option is generated, it means the cluster cannot be
+ // balanced anymore; so just stop balancer
if (iR == IterationResult.CAN_NOT_BALANCE_ANY_MORE) {
- stopBalancer();
+ tryStopBalancer(iR.toString());
return;
}
@@ -215,7 +243,11 @@
}
}
}
- stopBalancer();
+
+ // finally, stop balancer if it hasn't been stopped already
+ if (isBalancerRunning()) {
+ tryStopBalancer("Completed all iterations.");
+ }
}
/**
@@ -238,12 +270,11 @@
List<DatanodeUsageInfo> datanodeUsageInfos =
nodeManager.getMostOrLeastUsedDatanodes(true);
if (datanodeUsageInfos.isEmpty()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Container Balancer could not retrieve nodes from Node " +
- "Manager.");
- }
+ LOG.warn("Received an empty list of datanodes from Node Manager when " +
+ "trying to identify which nodes to balance");
return false;
}
+
this.threshold = config.getThresholdAsRatio();
this.maxDatanodesRatioToInvolvePerIteration =
config.getMaxDatanodesRatioToInvolvePerIteration();
@@ -801,24 +832,72 @@
/**
* Receives a notification for raft or safe mode related status changes.
* Stops ContainerBalancer if it's running and the current SCM becomes a
- * follower or enters safe mode.
+ * follower or enters safe mode. Starts ContainerBalancer if the current
+ * SCM becomes leader, is out of safe mode and balancer should run.
*/
@Override
public void notifyStatusChanged() {
- if (!scmContext.isLeader() || scmContext.isInSafeMode()) {
- if (isBalancerRunning()) {
- stopBalancingThread();
+ lock.lock();
+ try {
+ if (!scmContext.isLeader() || scmContext.isInSafeMode()) {
+ if (isBalancerRunning()) {
+ LOG.info("Stopping ContainerBalancer in this scm on status change");
+ stop();
+ }
+ } else {
+ if (shouldRun()) {
+ try {
+ LOG.info("Starting ContainerBalancer in this scm on status change");
+ start();
+ } catch (IllegalContainerBalancerStateException |
+ InvalidContainerBalancerConfigurationException e) {
+ LOG.warn("Could not start ContainerBalancer on raft/safe-mode " +
+ "status change.", e);
+ }
+ }
}
+ } finally {
+ lock.unlock();
}
}
/**
- * Checks if ContainerBalancer should run.
- * @return false
+ * Checks if ContainerBalancer should start (after a leader change, restart
+ * etc.) by reading persisted state.
+ * @return the persisted shouldRun flag; false if it is missing or unreadable
*/
@Override
public boolean shouldRun() {
- return false;
+ try {
+ ContainerBalancerConfigurationProto proto =
+ readConfiguration(ContainerBalancerConfigurationProto.class);
+ if (proto == null) {
+ LOG.warn("Could not find persisted configuration for {} when checking" +
+ " if ContainerBalancer should run. ContainerBalancer should not " +
+ "run now.", this.getServiceName());
+ return false;
+ }
+ return proto.getShouldRun();
+ } catch (IOException e) {
+ LOG.warn("Could not read persisted configuration for checking if " +
+ "ContainerBalancer should start. ContainerBalancer should not start" +
+ " now.", e);
+ return false;
+ }
+ }
+
+ /**
+ * Checks if ContainerBalancer is currently running in this SCM.
+ *
+ * @return true if the currentBalancingThread is not null, otherwise false
+ */
+ public boolean isBalancerRunning() {
+ lock.lock();
+ try {
+ return currentBalancingThread != null;
+ } finally {
+ lock.unlock();
+ }
}
/**
@@ -830,12 +909,47 @@
}
/**
- * Starts ContainerBalancer as an SCMService.
+ * Starts ContainerBalancer as an SCMService. Validates state, reads and
+ * validates persisted configuration, and then starts the balancing
+ * thread.
+ * @throws IllegalContainerBalancerStateException if state validation fails
+ * or balancer should not run according to persisted configuration
+ * @throws InvalidContainerBalancerConfigurationException if the persisted
+ * configuration cannot be read, is null, or fails validation
*/
@Override
- public void start() {
- if (shouldRun()) {
+ public void start() throws IllegalContainerBalancerStateException,
+ InvalidContainerBalancerConfigurationException {
+ lock.lock();
+ try {
+ // should be leader-ready, out of safe mode, and not running already
+ validateState(false);
+ ContainerBalancerConfigurationProto proto;
+ try {
+ proto = readConfiguration(ContainerBalancerConfigurationProto.class);
+ } catch (IOException e) {
+ throw new InvalidContainerBalancerConfigurationException("Could not " +
+ "retrieve persisted configuration while starting " +
+ "Container Balancer as an SCMService. Will not start now.", e);
+ }
+ if (proto == null) {
+ throw new InvalidContainerBalancerConfigurationException("Persisted " +
+ "configuration for ContainerBalancer is null during start. Will " +
+ "not start now.");
+ }
+ if (!proto.getShouldRun()) {
+ throw new IllegalContainerBalancerStateException("According to " +
+ "persisted configuration, ContainerBalancer should not run.");
+ }
+ ContainerBalancerConfiguration configuration =
+ ContainerBalancerConfiguration.fromProtobuf(proto,
+ ozoneConfiguration);
+ validateConfiguration(configuration);
+ this.config = configuration;
+ this.nextIterationIndex = proto.getNextIterationIndex();
startBalancingThread();
+ } finally {
+ lock.unlock();
}
}
@@ -848,13 +962,16 @@
* @throws InvalidContainerBalancerConfigurationException if
* {@link ContainerBalancerConfiguration} config file is incorrectly
* configured
+ * @throws IOException on failure to persist
+ * {@link ContainerBalancerConfiguration}
*/
- public void startBalancer() throws IllegalContainerBalancerStateException,
- InvalidContainerBalancerConfigurationException {
+ public void startBalancer(ContainerBalancerConfiguration configuration)
+ throws IllegalContainerBalancerStateException,
+ InvalidContainerBalancerConfigurationException, IOException {
lock.lock();
try {
- validateState();
- validateConfiguration(this.config);
+ // validates state, config, and then saves config
+ setBalancerConfigOnStartBalancer(configuration);
startBalancingThread();
} finally {
lock.unlock();
@@ -867,7 +984,6 @@
private void startBalancingThread() {
lock.lock();
try {
- balancerRunning = true;
currentBalancingThread = new Thread(this::balance);
currentBalancingThread.setName("ContainerBalancer");
currentBalancingThread.setDaemon(true);
@@ -879,11 +995,17 @@
}
/**
- * Checks if ContainerBalancer can start.
- * @throws IllegalContainerBalancerStateException if ContainerBalancer is
- * already running, SCM is in safe mode, or SCM is not leader ready
+ * Validates balancer's state based on the specified expectedRunning.
+ * Confirms SCM is leader-ready and out of safe mode.
+ *
+ * @param expectedRunning true if ContainerBalancer is expected to be
+ * running, else false
+ * @throws IllegalContainerBalancerStateException if SCM is not
+ * leader-ready, is in safe mode, or state does not match the specified
+ * expected state
*/
- private void validateState() throws IllegalContainerBalancerStateException {
+ private void validateState(boolean expectedRunning)
+ throws IllegalContainerBalancerStateException {
if (!scmContext.isLeaderReady()) {
LOG.warn("SCM is not leader ready");
throw new IllegalContainerBalancerStateException("SCM is not leader " +
@@ -895,10 +1017,10 @@
}
lock.lock();
try {
- if (isBalancerRunning() || currentBalancingThread != null) {
- LOG.warn("Cannot start ContainerBalancer because it's already running");
+ if (isBalancerRunning() != expectedRunning) {
throw new IllegalContainerBalancerStateException(
- "Cannot start ContainerBalancer because it's already running");
+ "Expect ContainerBalancer running state to be " + expectedRunning +
+ ", but running state is actually " + isBalancerRunning());
}
} finally {
lock.unlock();
@@ -910,20 +1032,49 @@
*/
@Override
public void stop() {
- stopBalancer();
+ lock.lock();
+ try {
+ if (!isBalancerRunning()) {
+ LOG.warn("Cannot stop Container Balancer because it's not running");
+ return;
+ }
+ stopBalancingThread();
+ } finally {
+ lock.unlock();
+ }
}
/**
* Stops ContainerBalancer gracefully.
*/
- public void stopBalancer() {
+ public void stopBalancer()
+ throws IOException, IllegalContainerBalancerStateException {
lock.lock();
try {
- if (!isBalancerRunning()) {
- LOG.info("Container Balancer is not running.");
- return;
- }
- stopBalancingThread();
+ // should be leader, out of safe mode, and currently running
+ validateState(true);
+ saveConfiguration(config, false, 0);
+ stop();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Tries to stop ContainerBalancer. Logs the reason for stopping. Calls
+ * {@link ContainerBalancer#stopBalancer()}.
+ * @param stopReason a string specifying the reason for stopping
+ * ContainerBalancer.
+ */
+ private void tryStopBalancer(String stopReason) {
+ lock.lock();
+ try {
+ LOG.info("Stopping ContainerBalancer. Reason for stopping: {}",
+ stopReason);
+ stopBalancer();
+ } catch (IllegalContainerBalancerStateException | IOException e) {
+ LOG.warn("Tried to stop ContainerBalancer but failed. Reason for " +
+ "stopping: {}", stopReason, e);
} finally {
lock.unlock();
}
@@ -933,7 +1084,6 @@
Thread balancingThread;
lock.lock();
try {
- balancerRunning = false;
balancingThread = currentBalancingThread;
currentBalancingThread = null;
} finally {
@@ -952,6 +1102,20 @@
LOG.info("Container Balancer stopped successfully.");
}
+ private void saveConfiguration(ContainerBalancerConfiguration configuration,
+ boolean shouldRun, int index)
+ throws IOException {
+ lock.lock();
+ try {
+ saveConfiguration(configuration.toProtobufBuilder()
+ .setShouldRun(shouldRun)
+ .setNextIterationIndex(index)
+ .build());
+ } finally {
+ lock.unlock();
+ }
+ }
+
private void validateConfiguration(ContainerBalancerConfiguration conf)
throws InvalidContainerBalancerConfigurationException {
// maxSizeEnteringTarget and maxSizeLeavingSource should by default be
@@ -1007,12 +1171,24 @@
}
/**
- * Sets the configuration that ContainerBalancer will use. This should be
- * set before starting balancer.
- * @param config ContainerBalancerConfiguration
+ * Persists the configuration that ContainerBalancer will use after
+ * validating state and the specified configuration.
+ * @param configuration ContainerBalancerConfiguration to persist
+ * @throws InvalidContainerBalancerConfigurationException on failure to
+ * validate the specified configuration
+ * @throws IllegalContainerBalancerStateException if this SCM is not leader
+ * or not out of safe mode or if ContainerBalancer is currently running in
+ * this SCM
+ * @throws IOException on failure to persist configuration
*/
- public void setConfig(ContainerBalancerConfiguration config) {
- this.config = config;
+ private void setBalancerConfigOnStartBalancer(
+ ContainerBalancerConfiguration configuration)
+ throws InvalidContainerBalancerConfigurationException,
+ IllegalContainerBalancerStateException, IOException {
+ validateState(false);
+ validateConfiguration(configuration);
+ saveConfiguration(configuration, true, 0);
+ this.config = configuration;
}
/**
@@ -1060,15 +1236,6 @@
return sourceToTargetMap;
}
- /**
- * Checks if ContainerBalancer is currently running.
- *
- * @return true if ContainerBalancer is running, false if not running.
- */
- public boolean isBalancerRunning() {
- return balancerRunning;
- }
-
@VisibleForTesting
int getCountDatanodesInvolvedPerIteration() {
return countDatanodesInvolvedPerIteration;
@@ -1096,7 +1263,7 @@
public String toString() {
String status = String.format("%nContainer Balancer status:%n" +
"%-30s %s%n" +
- "%-30s %b%n", "Key", "Value", "Running", balancerRunning);
+ "%-30s %b%n", "Key", "Value", "Running", isBalancerRunning());
return status + config.toString();
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerConfiguration.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerConfiguration.java
index 4e994c8..9d50837 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerConfiguration.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerConfiguration.java
@@ -22,8 +22,11 @@
import org.apache.hadoop.hdds.conf.ConfigGroup;
import org.apache.hadoop.hdds.conf.ConfigTag;
import org.apache.hadoop.hdds.conf.ConfigType;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ContainerBalancerConfigurationProto;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.ozone.OzoneConsts;
+import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -209,6 +212,10 @@
return triggerDuEnable;
}
+ public void setTriggerDuEnable(boolean enable) {
+ triggerDuEnable = enable;
+ }
+
/**
* Set the NetworkTopologyEnable value for Container Balancer.
*
@@ -315,6 +322,10 @@
this.moveTimeout = duration.toMillis();
}
+ public void setMoveTimeout(long millis) {
+ this.moveTimeout = millis;
+ }
+
public Duration getBalancingInterval() {
return Duration.ofMillis(balancingInterval);
}
@@ -323,6 +334,10 @@
this.balancingInterval = balancingInterval.toMillis();
}
+ public void setBalancingInterval(long millis) {
+ this.balancingInterval = millis;
+ }
+
/**
* Gets a set of datanode hostnames or ip addresses that will be the exclusive
* participants in balancing.
@@ -390,4 +405,73 @@
"Max Size Leaving Source per Iteration",
maxSizeLeavingSource / OzoneConsts.GB);
}
+
+ ContainerBalancerConfigurationProto.Builder toProtobufBuilder() {
+ ContainerBalancerConfigurationProto.Builder builder =
+ ContainerBalancerConfigurationProto.newBuilder();
+
+ builder.setUtilizationThreshold(threshold)
+ .setDatanodesInvolvedMaxPercentagePerIteration(
+ maxDatanodesPercentageToInvolvePerIteration)
+ .setSizeMovedMaxPerIteration(maxSizeToMovePerIteration)
+ .setSizeEnteringTargetMax(maxSizeEnteringTarget)
+ .setSizeLeavingSourceMax(maxSizeLeavingSource)
+ .setIterations(iterations)
+ .setExcludeContainers(excludeContainers)
+ .setMoveTimeout(moveTimeout)
+ .setBalancingIterationInterval(balancingInterval)
+ .setIncludeDatanodes(includeNodes)
+ .setExcludeDatanodes(excludeNodes)
+ .setMoveNetworkTopologyEnable(networkTopologyEnable)
+ .setTriggerDuBeforeMoveEnable(triggerDuEnable);
+ return builder;
+ }
+
+ static ContainerBalancerConfiguration fromProtobuf(
+ @NotNull ContainerBalancerConfigurationProto proto,
+ @NotNull OzoneConfiguration ozoneConfiguration) {
+ ContainerBalancerConfiguration config =
+ ozoneConfiguration.getObject(ContainerBalancerConfiguration.class);
+ if (proto.hasUtilizationThreshold()) {
+ config.setThreshold(Double.parseDouble(proto.getUtilizationThreshold()));
+ }
+ if (proto.hasDatanodesInvolvedMaxPercentagePerIteration()) {
+ config.setMaxDatanodesPercentageToInvolvePerIteration(
+ proto.getDatanodesInvolvedMaxPercentagePerIteration());
+ }
+ if (proto.hasSizeMovedMaxPerIteration()) {
+ config.setMaxSizeToMovePerIteration(proto.getSizeMovedMaxPerIteration());
+ }
+ if (proto.hasSizeEnteringTargetMax()) {
+ config.setMaxSizeEnteringTarget(proto.getSizeEnteringTargetMax());
+ }
+ if (proto.hasSizeLeavingSourceMax()) {
+ config.setMaxSizeLeavingSource(proto.getSizeLeavingSourceMax());
+ }
+ if (proto.hasIterations()) {
+ config.setIterations(proto.getIterations());
+ }
+ if (proto.hasExcludeContainers()) {
+ config.setExcludeContainers(proto.getExcludeContainers());
+ }
+ if (proto.hasMoveTimeout()) {
+ config.setMoveTimeout(proto.getMoveTimeout());
+ }
+ if (proto.hasBalancingIterationInterval()) {
+ config.setBalancingInterval(proto.getBalancingIterationInterval());
+ }
+ if (proto.hasIncludeDatanodes()) {
+ config.setIncludeNodes(proto.getIncludeDatanodes());
+ }
+ if (proto.hasExcludeDatanodes()) {
+ config.setExcludeNodes(proto.getExcludeDatanodes());
+ }
+ if (proto.hasMoveNetworkTopologyEnable()) {
+ config.setNetworkTopologyEnable(proto.getMoveNetworkTopologyEnable());
+ }
+ if (proto.hasTriggerDuBeforeMoveEnable()) {
+ config.setTriggerDuEnable(proto.getTriggerDuBeforeMoveEnable());
+ }
+ return config;
+ }
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/IllegalContainerBalancerStateException.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/IllegalContainerBalancerStateException.java
index cc938e6..b061ddf 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/IllegalContainerBalancerStateException.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/IllegalContainerBalancerStateException.java
@@ -18,10 +18,13 @@
package org.apache.hadoop.hdds.scm.container.balancer;
+import org.apache.hadoop.hdds.scm.ha.SCMServiceException;
+
/**
* Signals that a state change cannot be performed on ContainerBalancer.
*/
-public class IllegalContainerBalancerStateException extends Exception {
+public class IllegalContainerBalancerStateException extends
+ SCMServiceException {
/**
* Constructs an IllegalContainerBalancerStateException with no detail
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/InvalidContainerBalancerConfigurationException.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/InvalidContainerBalancerConfigurationException.java
index c6a6bf0..9a4cc86 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/InvalidContainerBalancerConfigurationException.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/InvalidContainerBalancerConfigurationException.java
@@ -18,11 +18,16 @@
package org.apache.hadoop.hdds.scm.container.balancer;
+import org.apache.hadoop.hdds.scm.ha.SCMServiceException;
+
+import java.io.IOException;
+
/**
* Signals that {@link ContainerBalancerConfiguration} contains invalid
* configuration value(s).
*/
-public class InvalidContainerBalancerConfigurationException extends Exception {
+public class InvalidContainerBalancerConfigurationException extends
+ SCMServiceException {
/**
* Constructs an InvalidContainerBalancerConfigurationException with no detail
@@ -44,4 +49,15 @@
super(s);
}
+ /**
+ * Constructs an InvalidContainerBalancerConfigurationException with the
+ * specified detail message and cause.
+ *
+ * @param s the detail message
+ * @param e the {@link IOException} that caused this exception
+ */
+ public InvalidContainerBalancerConfigurationException(String s,
+ IOException e) {
+ super(s, e);
+ }
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMService.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMService.java
index 4d7c435..2b185c9 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMService.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMService.java
@@ -66,7 +66,7 @@
/**
* starts the SCM service.
*/
- void start();
+ void start() throws SCMServiceException;
/**
* stops the SCM service.
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMServiceException.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMServiceException.java
new file mode 100644
index 0000000..72fb7d2
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMServiceException.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.hadoop.hdds.scm.ha;
+
+/**
+ * Checked exceptions thrown by an {@link SCMService}.
+ */
+public class SCMServiceException extends Exception {
+
+ /**
+ * Constructs a new exception with {@code null} as its detail message.
+ */
+ public SCMServiceException() {
+ super();
+ }
+
+ /**
+ * Constructs a new exception with the specified detail message.
+ *
+ * @param message the detail message
+ */
+ public SCMServiceException(String message) {
+ super(message);
+ }
+
+ /**
+ * Constructs a new exception with the specified detail message and cause.
+ *
+ * @param message the detail message
+ * @param cause the underlying cause of this exception
+ */
+ public SCMServiceException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ /**
+ * Constructs a new exception that wraps the specified cause.
+ *
+ * @param cause the underlying cause of this exception
+ */
+ public SCMServiceException(Throwable cause) {
+ super(cause);
+ }
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMServiceManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMServiceManager.java
index 1b75c4f..2ab8f8e 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMServiceManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMServiceManager.java
@@ -70,7 +70,11 @@
public synchronized void start() {
for (SCMService service : services) {
- LOG.debug("Stopping service:{}.", service.getServiceName());
- service.start();
+ LOG.debug("Starting service:{}.", service.getServiceName());
+ try {
+ service.start();
+ } catch (SCMServiceException e) {
+ LOG.warn("Could not start {}", service.getServiceName(), e);
+ }
}
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/StatefulService.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/StatefulService.java
index 441e83c..69df7c7 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/StatefulService.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/StatefulService.java
@@ -20,8 +20,6 @@
import com.google.protobuf.ByteString;
import com.google.protobuf.GeneratedMessage;
-import com.google.protobuf.InvalidProtocolBufferException;
-import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
@@ -35,10 +33,11 @@
/**
* Initialize a StatefulService from an extending class.
- * @param scm {@link StorageContainerManager}
+ * @param stateManager a reference to the
+ * {@link StatefulServiceStateManager} from SCM.
*/
- protected StatefulService(StorageContainerManager scm) {
- stateManager = scm.getStatefulServiceStateManager();
+ protected StatefulService(StatefulServiceStateManager stateManager) {
+ this.stateManager = stateManager;
}
/**
@@ -60,21 +59,27 @@
*
* @param configType the Class object of the protobuf message type
* @param <T> the Type of the protobuf message
- * @return persisted protobuf message
+ * @return persisted protobuf message, or {@code null} if no entry is found
* @throws IOException on failure to fetch the message from DB or when
* parsing it. ensure the specified configType is correct
*/
protected final <T extends GeneratedMessage> T readConfiguration(
Class<T> configType) throws IOException {
+ ByteString byteString = stateManager.readConfiguration(getServiceName());
+ if (byteString == null) {
+ return null;
+ }
try {
return configType.cast(ReflectionUtil.getMethod(configType,
"parseFrom", ByteString.class)
- .invoke(null, stateManager.readConfiguration(getServiceName())));
+ .invoke(null, byteString));
} catch (NoSuchMethodException | IllegalAccessException
| InvocationTargetException e) {
- e.printStackTrace();
- throw new InvalidProtocolBufferException("GeneratedMessage cannot " +
- "be parsed for type " + configType + ": " + e.getMessage());
+ // chain the cause so the caller decides how to log the reflective failure
+ throw new IOException("GeneratedMessage cannot be parsed. Ensure that "
+ + configType + " is the correct expected message type for " +
+ this.getServiceName(), e);
}
+
}
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/StatefulServiceStateManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/StatefulServiceStateManagerImpl.java
index d470f1b..1e7a756 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/StatefulServiceStateManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/StatefulServiceStateManagerImpl.java
@@ -23,6 +23,8 @@
import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType;
import org.apache.hadoop.hdds.scm.metadata.DBTransactionBuffer;
import org.apache.hadoop.hdds.utils.db.Table;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.lang.reflect.Proxy;
@@ -34,6 +36,9 @@
public final class StatefulServiceStateManagerImpl
implements StatefulServiceStateManager {
+ public static final Logger LOG =
+ LoggerFactory.getLogger(StatefulServiceStateManagerImpl.class);
+
// this table maps the service name to the configuration (ByteString)
private Table<String, ByteString> statefulServiceConfig;
private final DBTransactionBuffer transactionBuffer;
@@ -52,10 +57,19 @@
public void saveConfiguration(String serviceName, ByteString bytes)
throws IOException {
transactionBuffer.addToBuffer(statefulServiceConfig, serviceName, bytes);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Added specified bytes to the transaction buffer for key " +
+ "{} to table {}", serviceName, statefulServiceConfig.getName());
+ }
+
if (transactionBuffer instanceof SCMHADBTransactionBuffer) {
SCMHADBTransactionBuffer buffer =
(SCMHADBTransactionBuffer) transactionBuffer;
buffer.flush();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Transaction buffer flushed");
+ }
}
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/io/ByteStringCodec.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/io/ByteStringCodec.java
new file mode 100644
index 0000000..e0cf00c
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/io/ByteStringCodec.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.ha.io;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+
+/**
+ * A pass-through codec: ByteString values are returned unchanged.
+ */
+public class ByteStringCodec implements Codec {
+
+ @Override
+ public ByteString serialize(Object object)
+ throws InvalidProtocolBufferException {
+ return (ByteString) object;
+ }
+
+ @Override
+ public Object deserialize(Class<?> type, ByteString value)
+ throws InvalidProtocolBufferException {
+ return value;
+ }
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/io/CodecFactory.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/io/CodecFactory.java
index 9fb771b..dae2b3c 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/io/CodecFactory.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/io/CodecFactory.java
@@ -17,6 +17,7 @@
package org.apache.hadoop.hdds.scm.ha.io;
+import com.google.protobuf.ByteString;
import com.google.protobuf.GeneratedMessage;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.ProtocolMessageEnum;
@@ -45,6 +46,7 @@
codecs.put(Boolean.class, new BooleanCodec());
codecs.put(BigInteger.class, new BigIntegerCodec());
codecs.put(X509Certificate.class, new X509CertificateCodec());
+ codecs.put(ByteString.class, new ByteStringCodec());
}
private CodecFactory() { }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
index 2d38331..fcee862 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
@@ -909,10 +909,9 @@
}
ContainerBalancer containerBalancer = scm.getContainerBalancer();
- containerBalancer.setConfig(cbc);
try {
- containerBalancer.startBalancer();
- } catch (IllegalContainerBalancerStateException |
+ containerBalancer.startBalancer(cbc);
+ } catch (IllegalContainerBalancerStateException | IOException |
InvalidContainerBalancerConfigurationException e) {
AUDIT.logWriteFailure(buildAuditMessageForFailure(
SCMAction.START_CONTAINER_BALANCER, null, e));
@@ -931,9 +930,14 @@
@Override
public void stopContainerBalancer() throws IOException {
getScm().checkAdminAccess(getRemoteUser());
- AUDIT.logWriteSuccess(buildAuditMessageForSuccess(
- SCMAction.STOP_CONTAINER_BALANCER, null));
- scm.getContainerBalancer().stopBalancer();
+ try {
+ scm.getContainerBalancer().stopBalancer();
+ AUDIT.logWriteSuccess(buildAuditMessageForSuccess(
+ SCMAction.STOP_CONTAINER_BALANCER, null));
+ } catch (IllegalContainerBalancerStateException e) {
+ AUDIT.logWriteFailure(buildAuditMessageForFailure(
+ SCMAction.STOP_CONTAINER_BALANCER, null, e));
+ }
}
@Override
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 9e651c8..4440d4a 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -1469,10 +1469,15 @@
@Override
public void stop() {
try {
- LOG.info("Stopping Container Balancer service.");
- containerBalancer.stopBalancer();
+ if (containerBalancer.isBalancerRunning()) {
+ LOG.info("Stopping Container Balancer service.");
+ // stop ContainerBalancer thread in this scm
+ containerBalancer.stop();
+ } else {
+ LOG.info("Container Balancer is not running.");
+ }
} catch (Exception e) {
- LOG.error("Failed to stop Container Balancer service.");
+ LOG.error("Failed to stop Container Balancer service.", e);
}
try {
diff --git a/hadoop-hdds/server-scm/src/main/resources/webapps/scm/scm-overview.html b/hadoop-hdds/server-scm/src/main/resources/webapps/scm/scm-overview.html
index f981495..deaf37a 100644
--- a/hadoop-hdds/server-scm/src/main/resources/webapps/scm/scm-overview.html
+++ b/hadoop-hdds/server-scm/src/main/resources/webapps/scm/scm-overview.html
@@ -27,7 +27,7 @@
</tr>
</tbody>
</table>
-callback.sh
+
<h2>Node counts</h2>
<table class="table table-bordered table-striped" class="col-md-6">
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancer.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancer.java
index 4901617..98c92b2 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancer.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancer.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdds.scm.container.balancer;
+import com.google.protobuf.ByteString;
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -28,7 +29,6 @@
import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
-import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
import org.apache.hadoop.hdds.scm.container.MockNodeManager;
@@ -38,8 +38,11 @@
import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicyFactory;
import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementMetrics;
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
-import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.scm.ha.SCMService;
+import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
+import org.apache.hadoop.hdds.scm.ha.StatefulServiceStateManager;
+import org.apache.hadoop.hdds.scm.ha.StatefulServiceStateManagerImpl;
import org.apache.hadoop.hdds.scm.node.DatanodeUsageInfo;
import org.apache.hadoop.hdds.scm.node.NodeStatus;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
@@ -49,14 +52,13 @@
import org.apache.ozone.test.GenericTestUtils;
import org.junit.Assert;
import org.junit.Before;
-import org.junit.Rule;
import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;
+import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
@@ -96,21 +98,24 @@
private Map<ContainerID, ContainerInfo> cidToInfoMap = new HashMap<>();
private Map<DatanodeUsageInfo, Set<ContainerID>> datanodeToContainersMap =
new HashMap<>();
+ private Map<String, ByteString> serviceToConfigMap = new HashMap<>();
private static final ThreadLocalRandom RANDOM = ThreadLocalRandom.current();
- @Rule
- public TemporaryFolder tempFolder = new TemporaryFolder();
+ private StatefulServiceStateManager serviceStateManager;
/**
* Sets up configuration values and creates a mock cluster.
*/
@Before
- public void setup() throws SCMException, NodeNotFoundException {
+ public void setup() throws IOException, NodeNotFoundException {
conf = new OzoneConfiguration();
scm = Mockito.mock(StorageContainerManager.class);
containerManager = Mockito.mock(ContainerManager.class);
replicationManager = Mockito.mock(ReplicationManager.class);
+ serviceStateManager = Mockito.mock(StatefulServiceStateManagerImpl.class);
+ SCMServiceManager scmServiceManager = Mockito.mock(SCMServiceManager.class);
+ // these configs will usually be specified in each test
balancerConfiguration =
conf.getObject(ContainerBalancerConfiguration.class);
balancerConfiguration.setThreshold(10);
@@ -161,7 +166,30 @@
when(scm.getClusterMap()).thenReturn(null);
when(scm.getEventQueue()).thenReturn(mock(EventPublisher.class));
when(scm.getConfiguration()).thenReturn(conf);
+ when(scm.getStatefulServiceStateManager()).thenReturn(serviceStateManager);
+ when(scm.getSCMServiceManager()).thenReturn(scmServiceManager);
+ /*
+ When StatefulServiceStateManager#saveConfiguration is called, save to
+ in-memory serviceToConfigMap instead.
+ */
+ Mockito.doAnswer(i -> {
+ serviceToConfigMap.put(i.getArgument(0, String.class), i.getArgument(1,
+ ByteString.class));
+ return null;
+ }).when(serviceStateManager).saveConfiguration(
+ Mockito.any(String.class),
+ Mockito.any(ByteString.class));
+
+ /*
+ When StatefulServiceStateManager#readConfiguration is called, read from
+ serviceToConfigMap instead.
+ */
+ when(serviceStateManager.readConfiguration(Mockito.anyString())).thenAnswer(
+ i -> serviceToConfigMap.get(i.getArgument(0, String.class)));
+
+ Mockito.doNothing().when(scmServiceManager)
+ .register(Mockito.any(SCMService.class));
containerBalancer = new ContainerBalancer(scm);
}
@@ -184,7 +212,9 @@
*/
@Test
public void
- initializeIterationShouldUpdateUnBalancedNodesWhenThresholdChanges() {
+ initializeIterationShouldUpdateUnBalancedNodesWhenThresholdChanges()
+ throws IllegalContainerBalancerStateException, IOException,
+ InvalidContainerBalancerConfigurationException {
List<DatanodeUsageInfo> expectedUnBalancedNodes;
List<DatanodeUsageInfo> unBalancedNodesAccordingToBalancer;
@@ -207,7 +237,7 @@
unBalancedNodesAccordingToBalancer =
containerBalancer.getUnBalancedNodes();
- containerBalancer.stopBalancer();
+ stopBalancer();
Assert.assertEquals(
expectedUnBalancedNodes.size(),
unBalancedNodesAccordingToBalancer.size());
@@ -224,13 +254,15 @@
* balanced.
*/
@Test
- public void unBalancedNodesListShouldBeEmptyWhenClusterIsBalanced() {
+ public void unBalancedNodesListShouldBeEmptyWhenClusterIsBalanced()
+ throws IllegalContainerBalancerStateException, IOException,
+ InvalidContainerBalancerConfigurationException {
balancerConfiguration.setThreshold(99.99);
startBalancer(balancerConfiguration);
sleepWhileBalancing(100);
- containerBalancer.stopBalancer();
+ stopBalancer();
ContainerBalancerMetrics metrics = containerBalancer.getMetrics();
Assert.assertEquals(0, containerBalancer.getUnBalancedNodes().size());
Assert.assertEquals(0, metrics.getNumDatanodesUnbalanced());
@@ -241,7 +273,9 @@
* maxDatanodesRatioToInvolvePerIteration limit.
*/
@Test
- public void containerBalancerShouldObeyMaxDatanodesToInvolveLimit() {
+ public void containerBalancerShouldObeyMaxDatanodesToInvolveLimit()
+ throws IllegalContainerBalancerStateException, IOException,
+ InvalidContainerBalancerConfigurationException {
int percent = 20;
balancerConfiguration.setMaxDatanodesPercentageToInvolvePerIteration(
percent);
@@ -259,11 +293,13 @@
Assert.assertTrue(metrics.getNumDatanodesInvolvedInLatestIteration() > 0);
Assert.assertFalse(
metrics.getNumDatanodesInvolvedInLatestIteration() > number);
- containerBalancer.stopBalancer();
+ stopBalancer();
}
@Test
- public void containerBalancerShouldSelectOnlyClosedContainers() {
+ public void containerBalancerShouldSelectOnlyClosedContainers()
+ throws IllegalContainerBalancerStateException, IOException,
+ InvalidContainerBalancerConfigurationException {
// make all containers open, balancer should not select any of them
for (ContainerInfo containerInfo : cidToInfoMap.values()) {
containerInfo.setState(HddsProtos.LifeCycleState.OPEN);
@@ -271,7 +307,7 @@
balancerConfiguration.setThreshold(10);
startBalancer(balancerConfiguration);
sleepWhileBalancing(500);
- containerBalancer.stopBalancer();
+ stopBalancer();
// balancer should have identified unbalanced nodes
Assert.assertFalse(containerBalancer.getUnBalancedNodes().isEmpty());
@@ -291,7 +327,7 @@
}
startBalancer(balancerConfiguration);
sleepWhileBalancing(500);
- containerBalancer.stopBalancer();
+ stopBalancer();
// check whether all selected containers are closed
for (ContainerMoveSelection moveSelection:
@@ -303,7 +339,9 @@
}
@Test
- public void containerBalancerShouldObeyMaxSizeToMoveLimit() {
+ public void containerBalancerShouldObeyMaxSizeToMoveLimit()
+ throws IllegalContainerBalancerStateException, IOException,
+ InvalidContainerBalancerConfigurationException {
balancerConfiguration.setThreshold(1);
balancerConfiguration.setMaxSizeToMovePerIteration(10 * OzoneConsts.GB);
balancerConfiguration.setIterations(1);
@@ -319,11 +357,13 @@
.getDataSizeMovedGBInLatestIteration();
Assert.assertTrue(size > 0);
Assert.assertFalse(size > 10);
- containerBalancer.stopBalancer();
+ stopBalancer();
}
@Test
- public void targetDatanodeShouldNotAlreadyContainSelectedContainer() {
+ public void targetDatanodeShouldNotAlreadyContainSelectedContainer()
+ throws IllegalContainerBalancerStateException, IOException,
+ InvalidContainerBalancerConfigurationException {
balancerConfiguration.setThreshold(10);
balancerConfiguration.setMaxSizeToMovePerIteration(100 * OzoneConsts.GB);
balancerConfiguration.setMaxDatanodesPercentageToInvolvePerIteration(100);
@@ -336,7 +376,7 @@
Thread.sleep(1000);
} catch (InterruptedException e) { }
- containerBalancer.stopBalancer();
+ stopBalancer();
Map<DatanodeDetails, ContainerMoveSelection> sourceToTargetMap =
containerBalancer.getSourceToTargetMap();
for (ContainerMoveSelection moveSelection : sourceToTargetMap.values()) {
@@ -350,7 +390,9 @@
}
@Test
- public void containerMoveSelectionShouldFollowPlacementPolicy() {
+ public void containerMoveSelectionShouldFollowPlacementPolicy()
+ throws IllegalContainerBalancerStateException, IOException,
+ InvalidContainerBalancerConfigurationException {
balancerConfiguration.setThreshold(10);
balancerConfiguration.setMaxSizeToMovePerIteration(50 * OzoneConsts.GB);
balancerConfiguration.setMaxDatanodesPercentageToInvolvePerIteration(100);
@@ -363,7 +405,7 @@
Thread.sleep(1000);
} catch (InterruptedException e) { }
- containerBalancer.stopBalancer();
+ stopBalancer();
Map<DatanodeDetails, ContainerMoveSelection> sourceToTargetMap =
containerBalancer.getSourceToTargetMap();
@@ -392,7 +434,8 @@
@Test
public void targetDatanodeShouldBeInServiceHealthy()
- throws NodeNotFoundException {
+ throws NodeNotFoundException, IllegalContainerBalancerStateException,
+ IOException, InvalidContainerBalancerConfigurationException {
balancerConfiguration.setThreshold(10);
balancerConfiguration.setMaxDatanodesPercentageToInvolvePerIteration(100);
balancerConfiguration.setMaxSizeToMovePerIteration(50 * OzoneConsts.GB);
@@ -407,7 +450,7 @@
} catch (InterruptedException e) {
}
- containerBalancer.stopBalancer();
+ stopBalancer();
for (ContainerMoveSelection moveSelection :
containerBalancer.getSourceToTargetMap().values()) {
DatanodeDetails target = moveSelection.getTargetNode();
@@ -419,7 +462,9 @@
}
@Test
- public void selectedContainerShouldNotAlreadyHaveBeenSelected() {
+ public void selectedContainerShouldNotAlreadyHaveBeenSelected()
+ throws IllegalContainerBalancerStateException, IOException,
+ InvalidContainerBalancerConfigurationException {
balancerConfiguration.setThreshold(10);
balancerConfiguration.setMaxDatanodesPercentageToInvolvePerIteration(100);
balancerConfiguration.setMaxSizeToMovePerIteration(50 * OzoneConsts.GB);
@@ -434,7 +479,7 @@
Thread.sleep(1000);
} catch (InterruptedException e) { }
- containerBalancer.stopBalancer();
+ stopBalancer();
Set<ContainerID> containers = new HashSet<>();
for (ContainerMoveSelection moveSelection :
containerBalancer.getSourceToTargetMap().values()) {
@@ -445,7 +490,9 @@
}
@Test
- public void balancerShouldNotSelectConfiguredExcludeContainers() {
+ public void balancerShouldNotSelectConfiguredExcludeContainers()
+ throws IllegalContainerBalancerStateException, IOException,
+ InvalidContainerBalancerConfigurationException {
balancerConfiguration.setThreshold(10);
balancerConfiguration.setMaxDatanodesPercentageToInvolvePerIteration(100);
balancerConfiguration.setMaxSizeToMovePerIteration(50 * OzoneConsts.GB);
@@ -461,7 +508,7 @@
Thread.sleep(1000);
} catch (InterruptedException e) { }
- containerBalancer.stopBalancer();
+ stopBalancer();
Set<ContainerID> excludeContainers =
balancerConfiguration.getExcludeContainers();
for (ContainerMoveSelection moveSelection :
@@ -472,7 +519,9 @@
}
@Test
- public void balancerShouldObeyMaxSizeEnteringTargetLimit() {
+ public void balancerShouldObeyMaxSizeEnteringTargetLimit()
+ throws IllegalContainerBalancerStateException, IOException,
+ InvalidContainerBalancerConfigurationException {
conf.set("ozone.scm.container.size", "1MB");
balancerConfiguration =
conf.getObject(ContainerBalancerConfiguration.class);
@@ -487,7 +536,7 @@
Assert.assertFalse(containerBalancer.getUnBalancedNodes().isEmpty());
Assert.assertTrue(containerBalancer.getSourceToTargetMap().isEmpty());
- containerBalancer.stopBalancer();
+ stopBalancer();
// some containers should be selected when using default values
OzoneConfiguration ozoneConfiguration = new OzoneConfiguration();
@@ -497,26 +546,28 @@
sleepWhileBalancing(500);
- containerBalancer.stopBalancer();
+ stopBalancer();
// balancer should have identified unbalanced nodes
Assert.assertFalse(containerBalancer.getUnBalancedNodes().isEmpty());
Assert.assertFalse(containerBalancer.getSourceToTargetMap().isEmpty());
}
@Test
- public void testMetrics() {
+ public void testMetrics()
+ throws IllegalContainerBalancerStateException, IOException,
+ InvalidContainerBalancerConfigurationException {
conf.set("hdds.datanode.du.refresh.period", "1ms");
balancerConfiguration.setBalancingInterval(Duration.ofMillis(2));
balancerConfiguration.setThreshold(10);
balancerConfiguration.setIterations(1);
balancerConfiguration.setMaxSizeEnteringTarget(6 * OzoneConsts.GB);
- // deliberately set max size per iteration to a low value, 6GB
+ // deliberately set max size per iteration to a low value, 6 GB
balancerConfiguration.setMaxSizeToMovePerIteration(6 * OzoneConsts.GB);
balancerConfiguration.setMaxDatanodesPercentageToInvolvePerIteration(100);
startBalancer(balancerConfiguration);
sleepWhileBalancing(500);
- containerBalancer.stopBalancer();
+ stopBalancer();
ContainerBalancerMetrics metrics = containerBalancer.getMetrics();
Assert.assertEquals(determineExpectedUnBalancedNodes(
@@ -535,7 +586,9 @@
* exclude configurations, then it should be excluded.
*/
@Test
- public void balancerShouldFollowExcludeAndIncludeDatanodesConfigurations() {
+ public void balancerShouldFollowExcludeAndIncludeDatanodesConfigurations()
+ throws IllegalContainerBalancerStateException, IOException,
+ InvalidContainerBalancerConfigurationException {
balancerConfiguration.setThreshold(10);
balancerConfiguration.setIterations(1);
balancerConfiguration.setMaxSizeEnteringTarget(10 * OzoneConsts.GB);
@@ -569,7 +622,7 @@
balancerConfiguration.setIncludeNodes(includeNodes);
startBalancer(balancerConfiguration);
sleepWhileBalancing(500);
- containerBalancer.stopBalancer();
+ stopBalancer();
// finally, these should be the only nodes included in balancing
// (included - excluded)
@@ -606,7 +659,9 @@
@Test
public void checkIterationResult()
- throws NodeNotFoundException, ContainerNotFoundException {
+ throws NodeNotFoundException, IOException,
+ IllegalContainerBalancerStateException,
+ InvalidContainerBalancerConfigurationException {
balancerConfiguration.setThreshold(10);
balancerConfiguration.setIterations(1);
balancerConfiguration.setMaxSizeEnteringTarget(10 * OzoneConsts.GB);
@@ -622,7 +677,7 @@
*/
Assert.assertEquals(ContainerBalancer.IterationResult.ITERATION_COMPLETED,
containerBalancer.getIterationResult());
- containerBalancer.stop();
+ stopBalancer();
/*
Now, limit maxSizeToMovePerIteration but fail all container moves. The
@@ -640,12 +695,14 @@
Assert.assertEquals(ContainerBalancer.IterationResult.ITERATION_COMPLETED,
containerBalancer.getIterationResult());
- containerBalancer.stop();
+ stopBalancer();
}
@Test
public void checkIterationResultTimeout()
- throws NodeNotFoundException, ContainerNotFoundException {
+ throws NodeNotFoundException, IOException,
+ IllegalContainerBalancerStateException,
+ InvalidContainerBalancerConfigurationException {
Mockito.when(replicationManager.move(Mockito.any(ContainerID.class),
Mockito.any(DatanodeDetails.class),
@@ -673,7 +730,7 @@
.getNumContainerMovesCompletedInLatestIteration());
Assert.assertTrue(containerBalancer.getMetrics()
.getNumContainerMovesTimeoutInLatestIteration() > 1);
- containerBalancer.stop();
+ stopBalancer();
}
@@ -859,13 +916,19 @@
}
}
- private void startBalancer(ContainerBalancerConfiguration config) {
- containerBalancer.setConfig(config);
+ private void startBalancer(ContainerBalancerConfiguration config)
+ throws IllegalContainerBalancerStateException, IOException,
+ InvalidContainerBalancerConfigurationException {
+ containerBalancer.startBalancer(config);
+ }
+
+ private void stopBalancer() {
try {
- containerBalancer.startBalancer();
- } catch (IllegalContainerBalancerStateException |
- InvalidContainerBalancerConfigurationException e) {
- LOG.info("Could not start ContainerBalancer while testing", e);
+ if (containerBalancer.isBalancerRunning()) {
+ containerBalancer.stopBalancer();
+ }
+ } catch (IOException | IllegalContainerBalancerStateException e) {
+ LOG.warn("Failed to stop balancer", e);
}
}
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
index df279d8..04daa51 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
@@ -249,7 +249,8 @@
// If the failure is NOT caused by other reasons (e.g. container full),
// we assume it is caused by DN failure and exclude the failed DN.
failedStreams.stream()
- .filter(s -> !checkIfContainerToExclude(s.getIoException()))
+ .filter(s -> !checkIfContainerToExclude(
+ HddsClientUtils.checkForException(s.getIoException())))
.forEach(s -> blockOutputStreamEntryPool.getExcludeList()
.addDatanode(s.getDatanodeDetails()));
}
diff --git a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockDatanodeStorage.java b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockDatanodeStorage.java
index 1f29425..b494fdf 100644
--- a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockDatanodeStorage.java
+++ b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockDatanodeStorage.java
@@ -40,10 +40,10 @@
private final Map<String, ByteString> data = new HashMap<>();
- private boolean failed = false;
+ private IOException exception = null;
- public void setStorageFailed() {
- this.failed = true;
+ /**
+ * Sets the exception that writeChunk throws for this storage.
+ *
+ * @param reason exception instance to throw on writeChunk; null means
+ * writeChunk does not fail
+ */
+ public void setStorageFailed(IOException reason) {
+ this.exception = reason;
}
public void putBlock(DatanodeBlockID blockID, BlockData blockData) {
@@ -57,8 +57,8 @@
public void writeChunk(
DatanodeBlockID blockID,
ChunkInfo chunkInfo, ByteString bytes) throws IOException {
- if (failed) {
- throw new IOException("This storage was marked as failed.");
+ if (exception != null) {
+ throw exception;
}
data.put(createKey(blockID, chunkInfo),
ByteString.copyFrom(bytes.toByteArray()));
diff --git a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientFactory.java b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientFactory.java
index fbcc153..8c5fa76 100644
--- a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientFactory.java
+++ b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientFactory.java
@@ -23,11 +23,14 @@
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import java.io.IOException;
-import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
/**
* Factory to create the mock datanode clients.
@@ -37,30 +40,41 @@
private final Map<DatanodeDetails, MockDatanodeStorage> storage =
new HashMap<>();
- private List<DatanodeDetails> pendingToFailNodes = new ArrayList<>();
+ private final Map<IOException, Set<DatanodeDetails>> pendingDNFailures =
+ new HashMap<>();
+ /**
+ * Marks the given datanodes as failed by delegating to mockStorageFailure
+ * with a newly created generic IOException.
+ *
+ * @param failedStorages datanodes whose mock storage should fail
+ */
public void setFailedStorages(List<DatanodeDetails> failedStorages) {
- List<DatanodeDetails> remainingFailNodes = new ArrayList<>();
- for (int i = 0; i < failedStorages.size(); i++) {
- DatanodeDetails failedDN = failedStorages.get(i);
- boolean isCurrentNodeMarked = false;
- final Iterator<Map.Entry<DatanodeDetails, MockDatanodeStorage>> iterator =
- storage.entrySet().iterator();
- while (iterator.hasNext()) {
- final Map.Entry<DatanodeDetails, MockDatanodeStorage> next =
- iterator.next();
- if (next.getKey().equals(failedDN)) {
- final MockDatanodeStorage value = next.getValue();
- value.setStorageFailed();
- isCurrentNodeMarked = true;
- }
- }
- if (!isCurrentNodeMarked) {
- //This node does not initialized by client yet.
- remainingFailNodes.add(failedDN);
+ mockStorageFailure(failedStorages,
+ new IOException("This storage was marked as failed."));
+ }
+
+  /**
+   * Registers {@code reason} as the failure for the given datanodes and
+   * applies it immediately to each datanode whose mock storage already
+   * exists. Datanodes without a mock storage yet stay pending under
+   * {@code reason}.
+   * <p>
+   * IOException does not override equals/hashCode, so pending failures are
+   * grouped per exception instance: pass the same object again to extend an
+   * existing group.
+   *
+   * @param datanodes datanodes whose mock storage should fail
+   * @param reason exception handed to each affected mock storage
+   */
+  public void mockStorageFailure(Collection<DatanodeDetails> datanodes,
+      IOException reason) {
+    pendingDNFailures.computeIfAbsent(reason, r -> new HashSet<>())
+        .addAll(datanodes);
+    mockStorageFailure(reason);
+  }
+
+  /**
+   * Applies {@code reason} to every pending datanode whose mock storage has
+   * been created, and drops those datanodes from the pending set.
+   * <p>
+   * The map entry for {@code reason} is intentionally kept even when its set
+   * becomes empty. Pending failures are replayed by iterating
+   * {@code pendingDNFailures.keySet()} and calling this method for each key;
+   * removing a key during that iteration would make the HashMap iterator
+   * throw ConcurrentModificationException as soon as another key follows.
+   *
+   * @param reason exception instance whose pending datanodes are processed
+   */
+  private void mockStorageFailure(IOException reason) {
+    Collection<DatanodeDetails> datanodes =
+        pendingDNFailures.getOrDefault(reason, Collections.emptySet());
+
+    Iterator<DatanodeDetails> iterator = datanodes.iterator();
+    while (iterator.hasNext()) {
+      DatanodeDetails dn = iterator.next();
+      MockDatanodeStorage mockDN = storage.get(dn);
+      if (mockDN != null) {
+        mockDN.setStorageFailed(reason);
+        iterator.remove();
       }
     }
-    this.pendingToFailNodes = remainingFailNodes;
+    // No pendingDNFailures.remove(reason) here: this method may run while
+    // the key set is being iterated.
   }
@Override
@@ -76,7 +90,9 @@
.computeIfAbsent(pipeline.getFirstNode(),
r -> new MockDatanodeStorage()));
// Incase if this node already set to mark as failed.
- setFailedStorages(this.pendingToFailNodes);
+ for (IOException reason : pendingDNFailures.keySet()) {
+ mockStorageFailure(reason);
+ }
return mockXceiverClientSpi;
}
diff --git a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientSpi.java b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientSpi.java
index 4dfe966..e468f16 100644
--- a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientSpi.java
+++ b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientSpi.java
@@ -36,6 +36,7 @@
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.scm.XceiverClientReply;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import java.io.IOException;
@@ -86,6 +87,8 @@
r -> {
try {
return r.setWriteChunk(writeChunk(request.getWriteChunk()));
+ } catch (ContainerNotOpenException e) {
+ return r.setResult(Result.CLOSED_CONTAINER_IO);
} catch (IOException e) {
return r.setResult(Result.IO_EXCEPTION);
}
diff --git a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
index e9ba156..024daa8 100644
--- a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
+++ b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
@@ -29,6 +29,7 @@
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
@@ -61,6 +62,7 @@
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import static java.nio.charset.StandardCharsets.UTF_8;
@@ -815,6 +817,69 @@
}
+ /**
+ * Datanodes 0-4 all fail with a generic IOException and none is marked with
+ * ContainerNotOpenException, so all five are expected in the exclude list.
+ */
@Test
+ public void testExcludeOnDNFailure() throws IOException {
+ testExcludeFailedDN(IntStream.range(0, 5), IntStream.empty());
+ }
+
+ /**
+ * Datanodes 0-4 are all marked with ContainerNotOpenException and none with a
+ * generic IOException, so the exclude list is expected to contain no
+ * datanodes.
+ */
+ @Test
+ public void testExcludeOnDNClosed() throws IOException {
+ testExcludeFailedDN(IntStream.empty(), IntStream.range(0, 5));
+ }
+
+ /**
+ * Datanodes 0-2 fail with a generic IOException while datanodes 3-4 are
+ * marked with ContainerNotOpenException; only datanodes 0-2 are expected in
+ * the exclude list.
+ */
+ @Test
+ public void testExcludeOnDNMixed() throws IOException {
+ testExcludeFailedDN(IntStream.range(0, 3), IntStream.range(3, 5));
+ }
+
+  /**
+   * Writes {@code dataBlocks} chunks to an EC key while selected datanodes
+   * misbehave, then checks that the key's exclude list holds exactly the
+   * datanodes that failed with a generic IOException.
+   *
+   * @param failedDNIndex indexes into the allocator's datanode list of the
+   *                      nodes that fail with a generic IOException
+   * @param closedDNIndex indexes of the nodes that fail with
+   *                      ContainerNotOpenException
+   */
+  private void testExcludeFailedDN(IntStream failedDNIndex,
+      IntStream closedDNIndex) throws IOException {
+    close();
+    OzoneConfiguration ozoneConf = new OzoneConfiguration();
+    MultiNodePipelineBlockAllocator allocator =
+        new MultiNodePipelineBlockAllocator(ozoneConf,
+            dataBlocks + parityBlocks, 10);
+    createNewClient(ozoneConf, allocator);
+
+    store.createVolume(volumeName);
+    OzoneVolume ozoneVolume = store.getVolume(volumeName);
+    ozoneVolume.createBucket(bucketName);
+    OzoneBucket ozoneBucket = ozoneVolume.getBucket(bucketName);
+
+    ECReplicationConfig ecConfig = new ECReplicationConfig(
+        dataBlocks, parityBlocks, ECReplicationConfig.EcCodec.RS, chunkSize);
+    long keySize = (long) dataBlocks * chunkSize;
+
+    try (OzoneOutputStream out = ozoneBucket.createKey(keyName,
+        keySize, ecConfig, new HashMap<>())) {
+
+      Assert.assertTrue(out.getOutputStream() instanceof ECKeyOutputStream);
+      ECKeyOutputStream ecStream = (ECKeyOutputStream) out.getOutputStream();
+
+      List<HddsProtos.DatanodeDetailsProto> clusterDns =
+          allocator.getClusterDns();
+
+      // Datanodes that report a closed container.
+      List<DatanodeDetails> closedNodes = closedDNIndex
+          .mapToObj(idx -> DatanodeDetails.getFromProtoBuf(clusterDns.get(idx)))
+          .collect(Collectors.toList());
+      MockXceiverClientFactory mockFactory =
+          (MockXceiverClientFactory) factoryStub;
+      mockFactory.mockStorageFailure(closedNodes,
+          new ContainerNotOpenException("Mocked"));
+
+      // Datanodes that fail with a generic IOException.
+      List<DatanodeDetails> failedNodes = failedDNIndex
+          .mapToObj(idx -> DatanodeDetails.getFromProtoBuf(clusterDns.get(idx)))
+          .collect(Collectors.toList());
+      mockFactory.setFailedStorages(failedNodes);
+
+      for (int chunk = 0; chunk < dataBlocks; chunk++) {
+        out.write(inputChunks[chunk % dataBlocks]);
+      }
+
+      // Only the generically failed datanodes may be excluded.
+      DatanodeDetails[] expected = failedNodes.toArray(new DatanodeDetails[0]);
+      DatanodeDetails[] actual = ecStream.getExcludeList().getDatanodes()
+          .toArray(new DatanodeDetails[0]);
+      Assert.assertArrayEquals(expected, actual);
+    }
+  }
+
+ @Test
public void testLargeWriteOfMultipleStripesWithStripeFailure()
throws IOException {
close();
@@ -1110,4 +1175,4 @@
}
return locationInfoList;
}
-}
\ No newline at end of file
+}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestRootedOzoneFileSystemWithFSO.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestRootedOzoneFileSystemWithFSO.java
index d824352..7846a01 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestRootedOzoneFileSystemWithFSO.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestRootedOzoneFileSystemWithFSO.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.fs.ozone;
import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.junit.Assert;
@@ -224,4 +225,29 @@
Assert.assertTrue(deletes == prevDeletes + 1);
}
+  /**
+   * Verifies that listStatusFSO returns a consistent listing while the
+   * TableCache is populated.
+   */
+  @Test
+  public void testListStatusFSO() throws Exception {
+    // The list-keys batch size is 1024; use more keys than that so the
+    // listing has to be served in batches.
+    final int keyCount = 1200;
+    final Path parentDir = new Path(getBucketPath(), "testListStatusFSO");
+    for (int idx = 0; idx < keyCount; idx++) {
+      final Path tempKey = new Path(parentDir, "tempKey" + idx);
+      final Path renamedKey = new Path(parentDir, "key" + idx);
+      ContractTestUtils.touch(getFs(), tempKey);
+      /*
+      The rename adds keys to the cache, which listStatusFSO consults first.
+      Cache entries are not contiguous and can outnumber one batch, which
+      may make the listing inconsistent.
+       */
+      getFs().rename(tempKey, renamedKey);
+    }
+
+    final Path listedDir = new Path(getBucketPath() + "/testListStatusFSO");
+    final FileStatus[] listing = getFs().listStatus(listedDir);
+    Assert.assertEquals(keyCount, listing.length);
+  }
+
}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestFailoverWithSCMHA.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestFailoverWithSCMHA.java
index b4882ac..906b2aa 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestFailoverWithSCMHA.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestFailoverWithSCMHA.java
@@ -16,10 +16,17 @@
*/
package org.apache.hadoop.ozone.scm;
+import com.google.protobuf.ByteString;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.cli.ContainerOperationClient;
+import org.apache.hadoop.hdds.scm.client.ScmClient;
import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.balancer.ContainerBalancer;
+import org.apache.hadoop.hdds.scm.container.balancer.ContainerBalancerConfiguration;
+import org.apache.hadoop.hdds.scm.container.balancer.IllegalContainerBalancerStateException;
+import org.apache.hadoop.hdds.scm.container.balancer.InvalidContainerBalancerConfigurationException;
import org.apache.hadoop.hdds.scm.container.common.helpers.MoveDataNodePair;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
@@ -36,6 +43,7 @@
import org.apache.hadoop.ozone.MiniOzoneHAClusterImpl;
import org.apache.ozone.test.GenericTestUtils;
import org.junit.Assert;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -44,7 +52,9 @@
import java.io.IOException;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.TimeoutException;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ContainerBalancerConfigurationProto;
import static org.apache.hadoop.hdds.scm.HddsTestUtils.getContainer;
import static org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails;
@@ -226,6 +236,89 @@
Assert.assertFalse(inflightMove.containsKey(id));
}
+  /**
+   * Starts ContainerBalancer when the cluster is already balanced.
+   * ContainerBalancer will identify that no unbalanced nodes are present and
+   * exit and stop in the first iteration. We test that ContainerBalancer
+   * persists ContainerBalancerConfigurationProto#shouldRun as false in all
+   * the 3 SCMs when it stops.
+   *
+   * @throws IOException if the persisted configuration cannot be read or
+   *     parsed
+   * @throws IllegalContainerBalancerStateException propagated from
+   *     ContainerBalancer#startBalancer
+   * @throws InvalidContainerBalancerConfigurationException propagated from
+   *     ContainerBalancer#startBalancer
+   * @throws InterruptedException if a wait is interrupted
+   * @throws TimeoutException if an awaited condition is not met in time
+   */
+  @Test
+  public void testContainerBalancerPersistsConfigurationInAllSCMs()
+      throws IOException, IllegalContainerBalancerStateException,
+      InvalidContainerBalancerConfigurationException, InterruptedException,
+      TimeoutException {
+    SCMClientConfig scmClientConfig =
+        conf.getObject(SCMClientConfig.class);
+    scmClientConfig.setRetryInterval(100);
+    scmClientConfig.setMaxRetryTimeout(1500);
+    Assertions.assertEquals(15, scmClientConfig.getRetryCount());
+    conf.setFromObject(scmClientConfig);
+    StorageContainerManager leader = getLeader(cluster);
+    Assertions.assertNotNull(leader);
+
+    // NOTE(review): scmClient is never closed; if ScmClient is Closeable,
+    // consider try-with-resources.
+    ScmClient scmClient = new ContainerOperationClient(conf);
+    // assert that container balancer is not running right now
+    Assertions.assertFalse(scmClient.getContainerBalancerStatus());
+    ContainerBalancerConfiguration balancerConf =
+        conf.getObject(ContainerBalancerConfiguration.class);
+    ContainerBalancer containerBalancer = leader.getContainerBalancer();
+
+    /*
+    Start container balancer. Since this cluster is already balanced,
+    container balancer should exit early, stop, and persist configuration to DB.
+     */
+    containerBalancer.startBalancer(balancerConf);
+
+    // assert that balancer has stopped since the cluster is already balanced
+    GenericTestUtils.waitFor(() -> !containerBalancer.isBalancerRunning(),
+        10, 500);
+    Assertions.assertFalse(containerBalancer.isBalancerRunning());
+
+    // Re-read and re-parse the persisted configuration on every poll. A
+    // parsed proto is an immutable snapshot, so polling a previously parsed
+    // instance could never observe a later write. A missing entry is treated
+    // as "not persisted yet" rather than passed to parseFrom.
+    GenericTestUtils.waitFor(() -> {
+      try {
+        ByteString persisted =
+            leader.getScmMetadataStore().getStatefulServiceConfigTable().get(
+                containerBalancer.getServiceName());
+        return persisted != null
+            && !ContainerBalancerConfigurationProto.parseFrom(persisted)
+                .getShouldRun();
+      } catch (IOException e) {
+        throw new IllegalStateException(
+            "Could not read persisted ContainerBalancer configuration", e);
+      }
+    }, 50, 5000);
+
+    long leaderTermIndex =
+        leader.getScmHAManager().getRatisServer().getSCMStateMachine()
+            .getLastAppliedTermIndex().getIndex();
+
+    /*
+    Fetch persisted configuration to verify that `shouldRun` is set to false.
+     */
+    for (StorageContainerManager scm : cluster.getStorageContainerManagers()) {
+      if (!scm.checkLeader()) {
+        // Wait and retry for follower to update transactions to leader
+        // snapshot index.
+        // Timeout error if follower does not load update within 3s
+        GenericTestUtils.waitFor(() -> scm.getScmHAManager().getRatisServer()
+            .getSCMStateMachine().getLastAppliedTermIndex()
+            .getIndex() >= leaderTermIndex, 100, 3000);
+        ContainerBalancer followerBalancer = scm.getContainerBalancer();
+        GenericTestUtils.waitFor(
+            () -> !followerBalancer.isBalancerRunning(), 50, 5000);
+        GenericTestUtils.waitFor(() -> !followerBalancer.shouldRun(), 100,
+            5000);
+      }
+      // The return value is unused; the call is kept as in the original.
+      // NOTE(review): presumably exercises the state manager read path -
+      // confirm it is still needed.
+      scm.getStatefulServiceStateManager().readConfiguration(
+          containerBalancer.getServiceName());
+      ByteString byteString =
+          scm.getScmMetadataStore().getStatefulServiceConfigTable().get(
+              containerBalancer.getServiceName());
+      ContainerBalancerConfigurationProto protobuf =
+          ContainerBalancerConfigurationProto.parseFrom(byteString);
+      Assertions.assertFalse(protobuf.getShouldRun());
+    }
+  }
+
static StorageContainerManager getLeader(MiniOzoneHAClusterImpl impl) {
for (StorageContainerManager scm : impl.getStorageContainerManagers()) {
if (scm.checkLeader()) {