Merge remote-tracking branch 'origin/HDDS-3630'
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java
index 9fc0784..5bec415 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java
@@ -52,6 +52,12 @@
  * <p>{@literal ../hdds/<<clusterUuid>>/current/<<containerDir>>/<<containerID
  * >>/<<dataDir>>}
  * <p>
+ * Each hdds volume has its own VERSION file. The hdds volume will have one
+ * clusterUuid directory for each SCM it is a part of (currently only one SCM is
+ * supported).
+ * <p>
+ * During DN startup, if the VERSION file exists, we verify that the
+ * clusterID in the version file matches the clusterID from SCM.
  */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
@@ -63,9 +69,12 @@
   public static final String HDDS_VOLUME_DIR = "hdds";
 
   private final VolumeIOStats volumeIOStats;
+  private final VolumeInfoMetrics volumeInfoMetrics;
 
   private final AtomicLong committedBytes; // till Open containers become full
 
+  // Mentions the type of volume
+  private final VolumeType type = VolumeType.DATA_VOLUME;
   // The dedicated DbVolume that the db instance of this HddsVolume resides.
   // This is optional, if null then the db instance resides on this HddsVolume.
   private DbVolume dbVolume;
@@ -98,7 +107,10 @@
     super(b);
 
     if (!b.getFailedVolume()) {
+      this.setState(VolumeState.NOT_INITIALIZED);
       this.volumeIOStats = new VolumeIOStats(b.getVolumeRootStr());
+      this.volumeInfoMetrics =
+          new VolumeInfoMetrics(b.getVolumeRootStr(), this);
       this.committedBytes = new AtomicLong(0);
 
       LOG.info("Creating HddsVolume: {} of storage type : {} capacity : {}",
@@ -108,7 +120,9 @@
     } else {
       // Builder is called with failedVolume set, so create a failed volume
       // HddsVolume Object.
+      this.setState(VolumeState.FAILED);
       volumeIOStats = null;
+      volumeInfoMetrics = null;
       committedBytes = null;
     }
 
@@ -130,16 +144,27 @@
     return super.getStorageDir();
   }
 
+  public VolumeType getType() {
+    return type;
+  }
+
   public VolumeIOStats getVolumeIOStats() {
     return volumeIOStats;
   }
 
+  public VolumeInfoMetrics getVolumeInfoStats() {
+    return volumeInfoMetrics;
+  }
+
   @Override
   public void failVolume() {
     super.failVolume();
     if (volumeIOStats != null) {
       volumeIOStats.unregister();
     }
+    if (volumeInfoMetrics != null) {
+      volumeInfoMetrics.unregister();
+    }
     closeDbStore();
   }
 
@@ -149,11 +174,15 @@
     if (volumeIOStats != null) {
       volumeIOStats.unregister();
     }
+    if (volumeInfoMetrics != null) {
+      volumeInfoMetrics.unregister();
+    }
     closeDbStore();
   }
 
   /**
    * add "delta" bytes to committed space in the volume.
+   *
    * @param delta bytes to add to committed space counter
    * @return bytes of committed space
    */
@@ -163,6 +192,7 @@
 
   /**
    * return the committed space in the volume.
+   *
    * @return bytes of committed space
    */
   public long getCommittedBytes() {
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/MetadataVolume.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/MetadataVolume.java
index 360c2c7..c5b399b 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/MetadataVolume.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/MetadataVolume.java
@@ -27,10 +27,16 @@
  */
 public class MetadataVolume extends StorageVolume {
 
+  private final VolumeType type = VolumeType.META_VOLUME;
+
   protected MetadataVolume(Builder b) throws IOException {
     super(b);
   }
 
+  public VolumeType getType() {
+    return type;
+  }
+
   /**
    * Builder class for MetadataVolume.
    */
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/VolumeInfoMetrics.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/VolumeInfoMetrics.java
new file mode 100644
index 0000000..21b9d2a
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/VolumeInfoMetrics.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.volume;
+
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.ozone.OzoneConsts;
+
+/**
+ * This class is used to track Volume Info stats for each HDDS Volume.
+ */
+@Metrics(about = "Ozone Volume Information Metrics",
+    context = OzoneConsts.OZONE)
+public class VolumeInfoMetrics {
+
+  private String metricsSourceName = VolumeInfoMetrics.class.getSimpleName();
+  private String volumeRootStr;
+  private HddsVolume volume;
+
+  /**
+   * @param identifier Typically, path to volume root. e.g. /data/hdds
+   */
+  public VolumeInfoMetrics(String identifier, HddsVolume ref) {
+    this.metricsSourceName += '-' + identifier;
+    this.volumeRootStr = identifier;
+    this.volume = ref;
+    init();
+  }
+
+  public void init() {
+    MetricsSystem ms = DefaultMetricsSystem.instance();
+    ms.register(metricsSourceName, "Volume Info Statistics", this);
+  }
+
+  public void unregister() {
+    MetricsSystem ms = DefaultMetricsSystem.instance();
+    ms.unregisterSource(metricsSourceName);
+  }
+
+  @Metric("Metric to return the Storage Type")
+  public String getStorageType() {
+    return volume.getStorageType().toString();
+  }
+
+  @Metric("Returns the Directory name for the volume")
+  public String getStorageDirectory() {
+    return volume.getStorageDir().toString();
+  }
+
+  @Metric("Return the DataNode UID for the respective volume")
+  public String getDatanodeUuid() {
+    return volume.getDatanodeUuid();
+  }
+
+  @Metric("Return the Layout Version for the volume")
+  public int getLayoutVersion() {
+    return volume.getLayoutVersion();
+  }
+
+  @Metric("Returns the Volume Type")
+  public String getVolumeType() {
+    return volume.getType().name();
+  }
+
+  public String getMetricsSourceName() {
+    return metricsSourceName;
+  }
+
+  /**
+   * Test conservative avail space.
+   * |----used----|   (avail)   |++++++++reserved++++++++|
+   * |<------- capacity ------->|
+   * |<------------------- Total capacity -------------->|
+   * A) avail = capacity - used
+   * B) capacity = used + avail
+   * C) Total capacity = used + avail + reserved
+   */
+
+  /**
+   * Return the Storage type for the Volume.
+   */
+  @Metric("Returns the Used space")
+  public long getUsed() {
+    return volume.getVolumeInfo().getScmUsed();
+  }
+
+  /**
+   * Return the Total Available capacity of the Volume.
+   */
+  @Metric("Returns the Available space")
+  public long getAvailable() {
+    return volume.getVolumeInfo().getAvailable();
+  }
+
+  /**
+   * Return the Total Reserved of the Volume.
+   */
+  @Metric("Fetches the Reserved Space")
+  public long getReserved() {
+    return volume.getVolumeInfo().getReservedInBytes();
+  }
+
+  /**
+   * Return the Total capacity of the Volume.
+   */
+  @Metric("Returns the Capacity of the Volume")
+  public long getCapacity() {
+    return getUsed() + getAvailable();
+  }
+
+  /**
+   * Return the Total capacity of the Volume.
+   */
+  @Metric("Returns the Total Capacity of the Volume")
+  public long getTotalCapacity() {
+    return (getUsed() + getAvailable() + getReserved());
+  }
+
+}
\ No newline at end of file
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBSstFileLoader.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBSstFileLoader.java
index 8c3b62b..e98a29f 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBSstFileLoader.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBSstFileLoader.java
@@ -33,14 +33,10 @@
 
   private final RocksDatabase db;
   private final ColumnFamily family;
-  private IngestExternalFileOptions ingestOptions;
-
 
   public RDBSstFileLoader(RocksDatabase db, ColumnFamily cf) {
     this.db = db;
     this.family = cf;
-    this.ingestOptions = new IngestExternalFileOptions()
-        .setIngestBehind(false);
   }
 
   @Override
@@ -49,16 +45,18 @@
     if (externalFile.length() == 0) {
       return;
     }
-    db.ingestExternalFile(family,
-        Collections.singletonList(externalFile.getAbsolutePath()),
-        ingestOptions);
+    IngestExternalFileOptions ingestOptions = new IngestExternalFileOptions()
+        .setIngestBehind(false);
+    try {
+      db.ingestExternalFile(family,
+          Collections.singletonList(externalFile.getAbsolutePath()),
+          ingestOptions);
+    } finally {
+      ingestOptions.close();
+    }
   }
 
   @Override
   public void close() {
-    if (ingestOptions != null) {
-      ingestOptions.close();
-      ingestOptions = null;
-    }
   }
 }
\ No newline at end of file
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBSstFileWriter.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBSstFileWriter.java
index 8614bab..367c235 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBSstFileWriter.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBSstFileWriter.java
@@ -40,8 +40,7 @@
 
   public RDBSstFileWriter() {
     EnvOptions envOptions = new EnvOptions();
-    Options options = new Options();
-    this.sstFileWriter = new SstFileWriter(envOptions, options);
+    this.sstFileWriter = new SstFileWriter(envOptions, new Options());
     this.keyCounter = new AtomicLong(0);
   }
 
diff --git a/hadoop-hdds/interface-client/src/main/proto/hdds.proto b/hadoop-hdds/interface-client/src/main/proto/hdds.proto
index c99274d..01597a7 100644
--- a/hadoop-hdds/interface-client/src/main/proto/hdds.proto
+++ b/hadoop-hdds/interface-client/src/main/proto/hdds.proto
@@ -431,3 +431,22 @@
     repeated KeyIntValue stat = 2;
     repeated KeyContainerIDList statSample = 3;
 }
+
+message ContainerBalancerConfigurationProto {
+    optional string utilizationThreshold = 5;
+    optional int32 datanodesInvolvedMaxPercentagePerIteration = 6;
+    optional int64 sizeMovedMaxPerIteration = 7;
+    optional int64 sizeEnteringTargetMax = 8;
+    optional int64 sizeLeavingSourceMax = 9;
+    optional int32 iterations = 10;
+    optional string excludeContainers = 11;
+    optional int64 moveTimeout = 12;
+    optional int64 balancingIterationInterval = 13;
+    optional string includeDatanodes = 14;
+    optional string excludeDatanodes = 15;
+    optional bool moveNetworkTopologyEnable = 16;
+    optional bool triggerDuBeforeMoveEnable = 17;
+
+    required bool shouldRun = 18;
+    optional int32 nextIterationIndex = 19;
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
index c2d96ff..77d9b5c 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
@@ -23,6 +23,7 @@
 import org.apache.hadoop.hdds.conf.StorageUnit;
 import org.apache.hadoop.hdds.fs.DUFactory;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ContainerBalancerConfigurationProto;
 import org.apache.hadoop.hdds.scm.PlacementPolicy;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
@@ -33,7 +34,7 @@
 import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
 import org.apache.hadoop.hdds.scm.ha.SCMContext;
-import org.apache.hadoop.hdds.scm.ha.SCMService;
+import org.apache.hadoop.hdds.scm.ha.StatefulService;
 import org.apache.hadoop.hdds.scm.net.NetworkTopology;
 import org.apache.hadoop.hdds.scm.node.DatanodeUsageInfo;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
@@ -43,6 +44,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -65,7 +67,7 @@
  * Container balancer is a service in SCM to move containers between over- and
  * under-utilized datanodes.
  */
-public class ContainerBalancer implements SCMService {
+public class ContainerBalancer extends StatefulService {
 
   public static final Logger LOG =
       LoggerFactory.getLogger(ContainerBalancer.class);
@@ -98,7 +100,6 @@
   private NetworkTopology networkTopology;
   private double upperLimit;
   private double lowerLimit;
-  private volatile boolean balancerRunning;
   private volatile Thread currentBalancingThread;
   private Lock lock;
   private ContainerBalancerSelectionCriteria selectionCriteria;
@@ -110,15 +111,17 @@
       CompletableFuture<LegacyReplicationManager.MoveResult>>
       moveSelectionToFutureMap;
   private IterationResult iterationResult;
+  private int nextIterationIndex;
 
   /**
    * Constructs ContainerBalancer with the specified arguments. Initializes
-   * new ContainerBalancerConfiguration and ContainerBalancerMetrics.
-   * Container Balancer does not start on construction.
+   * ContainerBalancerMetrics. Container Balancer does not start on
+   * construction.
    *
    * @param scm the storage container manager
    */
   public ContainerBalancer(StorageContainerManager scm) {
+    super(scm.getStatefulServiceStateManager());
     this.nodeManager = scm.getScmNodeManager();
     this.containerManager = scm.getContainerManager();
     this.replicationManager = scm.getReplicationManager();
@@ -134,13 +137,16 @@
     this.unBalancedNodes = new ArrayList<>();
     this.placementPolicy = scm.getContainerPlacementPolicy();
     this.networkTopology = scm.getClusterMap();
+    this.nextIterationIndex = 0;
 
     this.lock = new ReentrantLock();
     findSourceStrategy = new FindSourceGreedy(nodeManager);
+    scm.getSCMServiceManager().register(this);
   }
 
   /**
-   * Balances the cluster.
+   * Balances the cluster in iterations. Regularly checks if balancing has
+   * been stopped.
    */
   private void balance() {
     this.iterations = config.getIterations();
@@ -149,7 +155,11 @@
       this.iterations = Integer.MAX_VALUE;
     }
 
-    for (int i = 0; i < iterations && balancerRunning; i++) {
+    // nextIterationIndex is the iteration that balancer should start from on
+    // leader change or restart
+    int i = nextIterationIndex;
+    resetState();
+    for (; i < iterations && isBalancerRunning(); i++) {
       if (config.getTriggerDuEnable()) {
         // before starting a new iteration, we trigger all the datanode
         // to run `du`. this is an aggressive action, with which we can
@@ -179,19 +189,37 @@
         }
       }
 
-      // stop balancing if iteration is not initialized
+      // initialize this iteration. stop balancing on initialization failure
       if (!initializeIteration()) {
-        stopBalancer();
+        // just return if the reason for initialization failure is that
+        // balancer has been stopped in another thread
+        if (!isBalancerRunning()) {
+          return;
+        }
+        // otherwise, try to stop balancer
+        tryStopBalancer("Could not initialize ContainerBalancer's " +
+            "iteration number " + i);
         return;
       }
 
-      //if no new move option is generated, it means the cluster can
-      //not be balanced any more , so just stop
       IterationResult iR = doIteration();
       metrics.incrementNumIterations(1);
       LOG.info("Result of this iteration of Container Balancer: {}", iR);
+
+      // persist next iteration index
+      if (iR == IterationResult.ITERATION_COMPLETED) {
+        try {
+          saveConfiguration(config, true, i + 1);
+        } catch (IOException e) {
+          LOG.warn("Could not persist next iteration index value for " +
+              "ContainerBalancer after completing an iteration", e);
+        }
+      }
+
+      // if no new move option is generated, it means the cluster cannot be
+      // balanced anymore; so just stop balancer
       if (iR == IterationResult.CAN_NOT_BALANCE_ANY_MORE) {
-        stopBalancer();
+        tryStopBalancer(iR.toString());
         return;
       }
 
@@ -215,7 +243,11 @@
         }
       }
     }
-    stopBalancer();
+
+    // finally, stop balancer if it hasn't been stopped already
+    if (isBalancerRunning()) {
+      tryStopBalancer("Completed all iterations.");
+    }
   }
 
   /**
@@ -238,12 +270,11 @@
     List<DatanodeUsageInfo> datanodeUsageInfos =
         nodeManager.getMostOrLeastUsedDatanodes(true);
     if (datanodeUsageInfos.isEmpty()) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Container Balancer could not retrieve nodes from Node " +
-            "Manager.");
-      }
+      LOG.warn("Received an empty list of datanodes from Node Manager when " +
+          "trying to identify which nodes to balance");
       return false;
     }
+
     this.threshold = config.getThresholdAsRatio();
     this.maxDatanodesRatioToInvolvePerIteration =
         config.getMaxDatanodesRatioToInvolvePerIteration();
@@ -801,24 +832,72 @@
   /**
    * Receives a notification for raft or safe mode related status changes.
    * Stops ContainerBalancer if it's running and the current SCM becomes a
-   * follower or enters safe mode.
+   * follower or enters safe mode. Starts ContainerBalancer if the current
+   * SCM becomes leader, is out of safe mode and balancer should run.
    */
   @Override
   public void notifyStatusChanged() {
-    if (!scmContext.isLeader() || scmContext.isInSafeMode()) {
-      if (isBalancerRunning()) {
-        stopBalancingThread();
+    lock.lock();
+    try {
+      if (!scmContext.isLeader() || scmContext.isInSafeMode()) {
+        if (isBalancerRunning()) {
+          LOG.info("Stopping ContainerBalancer in this scm on status change");
+          stop();
+        }
+      } else {
+        if (shouldRun()) {
+          try {
+            LOG.info("Starting ContainerBalancer in this scm on status change");
+            start();
+          } catch (IllegalContainerBalancerStateException |
+              InvalidContainerBalancerConfigurationException e) {
+            LOG.warn("Could not start ContainerBalancer on raft/safe-mode " +
+                "status change.", e);
+          }
+        }
       }
+    } finally {
+      lock.unlock();
     }
   }
 
   /**
-   * Checks if ContainerBalancer should run.
-   * @return false
+   * Checks if ContainerBalancer should start (after a leader change, restart
+   * etc.) by reading persisted state.
+   * @return true if the persisted state is true, otherwise false
    */
   @Override
   public boolean shouldRun() {
-    return false;
+    try {
+      ContainerBalancerConfigurationProto proto =
+          readConfiguration(ContainerBalancerConfigurationProto.class);
+      if (proto == null) {
+        LOG.warn("Could not find persisted configuration for {} when checking" +
+            " if ContainerBalancer should run. ContainerBalancer should not " +
+            "run now.", this.getServiceName());
+        return false;
+      }
+      return proto.getShouldRun();
+    } catch (IOException e) {
+      LOG.warn("Could not read persisted configuration for checking if " +
+          "ContainerBalancer should start. ContainerBalancer should not start" +
+          " now.", e);
+      return false;
+    }
+  }
+
+  /**
+   * Checks if ContainerBalancer is currently running in this SCM.
+   *
+   * @return true if the currentBalancingThread is not null, otherwise false
+   */
+  public boolean isBalancerRunning() {
+    lock.lock();
+    try {
+      return currentBalancingThread != null;
+    } finally {
+      lock.unlock();
+    }
   }
 
   /**
@@ -830,12 +909,47 @@
   }
 
   /**
-   * Starts ContainerBalancer as an SCMService.
+   * Starts ContainerBalancer as an SCMService. Validates state, reads and
+   * validates persisted configuration, and then starts the balancing
+   * thread.
+   * @throws IllegalContainerBalancerStateException if balancer should not
+   * run according to persisted configuration
+   * @throws InvalidContainerBalancerConfigurationException if failed to
+   * retrieve persisted configuration, or the configuration is null
    */
   @Override
-  public void start() {
-    if (shouldRun()) {
+  public void start() throws IllegalContainerBalancerStateException,
+      InvalidContainerBalancerConfigurationException {
+    lock.lock();
+    try {
+      // should be leader-ready, out of safe mode, and not running already
+      validateState(false);
+      ContainerBalancerConfigurationProto proto;
+      try {
+        proto = readConfiguration(ContainerBalancerConfigurationProto.class);
+      } catch (IOException e) {
+        throw new InvalidContainerBalancerConfigurationException("Could not " +
+            "retrieve persisted configuration while starting " +
+            "Container Balancer as an SCMService. Will not start now.", e);
+      }
+      if (proto == null) {
+        throw new InvalidContainerBalancerConfigurationException("Persisted " +
+            "configuration for ContainerBalancer is null during start. Will " +
+            "not start now.");
+      }
+      if (!proto.getShouldRun()) {
+        throw new IllegalContainerBalancerStateException("According to " +
+            "persisted configuration, ContainerBalancer should not run.");
+      }
+      ContainerBalancerConfiguration configuration =
+          ContainerBalancerConfiguration.fromProtobuf(proto,
+              ozoneConfiguration);
+      validateConfiguration(configuration);
+      this.config = configuration;
+      this.nextIterationIndex = proto.getNextIterationIndex();
       startBalancingThread();
+    } finally {
+      lock.unlock();
     }
   }
 
@@ -848,13 +962,16 @@
    * @throws InvalidContainerBalancerConfigurationException if
    * {@link ContainerBalancerConfiguration} config file is incorrectly
    * configured
+   * @throws IOException on failure to persist
+   * {@link ContainerBalancerConfiguration}
    */
-  public void startBalancer() throws IllegalContainerBalancerStateException,
-      InvalidContainerBalancerConfigurationException {
+  public void startBalancer(ContainerBalancerConfiguration configuration)
+      throws IllegalContainerBalancerStateException,
+      InvalidContainerBalancerConfigurationException, IOException {
     lock.lock();
     try {
-      validateState();
-      validateConfiguration(this.config);
+      // validates state, config, and then saves config
+      setBalancerConfigOnStartBalancer(configuration);
       startBalancingThread();
     } finally {
       lock.unlock();
@@ -867,7 +984,6 @@
   private void startBalancingThread() {
     lock.lock();
     try {
-      balancerRunning = true;
       currentBalancingThread = new Thread(this::balance);
       currentBalancingThread.setName("ContainerBalancer");
       currentBalancingThread.setDaemon(true);
@@ -879,11 +995,17 @@
   }
 
   /**
-   * Checks if ContainerBalancer can start.
-   * @throws IllegalContainerBalancerStateException if ContainerBalancer is
-   * already running, SCM is in safe mode, or SCM is not leader ready
+   * Validates balancer's state based on the specified expectedRunning.
+   * Confirms SCM is leader-ready and out of safe mode.
+   *
+   * @param expectedRunning true if ContainerBalancer is expected to be
+   *                        running, else false
+   * @throws IllegalContainerBalancerStateException if SCM is not
+   * leader-ready, is in safe mode, or state does not match the specified
+   * expected state
    */
-  private void validateState() throws IllegalContainerBalancerStateException {
+  private void validateState(boolean expectedRunning)
+      throws IllegalContainerBalancerStateException {
     if (!scmContext.isLeaderReady()) {
       LOG.warn("SCM is not leader ready");
       throw new IllegalContainerBalancerStateException("SCM is not leader " +
@@ -895,10 +1017,10 @@
     }
     lock.lock();
     try {
-      if (isBalancerRunning() || currentBalancingThread != null) {
-        LOG.warn("Cannot start ContainerBalancer because it's already running");
+      if (isBalancerRunning() != expectedRunning) {
         throw new IllegalContainerBalancerStateException(
-            "Cannot start ContainerBalancer because it's already running");
+            "Expect ContainerBalancer running state to be " + expectedRunning +
+                ", but running state is actually " + isBalancerRunning());
       }
     } finally {
       lock.unlock();
@@ -910,20 +1032,49 @@
    */
   @Override
   public void stop() {
-    stopBalancer();
+    lock.lock();
+    try {
+      if (!isBalancerRunning()) {
+        LOG.warn("Cannot stop Container Balancer because it's not running");
+        return;
+      }
+      stopBalancingThread();
+    } finally {
+      lock.unlock();
+    }
   }
 
   /**
    * Stops ContainerBalancer gracefully.
    */
-  public void stopBalancer() {
+  public void stopBalancer()
+      throws IOException, IllegalContainerBalancerStateException {
     lock.lock();
     try {
-      if (!isBalancerRunning()) {
-        LOG.info("Container Balancer is not running.");
-        return;
-      }
-      stopBalancingThread();
+      // should be leader, out of safe mode, and currently running
+      validateState(true);
+      saveConfiguration(config, false, 0);
+      stop();
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Tries to stop ContainerBalancer. Logs the reason for stopping. Calls
+   * {@link ContainerBalancer#stopBalancer()}.
+   * @param stopReason a string specifying the reason for stopping
+   *                   ContainerBalancer.
+   */
+  private void tryStopBalancer(String stopReason) {
+    lock.lock();
+    try {
+      LOG.info("Stopping ContainerBalancer. Reason for stopping: {}",
+          stopReason);
+      stopBalancer();
+    } catch (IllegalContainerBalancerStateException | IOException e) {
+      LOG.warn("Tried to stop ContainerBalancer but failed. Reason for " +
+          "stopping: {}", stopReason, e);
     } finally {
       lock.unlock();
     }
@@ -933,7 +1084,6 @@
     Thread balancingThread;
     lock.lock();
     try {
-      balancerRunning = false;
       balancingThread = currentBalancingThread;
       currentBalancingThread = null;
     } finally {
@@ -952,6 +1102,20 @@
     LOG.info("Container Balancer stopped successfully.");
   }
 
+  private void saveConfiguration(ContainerBalancerConfiguration configuration,
+                                 boolean shouldRun, int index)
+      throws IOException {
+    lock.lock();
+    try {
+      saveConfiguration(configuration.toProtobufBuilder()
+          .setShouldRun(shouldRun)
+          .setNextIterationIndex(index)
+          .build());
+    } finally {
+      lock.unlock();
+    }
+  }
+
   private void validateConfiguration(ContainerBalancerConfiguration conf)
       throws InvalidContainerBalancerConfigurationException {
     // maxSizeEnteringTarget and maxSizeLeavingSource should by default be
@@ -1007,12 +1171,24 @@
   }
 
   /**
-   * Sets the configuration that ContainerBalancer will use. This should be
-   * set before starting balancer.
-   * @param config ContainerBalancerConfiguration
+   * Persists the configuration that ContainerBalancer will use after
+   * validating state and the specified configuration.
+   * @param configuration ContainerBalancerConfiguration to persist
+   * @throws InvalidContainerBalancerConfigurationException on failure to
+   * validate the specified configuration
+   * @throws IllegalContainerBalancerStateException if this SCM is not leader
+   * or not out of safe mode or if ContainerBalancer is currently running in
+   * this SCM
+   * @throws IOException on failure to persist configuration
    */
-  public void setConfig(ContainerBalancerConfiguration config) {
-    this.config = config;
+  private void setBalancerConfigOnStartBalancer(
+      ContainerBalancerConfiguration configuration)
+      throws InvalidContainerBalancerConfigurationException,
+      IllegalContainerBalancerStateException, IOException {
+    validateState(false);
+    validateConfiguration(configuration);
+    saveConfiguration(configuration, true, 0);
+    this.config = configuration;
   }
 
   /**
@@ -1060,15 +1236,6 @@
     return sourceToTargetMap;
   }
 
-  /**
-   * Checks if ContainerBalancer is currently running.
-   *
-   * @return true if ContainerBalancer is running, false if not running.
-   */
-  public boolean isBalancerRunning() {
-    return balancerRunning;
-  }
-
   @VisibleForTesting
   int getCountDatanodesInvolvedPerIteration() {
     return countDatanodesInvolvedPerIteration;
@@ -1096,7 +1263,7 @@
   public String toString() {
     String status = String.format("%nContainer Balancer status:%n" +
         "%-30s %s%n" +
-        "%-30s %b%n", "Key", "Value", "Running", balancerRunning);
+        "%-30s %b%n", "Key", "Value", "Running", isBalancerRunning());
     return status + config.toString();
   }
 
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerConfiguration.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerConfiguration.java
index 4e994c8..9d50837 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerConfiguration.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerConfiguration.java
@@ -22,8 +22,11 @@
 import org.apache.hadoop.hdds.conf.ConfigGroup;
 import org.apache.hadoop.hdds.conf.ConfigTag;
 import org.apache.hadoop.hdds.conf.ConfigType;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ContainerBalancerConfigurationProto;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.ozone.OzoneConsts;
+import org.jetbrains.annotations.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -209,6 +212,10 @@
     return triggerDuEnable;
   }
 
+  public void setTriggerDuEnable(boolean enable) {
+    triggerDuEnable = enable;
+  }
+
   /**
    * Set the NetworkTopologyEnable value for Container Balancer.
    *
@@ -315,6 +322,10 @@
     this.moveTimeout = duration.toMillis();
   }
 
+  public void setMoveTimeout(long millis) {
+    this.moveTimeout = millis;
+  }
+
   public Duration getBalancingInterval() {
     return Duration.ofMillis(balancingInterval);
   }
@@ -323,6 +334,10 @@
     this.balancingInterval = balancingInterval.toMillis();
   }
 
+  public void setBalancingInterval(long millis) {
+    this.balancingInterval = millis;
+  }
+
   /**
    * Gets a set of datanode hostnames or ip addresses that will be the exclusive
    * participants in balancing.
@@ -390,4 +405,73 @@
         "Max Size Leaving Source per Iteration",
         maxSizeLeavingSource / OzoneConsts.GB);
   }
+
+  ContainerBalancerConfigurationProto.Builder toProtobufBuilder() {
+    ContainerBalancerConfigurationProto.Builder builder =
+        ContainerBalancerConfigurationProto.newBuilder();
+
+    builder.setUtilizationThreshold(threshold)
+        .setDatanodesInvolvedMaxPercentagePerIteration(
+            maxDatanodesPercentageToInvolvePerIteration)
+        .setSizeMovedMaxPerIteration(maxSizeToMovePerIteration)
+        .setSizeEnteringTargetMax(maxSizeEnteringTarget)
+        .setSizeLeavingSourceMax(maxSizeLeavingSource)
+        .setIterations(iterations)
+        .setExcludeContainers(excludeContainers)
+        .setMoveTimeout(moveTimeout)
+        .setBalancingIterationInterval(balancingInterval)
+        .setIncludeDatanodes(includeNodes)
+        .setExcludeDatanodes(excludeNodes)
+        .setMoveNetworkTopologyEnable(networkTopologyEnable)
+        .setTriggerDuBeforeMoveEnable(triggerDuEnable);
+    return builder;
+  }
+
+  static ContainerBalancerConfiguration fromProtobuf(
+      @NotNull ContainerBalancerConfigurationProto proto,
+      @NotNull OzoneConfiguration ozoneConfiguration) {
+    ContainerBalancerConfiguration config =
+        ozoneConfiguration.getObject(ContainerBalancerConfiguration.class);
+    if (proto.hasUtilizationThreshold()) {
+      config.setThreshold(Double.parseDouble(proto.getUtilizationThreshold()));
+    }
+    if (proto.hasDatanodesInvolvedMaxPercentagePerIteration()) {
+      config.setMaxDatanodesPercentageToInvolvePerIteration(
+          proto.getDatanodesInvolvedMaxPercentagePerIteration());
+    }
+    if (proto.hasSizeMovedMaxPerIteration()) {
+      config.setMaxSizeToMovePerIteration(proto.getSizeMovedMaxPerIteration());
+    }
+    if (proto.hasSizeEnteringTargetMax()) {
+      config.setMaxSizeEnteringTarget(proto.getSizeEnteringTargetMax());
+    }
+    if (proto.hasSizeLeavingSourceMax()) {
+      config.setMaxSizeLeavingSource(proto.getSizeLeavingSourceMax());
+    }
+    if (proto.hasIterations()) {
+      config.setIterations(proto.getIterations());
+    }
+    if (proto.hasExcludeContainers()) {
+      config.setExcludeContainers(proto.getExcludeContainers());
+    }
+    if (proto.hasMoveTimeout()) {
+      config.setMoveTimeout(proto.getMoveTimeout());
+    }
+    if (proto.hasBalancingIterationInterval()) {
+      config.setBalancingInterval(proto.getBalancingIterationInterval());
+    }
+    if (proto.hasIncludeDatanodes()) {
+      config.setIncludeNodes(proto.getIncludeDatanodes());
+    }
+    if (proto.hasExcludeDatanodes()) {
+      config.setExcludeNodes(proto.getExcludeDatanodes());
+    }
+    if (proto.hasMoveNetworkTopologyEnable()) {
+      config.setNetworkTopologyEnable(proto.getMoveNetworkTopologyEnable());
+    }
+    if (proto.hasTriggerDuBeforeMoveEnable()) {
+      config.setTriggerDuEnable(proto.getTriggerDuBeforeMoveEnable());
+    }
+    return config;
+  }
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/IllegalContainerBalancerStateException.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/IllegalContainerBalancerStateException.java
index cc938e6..b061ddf 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/IllegalContainerBalancerStateException.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/IllegalContainerBalancerStateException.java
@@ -18,10 +18,13 @@
 
 package org.apache.hadoop.hdds.scm.container.balancer;
 
+import org.apache.hadoop.hdds.scm.ha.SCMServiceException;
+
 /**
  * Signals that a state change cannot be performed on ContainerBalancer.
  */
-public class IllegalContainerBalancerStateException extends Exception {
+public class IllegalContainerBalancerStateException extends
+    SCMServiceException {
 
   /**
    * Constructs an IllegalContainerBalancerStateException with no detail
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/InvalidContainerBalancerConfigurationException.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/InvalidContainerBalancerConfigurationException.java
index c6a6bf0..9a4cc86 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/InvalidContainerBalancerConfigurationException.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/InvalidContainerBalancerConfigurationException.java
@@ -18,11 +18,16 @@
 
 package org.apache.hadoop.hdds.scm.container.balancer;
 
+import org.apache.hadoop.hdds.scm.ha.SCMServiceException;
+
+import java.io.IOException;
+
 /**
  * Signals that {@link ContainerBalancerConfiguration} contains invalid
  * configuration value(s).
  */
-public class InvalidContainerBalancerConfigurationException extends Exception {
+public class InvalidContainerBalancerConfigurationException extends
+    SCMServiceException {
 
   /**
    * Constructs an InvalidContainerBalancerConfigurationException with no detail
@@ -44,4 +49,8 @@
     super(s);
   }
 
+  public InvalidContainerBalancerConfigurationException(String s,
+                                                        IOException e) {
+    super(s, e);
+  }
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMService.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMService.java
index 4d7c435..2b185c9 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMService.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMService.java
@@ -66,7 +66,7 @@
   /**
    * starts the SCM service.
    */
-  void start();
+  void start() throws SCMServiceException;
 
   /**
    * stops the SCM service.
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMServiceException.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMServiceException.java
new file mode 100644
index 0000000..72fb7d2
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMServiceException.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.hadoop.hdds.scm.ha;
+
+/**
+ * Checked exceptions thrown by an {@link SCMService}.
+ */
+public class SCMServiceException extends Exception {
+
+  /**
+   * Constructs a new exception with {@code null} as its detail message.
+   */
+  public SCMServiceException() {
+    super();
+  }
+
+  public SCMServiceException(String s) {
+    super(s);
+  }
+
+  public SCMServiceException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  public SCMServiceException(Throwable cause) {
+    super(cause);
+  }
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMServiceManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMServiceManager.java
index 1b75c4f..2ab8f8e 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMServiceManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMServiceManager.java
@@ -70,7 +70,11 @@
   public synchronized void start() {
     for (SCMService service : services) {
       LOG.debug("Stopping service:{}.", service.getServiceName());
-      service.start();
+      try {
+        service.start();
+      } catch (SCMServiceException e) {
+        LOG.warn("Could not start " + service.getServiceName(), e);
+      }
     }
   }
 
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/StatefulService.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/StatefulService.java
index 441e83c..69df7c7 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/StatefulService.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/StatefulService.java
@@ -20,8 +20,6 @@
 
 import com.google.protobuf.ByteString;
 import com.google.protobuf.GeneratedMessage;
-import com.google.protobuf.InvalidProtocolBufferException;
-import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
@@ -35,10 +33,11 @@
 
   /**
    * Initialize a StatefulService from an extending class.
-   * @param scm {@link StorageContainerManager}
+   * @param stateManager a reference to the
+   * {@link StatefulServiceStateManager} from SCM.
    */
-  protected StatefulService(StorageContainerManager scm) {
-    stateManager = scm.getStatefulServiceStateManager();
+  protected StatefulService(StatefulServiceStateManager stateManager) {
+    this.stateManager = stateManager;
   }
 
   /**
@@ -60,21 +59,27 @@
    *
    * @param configType the Class object of the protobuf message type
    * @param <T>        the Type of the protobuf message
-   * @return persisted protobuf message
+   * @return persisted protobuf message or null if the entry is not found
    * @throws IOException on failure to fetch the message from DB or when
    *                     parsing it. ensure the specified configType is correct
    */
   protected final <T extends GeneratedMessage> T readConfiguration(
       Class<T> configType) throws IOException {
+    ByteString byteString = stateManager.readConfiguration(getServiceName());
+    if (byteString == null) {
+      return null;
+    }
     try {
       return configType.cast(ReflectionUtil.getMethod(configType,
               "parseFrom", ByteString.class)
-          .invoke(null, stateManager.readConfiguration(getServiceName())));
+          .invoke(null, byteString));
     } catch (NoSuchMethodException | IllegalAccessException
         | InvocationTargetException e) {
       e.printStackTrace();
-      throw new InvalidProtocolBufferException("GeneratedMessage cannot " +
-          "be parsed for type " + configType + ": " + e.getMessage());
+      throw new IOException("GeneratedMessage cannot be parsed. Ensure that "
+          + configType + " is the correct expected message type for " +
+          this.getServiceName(), e);
     }
+
   }
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/StatefulServiceStateManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/StatefulServiceStateManagerImpl.java
index d470f1b..1e7a756 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/StatefulServiceStateManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/StatefulServiceStateManagerImpl.java
@@ -23,6 +23,8 @@
 import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType;
 import org.apache.hadoop.hdds.scm.metadata.DBTransactionBuffer;
 import org.apache.hadoop.hdds.utils.db.Table;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.lang.reflect.Proxy;
@@ -34,6 +36,9 @@
 public final class StatefulServiceStateManagerImpl
     implements StatefulServiceStateManager {
 
+  public static final Logger LOG =
+      LoggerFactory.getLogger(StatefulServiceStateManagerImpl.class);
+
   // this table maps the service name to the configuration (ByteString)
   private Table<String, ByteString> statefulServiceConfig;
   private final DBTransactionBuffer transactionBuffer;
@@ -52,10 +57,19 @@
   public void saveConfiguration(String serviceName, ByteString bytes)
       throws IOException {
     transactionBuffer.addToBuffer(statefulServiceConfig, serviceName, bytes);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Added specified bytes to the transaction buffer for key " +
+          "{} to table {}", serviceName, statefulServiceConfig.getName());
+    }
+
     if (transactionBuffer instanceof SCMHADBTransactionBuffer) {
       SCMHADBTransactionBuffer buffer =
               (SCMHADBTransactionBuffer) transactionBuffer;
       buffer.flush();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Transaction buffer flushed");
+      }
     }
   }
 
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/io/ByteStringCodec.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/io/ByteStringCodec.java
new file mode 100644
index 0000000..e0cf00c
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/io/ByteStringCodec.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.ha.io;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+
+/**
+ * A dummy codec that serializes a ByteString object to ByteString.
+ */
+public class ByteStringCodec implements Codec {
+
+  @Override
+  public ByteString serialize(Object object)
+      throws InvalidProtocolBufferException {
+    return (ByteString) object;
+  }
+
+  @Override
+  public Object deserialize(Class<?> type, ByteString value)
+      throws InvalidProtocolBufferException {
+    return value;
+  }
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/io/CodecFactory.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/io/CodecFactory.java
index 9fb771b..dae2b3c 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/io/CodecFactory.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/io/CodecFactory.java
@@ -17,6 +17,7 @@
 
 package org.apache.hadoop.hdds.scm.ha.io;
 
+import com.google.protobuf.ByteString;
 import com.google.protobuf.GeneratedMessage;
 import com.google.protobuf.InvalidProtocolBufferException;
 import com.google.protobuf.ProtocolMessageEnum;
@@ -45,6 +46,7 @@
     codecs.put(Boolean.class, new BooleanCodec());
     codecs.put(BigInteger.class, new BigIntegerCodec());
     codecs.put(X509Certificate.class, new X509CertificateCodec());
+    codecs.put(ByteString.class, new ByteStringCodec());
   }
 
   private CodecFactory() { }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
index 2d38331..fcee862 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
@@ -909,10 +909,9 @@
     }
 
     ContainerBalancer containerBalancer = scm.getContainerBalancer();
-    containerBalancer.setConfig(cbc);
     try {
-      containerBalancer.startBalancer();
-    } catch (IllegalContainerBalancerStateException |
+      containerBalancer.startBalancer(cbc);
+    } catch (IllegalContainerBalancerStateException | IOException |
         InvalidContainerBalancerConfigurationException e) {
       AUDIT.logWriteFailure(buildAuditMessageForFailure(
           SCMAction.START_CONTAINER_BALANCER, null, e));
@@ -931,9 +930,14 @@
   @Override
   public void stopContainerBalancer() throws IOException {
     getScm().checkAdminAccess(getRemoteUser());
-    AUDIT.logWriteSuccess(buildAuditMessageForSuccess(
-        SCMAction.STOP_CONTAINER_BALANCER, null));
-    scm.getContainerBalancer().stopBalancer();
+    try {
+      scm.getContainerBalancer().stopBalancer();
+      AUDIT.logWriteSuccess(buildAuditMessageForSuccess(
+          SCMAction.STOP_CONTAINER_BALANCER, null));
+    } catch (IllegalContainerBalancerStateException e) {
+      AUDIT.logWriteFailure(buildAuditMessageForFailure(
+          SCMAction.STOP_CONTAINER_BALANCER, null, e));
+    }
   }
 
   @Override
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 9e651c8..4440d4a 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -1469,10 +1469,15 @@
   @Override
   public void stop() {
     try {
-      LOG.info("Stopping Container Balancer service.");
-      containerBalancer.stopBalancer();
+      if (containerBalancer.isBalancerRunning()) {
+        LOG.info("Stopping Container Balancer service.");
+        // stop ContainerBalancer thread in this scm
+        containerBalancer.stop();
+      } else {
+        LOG.info("Container Balancer is not running.");
+      }
     } catch (Exception e) {
-      LOG.error("Failed to stop Container Balancer service.");
+      LOG.error("Failed to stop Container Balancer service.", e);
     }
 
     try {
diff --git a/hadoop-hdds/server-scm/src/main/resources/webapps/scm/scm-overview.html b/hadoop-hdds/server-scm/src/main/resources/webapps/scm/scm-overview.html
index f981495..deaf37a 100644
--- a/hadoop-hdds/server-scm/src/main/resources/webapps/scm/scm-overview.html
+++ b/hadoop-hdds/server-scm/src/main/resources/webapps/scm/scm-overview.html
@@ -27,7 +27,7 @@
     </tr>
     </tbody>
 </table>
-callback.sh
+
 <h2>Node counts</h2>
 
 <table class="table table-bordered table-striped" class="col-md-6">
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancer.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancer.java
index 4901617..98c92b2 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancer.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancer.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hdds.scm.container.balancer;
 
+import com.google.protobuf.ByteString;
 import org.apache.hadoop.hdds.client.RatisReplicationConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -28,7 +29,6 @@
 import org.apache.hadoop.hdds.scm.PlacementPolicy;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
-import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
 import org.apache.hadoop.hdds.scm.container.ContainerReplica;
 import org.apache.hadoop.hdds.scm.container.ContainerManager;
 import org.apache.hadoop.hdds.scm.container.MockNodeManager;
@@ -38,8 +38,11 @@
 import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicyFactory;
 import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementMetrics;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
-import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.scm.ha.SCMService;
+import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
+import org.apache.hadoop.hdds.scm.ha.StatefulServiceStateManager;
+import org.apache.hadoop.hdds.scm.ha.StatefulServiceStateManagerImpl;
 import org.apache.hadoop.hdds.scm.node.DatanodeUsageInfo;
 import org.apache.hadoop.hdds.scm.node.NodeStatus;
 import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
@@ -49,14 +52,13 @@
 import org.apache.ozone.test.GenericTestUtils;
 import org.junit.Assert;
 import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
 import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.event.Level;
 
+import java.io.IOException;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -96,21 +98,24 @@
   private Map<ContainerID, ContainerInfo> cidToInfoMap = new HashMap<>();
   private Map<DatanodeUsageInfo, Set<ContainerID>> datanodeToContainersMap =
       new HashMap<>();
+  private Map<String, ByteString> serviceToConfigMap = new HashMap<>();
   private static final ThreadLocalRandom RANDOM = ThreadLocalRandom.current();
 
-  @Rule
-  public TemporaryFolder tempFolder = new TemporaryFolder();
+  private StatefulServiceStateManager serviceStateManager;
 
   /**
    * Sets up configuration values and creates a mock cluster.
    */
   @Before
-  public void setup() throws SCMException, NodeNotFoundException {
+  public void setup() throws IOException, NodeNotFoundException {
     conf = new OzoneConfiguration();
     scm = Mockito.mock(StorageContainerManager.class);
     containerManager = Mockito.mock(ContainerManager.class);
     replicationManager = Mockito.mock(ReplicationManager.class);
+    serviceStateManager = Mockito.mock(StatefulServiceStateManagerImpl.class);
+    SCMServiceManager scmServiceManager = Mockito.mock(SCMServiceManager.class);
 
+    // these configs will usually be specified in each test
     balancerConfiguration =
         conf.getObject(ContainerBalancerConfiguration.class);
     balancerConfiguration.setThreshold(10);
@@ -161,7 +166,30 @@
     when(scm.getClusterMap()).thenReturn(null);
     when(scm.getEventQueue()).thenReturn(mock(EventPublisher.class));
     when(scm.getConfiguration()).thenReturn(conf);
+    when(scm.getStatefulServiceStateManager()).thenReturn(serviceStateManager);
+    when(scm.getSCMServiceManager()).thenReturn(scmServiceManager);
 
+    /*
+    When StatefulServiceStateManager#saveConfiguration is called, save to
+    in-memory serviceToConfigMap instead.
+     */
+    Mockito.doAnswer(i -> {
+      serviceToConfigMap.put(i.getArgument(0, String.class), i.getArgument(1,
+          ByteString.class));
+      return null;
+    }).when(serviceStateManager).saveConfiguration(
+        Mockito.any(String.class),
+        Mockito.any(ByteString.class));
+
+    /*
+    When StatefulServiceStateManager#readConfiguration is called, read from
+    serviceToConfigMap instead.
+     */
+    when(serviceStateManager.readConfiguration(Mockito.anyString())).thenAnswer(
+        i -> serviceToConfigMap.get(i.getArgument(0, String.class)));
+
+    Mockito.doNothing().when(scmServiceManager)
+        .register(Mockito.any(SCMService.class));
     containerBalancer = new ContainerBalancer(scm);
   }
 
@@ -184,7 +212,9 @@
    */
   @Test
   public void
-      initializeIterationShouldUpdateUnBalancedNodesWhenThresholdChanges() {
+      initializeIterationShouldUpdateUnBalancedNodesWhenThresholdChanges()
+      throws IllegalContainerBalancerStateException, IOException,
+      InvalidContainerBalancerConfigurationException {
     List<DatanodeUsageInfo> expectedUnBalancedNodes;
     List<DatanodeUsageInfo> unBalancedNodesAccordingToBalancer;
 
@@ -207,7 +237,7 @@
       unBalancedNodesAccordingToBalancer =
           containerBalancer.getUnBalancedNodes();
 
-      containerBalancer.stopBalancer();
+      stopBalancer();
       Assert.assertEquals(
           expectedUnBalancedNodes.size(),
           unBalancedNodesAccordingToBalancer.size());
@@ -224,13 +254,15 @@
    * balanced.
    */
   @Test
-  public void unBalancedNodesListShouldBeEmptyWhenClusterIsBalanced() {
+  public void unBalancedNodesListShouldBeEmptyWhenClusterIsBalanced()
+      throws IllegalContainerBalancerStateException, IOException,
+      InvalidContainerBalancerConfigurationException {
     balancerConfiguration.setThreshold(99.99);
     startBalancer(balancerConfiguration);
 
     sleepWhileBalancing(100);
 
-    containerBalancer.stopBalancer();
+    stopBalancer();
     ContainerBalancerMetrics metrics = containerBalancer.getMetrics();
     Assert.assertEquals(0, containerBalancer.getUnBalancedNodes().size());
     Assert.assertEquals(0, metrics.getNumDatanodesUnbalanced());
@@ -241,7 +273,9 @@
    * maxDatanodesRatioToInvolvePerIteration limit.
    */
   @Test
-  public void containerBalancerShouldObeyMaxDatanodesToInvolveLimit() {
+  public void containerBalancerShouldObeyMaxDatanodesToInvolveLimit()
+      throws IllegalContainerBalancerStateException, IOException,
+      InvalidContainerBalancerConfigurationException {
     int percent = 20;
     balancerConfiguration.setMaxDatanodesPercentageToInvolvePerIteration(
         percent);
@@ -259,11 +293,13 @@
     Assert.assertTrue(metrics.getNumDatanodesInvolvedInLatestIteration() > 0);
     Assert.assertFalse(
         metrics.getNumDatanodesInvolvedInLatestIteration() > number);
-    containerBalancer.stopBalancer();
+    stopBalancer();
   }
 
   @Test
-  public void containerBalancerShouldSelectOnlyClosedContainers() {
+  public void containerBalancerShouldSelectOnlyClosedContainers()
+      throws IllegalContainerBalancerStateException, IOException,
+      InvalidContainerBalancerConfigurationException {
     // make all containers open, balancer should not select any of them
     for (ContainerInfo containerInfo : cidToInfoMap.values()) {
       containerInfo.setState(HddsProtos.LifeCycleState.OPEN);
@@ -271,7 +307,7 @@
     balancerConfiguration.setThreshold(10);
     startBalancer(balancerConfiguration);
     sleepWhileBalancing(500);
-    containerBalancer.stopBalancer();
+    stopBalancer();
 
     // balancer should have identified unbalanced nodes
     Assert.assertFalse(containerBalancer.getUnBalancedNodes().isEmpty());
@@ -291,7 +327,7 @@
     }
     startBalancer(balancerConfiguration);
     sleepWhileBalancing(500);
-    containerBalancer.stopBalancer();
+    stopBalancer();
 
     // check whether all selected containers are closed
     for (ContainerMoveSelection moveSelection:
@@ -303,7 +339,9 @@
   }
 
   @Test
-  public void containerBalancerShouldObeyMaxSizeToMoveLimit() {
+  public void containerBalancerShouldObeyMaxSizeToMoveLimit()
+      throws IllegalContainerBalancerStateException, IOException,
+      InvalidContainerBalancerConfigurationException {
     balancerConfiguration.setThreshold(1);
     balancerConfiguration.setMaxSizeToMovePerIteration(10 * OzoneConsts.GB);
     balancerConfiguration.setIterations(1);
@@ -319,11 +357,13 @@
         .getDataSizeMovedGBInLatestIteration();
     Assert.assertTrue(size > 0);
     Assert.assertFalse(size > 10);
-    containerBalancer.stopBalancer();
+    stopBalancer();
   }
 
   @Test
-  public void targetDatanodeShouldNotAlreadyContainSelectedContainer() {
+  public void targetDatanodeShouldNotAlreadyContainSelectedContainer()
+      throws IllegalContainerBalancerStateException, IOException,
+      InvalidContainerBalancerConfigurationException {
     balancerConfiguration.setThreshold(10);
     balancerConfiguration.setMaxSizeToMovePerIteration(100 * OzoneConsts.GB);
     balancerConfiguration.setMaxDatanodesPercentageToInvolvePerIteration(100);
@@ -336,7 +376,7 @@
       Thread.sleep(1000);
     } catch (InterruptedException e) { }
 
-    containerBalancer.stopBalancer();
+    stopBalancer();
     Map<DatanodeDetails, ContainerMoveSelection> sourceToTargetMap =
         containerBalancer.getSourceToTargetMap();
     for (ContainerMoveSelection moveSelection : sourceToTargetMap.values()) {
@@ -350,7 +390,9 @@
   }
 
   @Test
-  public void containerMoveSelectionShouldFollowPlacementPolicy() {
+  public void containerMoveSelectionShouldFollowPlacementPolicy()
+      throws IllegalContainerBalancerStateException, IOException,
+      InvalidContainerBalancerConfigurationException {
     balancerConfiguration.setThreshold(10);
     balancerConfiguration.setMaxSizeToMovePerIteration(50 * OzoneConsts.GB);
     balancerConfiguration.setMaxDatanodesPercentageToInvolvePerIteration(100);
@@ -363,7 +405,7 @@
       Thread.sleep(1000);
     } catch (InterruptedException e) { }
 
-    containerBalancer.stopBalancer();
+    stopBalancer();
     Map<DatanodeDetails, ContainerMoveSelection> sourceToTargetMap =
         containerBalancer.getSourceToTargetMap();
 
@@ -392,7 +434,8 @@
 
   @Test
   public void targetDatanodeShouldBeInServiceHealthy()
-      throws NodeNotFoundException {
+      throws NodeNotFoundException, IllegalContainerBalancerStateException,
+      IOException, InvalidContainerBalancerConfigurationException {
     balancerConfiguration.setThreshold(10);
     balancerConfiguration.setMaxDatanodesPercentageToInvolvePerIteration(100);
     balancerConfiguration.setMaxSizeToMovePerIteration(50 * OzoneConsts.GB);
@@ -407,7 +450,7 @@
     } catch (InterruptedException e) {
     }
 
-    containerBalancer.stopBalancer();
+    stopBalancer();
     for (ContainerMoveSelection moveSelection :
         containerBalancer.getSourceToTargetMap().values()) {
       DatanodeDetails target = moveSelection.getTargetNode();
@@ -419,7 +462,9 @@
   }
 
   @Test
-  public void selectedContainerShouldNotAlreadyHaveBeenSelected() {
+  public void selectedContainerShouldNotAlreadyHaveBeenSelected()
+      throws IllegalContainerBalancerStateException, IOException,
+      InvalidContainerBalancerConfigurationException {
     balancerConfiguration.setThreshold(10);
     balancerConfiguration.setMaxDatanodesPercentageToInvolvePerIteration(100);
     balancerConfiguration.setMaxSizeToMovePerIteration(50 * OzoneConsts.GB);
@@ -434,7 +479,7 @@
       Thread.sleep(1000);
     } catch (InterruptedException e) { }
 
-    containerBalancer.stopBalancer();
+    stopBalancer();
     Set<ContainerID> containers = new HashSet<>();
     for (ContainerMoveSelection moveSelection :
         containerBalancer.getSourceToTargetMap().values()) {
@@ -445,7 +490,9 @@
   }
 
   @Test
-  public void balancerShouldNotSelectConfiguredExcludeContainers() {
+  public void balancerShouldNotSelectConfiguredExcludeContainers()
+      throws IllegalContainerBalancerStateException, IOException,
+      InvalidContainerBalancerConfigurationException {
     balancerConfiguration.setThreshold(10);
     balancerConfiguration.setMaxDatanodesPercentageToInvolvePerIteration(100);
     balancerConfiguration.setMaxSizeToMovePerIteration(50 * OzoneConsts.GB);
@@ -461,7 +508,7 @@
       Thread.sleep(1000);
     } catch (InterruptedException e) { }
 
-    containerBalancer.stopBalancer();
+    stopBalancer();
     Set<ContainerID> excludeContainers =
         balancerConfiguration.getExcludeContainers();
     for (ContainerMoveSelection moveSelection :
@@ -472,7 +519,9 @@
   }
 
   @Test
-  public void balancerShouldObeyMaxSizeEnteringTargetLimit() {
+  public void balancerShouldObeyMaxSizeEnteringTargetLimit()
+      throws IllegalContainerBalancerStateException, IOException,
+      InvalidContainerBalancerConfigurationException {
     conf.set("ozone.scm.container.size", "1MB");
     balancerConfiguration =
         conf.getObject(ContainerBalancerConfiguration.class);
@@ -487,7 +536,7 @@
 
     Assert.assertFalse(containerBalancer.getUnBalancedNodes().isEmpty());
     Assert.assertTrue(containerBalancer.getSourceToTargetMap().isEmpty());
-    containerBalancer.stopBalancer();
+    stopBalancer();
 
     // some containers should be selected when using default values
     OzoneConfiguration ozoneConfiguration = new OzoneConfiguration();
@@ -497,26 +546,28 @@
 
     sleepWhileBalancing(500);
 
-    containerBalancer.stopBalancer();
+    stopBalancer();
     // balancer should have identified unbalanced nodes
     Assert.assertFalse(containerBalancer.getUnBalancedNodes().isEmpty());
     Assert.assertFalse(containerBalancer.getSourceToTargetMap().isEmpty());
   }
 
   @Test
-  public void testMetrics() {
+  public void testMetrics()
+      throws IllegalContainerBalancerStateException, IOException,
+      InvalidContainerBalancerConfigurationException {
     conf.set("hdds.datanode.du.refresh.period", "1ms");
     balancerConfiguration.setBalancingInterval(Duration.ofMillis(2));
     balancerConfiguration.setThreshold(10);
     balancerConfiguration.setIterations(1);
     balancerConfiguration.setMaxSizeEnteringTarget(6 * OzoneConsts.GB);
-    // deliberately set max size per iteration to a low value, 6GB
+    // deliberately set max size per iteration to a low value, 6 GB
     balancerConfiguration.setMaxSizeToMovePerIteration(6 * OzoneConsts.GB);
     balancerConfiguration.setMaxDatanodesPercentageToInvolvePerIteration(100);
 
     startBalancer(balancerConfiguration);
     sleepWhileBalancing(500);
-    containerBalancer.stopBalancer();
+    stopBalancer();
 
     ContainerBalancerMetrics metrics = containerBalancer.getMetrics();
     Assert.assertEquals(determineExpectedUnBalancedNodes(
@@ -535,7 +586,9 @@
    * exclude configurations, then it should be excluded.
    */
   @Test
-  public void balancerShouldFollowExcludeAndIncludeDatanodesConfigurations() {
+  public void balancerShouldFollowExcludeAndIncludeDatanodesConfigurations()
+      throws IllegalContainerBalancerStateException, IOException,
+      InvalidContainerBalancerConfigurationException {
     balancerConfiguration.setThreshold(10);
     balancerConfiguration.setIterations(1);
     balancerConfiguration.setMaxSizeEnteringTarget(10 * OzoneConsts.GB);
@@ -569,7 +622,7 @@
     balancerConfiguration.setIncludeNodes(includeNodes);
     startBalancer(balancerConfiguration);
     sleepWhileBalancing(500);
-    containerBalancer.stopBalancer();
+    stopBalancer();
 
     // finally, these should be the only nodes included in balancing
     // (included - excluded)
@@ -606,7 +659,9 @@
 
   @Test
   public void checkIterationResult()
-      throws NodeNotFoundException, ContainerNotFoundException {
+      throws NodeNotFoundException, IOException,
+      IllegalContainerBalancerStateException,
+      InvalidContainerBalancerConfigurationException {
     balancerConfiguration.setThreshold(10);
     balancerConfiguration.setIterations(1);
     balancerConfiguration.setMaxSizeEnteringTarget(10 * OzoneConsts.GB);
@@ -622,7 +677,7 @@
      */
     Assert.assertEquals(ContainerBalancer.IterationResult.ITERATION_COMPLETED,
         containerBalancer.getIterationResult());
-    containerBalancer.stop();
+    stopBalancer();
 
     /*
     Now, limit maxSizeToMovePerIteration but fail all container moves. The
@@ -640,12 +695,14 @@
 
     Assert.assertEquals(ContainerBalancer.IterationResult.ITERATION_COMPLETED,
         containerBalancer.getIterationResult());
-    containerBalancer.stop();
+    stopBalancer();
   }
 
   @Test
   public void checkIterationResultTimeout()
-      throws NodeNotFoundException, ContainerNotFoundException {
+      throws NodeNotFoundException, IOException,
+      IllegalContainerBalancerStateException,
+      InvalidContainerBalancerConfigurationException {
 
     Mockito.when(replicationManager.move(Mockito.any(ContainerID.class),
             Mockito.any(DatanodeDetails.class),
@@ -673,7 +730,7 @@
             .getNumContainerMovesCompletedInLatestIteration());
     Assert.assertTrue(containerBalancer.getMetrics()
             .getNumContainerMovesTimeoutInLatestIteration() > 1);
-    containerBalancer.stop();
+    stopBalancer();
 
   }
 
@@ -859,13 +916,19 @@
     }
   }
 
-  private void startBalancer(ContainerBalancerConfiguration config) {
-    containerBalancer.setConfig(config);
+  private void startBalancer(ContainerBalancerConfiguration config)
+      throws IllegalContainerBalancerStateException, IOException,
+      InvalidContainerBalancerConfigurationException {
+    containerBalancer.startBalancer(config);
+  }
+
+  private void stopBalancer() {
     try {
-      containerBalancer.startBalancer();
-    } catch (IllegalContainerBalancerStateException |
-        InvalidContainerBalancerConfigurationException e) {
-      LOG.info("Could not start ContainerBalancer while testing", e);
+      if (containerBalancer.isBalancerRunning()) {
+        containerBalancer.stopBalancer();
+      }
+    } catch (IOException | IllegalContainerBalancerStateException e) {
+      LOG.warn("Failed to stop balancer", e);
     }
   }
 
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
index df279d8..04daa51 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
@@ -249,7 +249,8 @@
     // If the failure is NOT caused by other reasons (e.g. container full),
     // we assume it is caused by DN failure and exclude the failed DN.
     failedStreams.stream()
-        .filter(s -> !checkIfContainerToExclude(s.getIoException()))
+        .filter(s -> !checkIfContainerToExclude(
+            HddsClientUtils.checkForException(s.getIoException())))
         .forEach(s -> blockOutputStreamEntryPool.getExcludeList()
             .addDatanode(s.getDatanodeDetails()));
   }
diff --git a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockDatanodeStorage.java b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockDatanodeStorage.java
index 1f29425..b494fdf 100644
--- a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockDatanodeStorage.java
+++ b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockDatanodeStorage.java
@@ -40,10 +40,10 @@
 
   private final Map<String, ByteString> data = new HashMap<>();
 
-  private boolean failed = false;
+  private IOException exception = null;
 
-  public void setStorageFailed() {
-    this.failed = true;
+  public void setStorageFailed(IOException reason) {
+    this.exception = reason;
   }
 
   public void putBlock(DatanodeBlockID blockID, BlockData blockData) {
@@ -57,8 +57,8 @@
   public void writeChunk(
       DatanodeBlockID blockID,
       ChunkInfo chunkInfo, ByteString bytes) throws IOException {
-    if (failed) {
-      throw new IOException("This storage was marked as failed.");
+    if (exception != null) {
+      throw exception;
     }
     data.put(createKey(blockID, chunkInfo),
         ByteString.copyFrom(bytes.toByteArray()));
diff --git a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientFactory.java b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientFactory.java
index fbcc153..8c5fa76 100644
--- a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientFactory.java
+++ b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientFactory.java
@@ -23,11 +23,14 @@
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 
 import java.io.IOException;
-import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 /**
  * Factory to create the mock datanode clients.
@@ -37,30 +40,41 @@
 
   private final Map<DatanodeDetails, MockDatanodeStorage> storage =
       new HashMap<>();
-  private List<DatanodeDetails> pendingToFailNodes = new ArrayList<>();
+  private final Map<IOException, Set<DatanodeDetails>> pendingDNFailures =
+      new HashMap<>();
 
   public void setFailedStorages(List<DatanodeDetails> failedStorages) {
-    List<DatanodeDetails> remainingFailNodes = new ArrayList<>();
-    for (int i = 0; i < failedStorages.size(); i++) {
-      DatanodeDetails failedDN = failedStorages.get(i);
-      boolean isCurrentNodeMarked = false;
-      final Iterator<Map.Entry<DatanodeDetails, MockDatanodeStorage>> iterator =
-          storage.entrySet().iterator();
-      while (iterator.hasNext()) {
-        final Map.Entry<DatanodeDetails, MockDatanodeStorage> next =
-            iterator.next();
-        if (next.getKey().equals(failedDN)) {
-          final MockDatanodeStorage value = next.getValue();
-          value.setStorageFailed();
-          isCurrentNodeMarked = true;
-        }
-      }
-      if (!isCurrentNodeMarked) {
-        //This node does not initialized by client yet.
-        remainingFailNodes.add(failedDN);
+    mockStorageFailure(failedStorages,
+        new IOException("This storage was marked as failed."));
+  }
+
+  public void mockStorageFailure(Collection<DatanodeDetails> datanodes,
+      IOException reason) {
+    if (!pendingDNFailures.containsKey(reason)) {
+      pendingDNFailures.put(reason, new HashSet<>());
+    }
+    pendingDNFailures.get(reason).addAll(datanodes);
+    mockStorageFailure(reason);
+  }
+
+  private void mockStorageFailure(IOException reason) {
+
+    Collection<DatanodeDetails> datanodes =
+        pendingDNFailures.getOrDefault(reason, Collections.emptySet());
+
+    Iterator<DatanodeDetails> iterator = datanodes.iterator();
+    while (iterator.hasNext()) {
+      DatanodeDetails dn = iterator.next();
+      MockDatanodeStorage mockDN = storage.get(dn);
+      if (mockDN != null) {
+        mockDN.setStorageFailed(reason);
+        iterator.remove();
       }
     }
-    this.pendingToFailNodes = remainingFailNodes;
+
+    if (datanodes.isEmpty()) {
+      pendingDNFailures.remove(reason);
+    }
   }
 
   @Override
@@ -76,7 +90,9 @@
             .computeIfAbsent(pipeline.getFirstNode(),
                 r -> new MockDatanodeStorage()));
     // Incase if this node already set to mark as failed.
-    setFailedStorages(this.pendingToFailNodes);
+    for (IOException reason : pendingDNFailures.keySet()) {
+      mockStorageFailure(reason);
+    }
     return mockXceiverClientSpi;
   }
 
diff --git a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientSpi.java b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientSpi.java
index 4dfe966..e468f16 100644
--- a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientSpi.java
+++ b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientSpi.java
@@ -36,6 +36,7 @@
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.scm.XceiverClientReply;
 import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 
 import java.io.IOException;
@@ -86,6 +87,8 @@
           r -> {
             try {
               return r.setWriteChunk(writeChunk(request.getWriteChunk()));
+            } catch (ContainerNotOpenException e) {
+              return r.setResult(Result.CLOSED_CONTAINER_IO);
             } catch (IOException e) {
               return r.setResult(Result.IO_EXCEPTION);
             }
diff --git a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
index e9ba156..024daa8 100644
--- a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
+++ b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
@@ -29,6 +29,7 @@
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneConsts;
@@ -61,6 +62,7 @@
 import java.util.Map;
 import java.util.UUID;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 
@@ -815,6 +817,69 @@
   }
 
   @Test
+  public void testExcludeOnDNFailure() throws IOException {
+    testExcludeFailedDN(IntStream.range(0, 5), IntStream.empty());
+  }
+
+  @Test
+  public void testExcludeOnDNClosed() throws IOException {
+    testExcludeFailedDN(IntStream.empty(), IntStream.range(0, 5));
+  }
+
+  @Test
+  public void testExcludeOnDNMixed() throws IOException {
+    testExcludeFailedDN(IntStream.range(0, 3), IntStream.range(3, 5));
+  }
+
+  private void testExcludeFailedDN(IntStream failedDNIndex,
+      IntStream closedDNIndex) throws IOException {
+    close();
+    OzoneConfiguration con = new OzoneConfiguration();
+    MultiNodePipelineBlockAllocator blkAllocator =
+        new MultiNodePipelineBlockAllocator(con, dataBlocks + parityBlocks, 10);
+    createNewClient(con, blkAllocator);
+
+    store.createVolume(volumeName);
+    OzoneVolume volume = store.getVolume(volumeName);
+    volume.createBucket(bucketName);
+    OzoneBucket bucket = volume.getBucket(bucketName);
+
+    ECReplicationConfig repConfig = new ECReplicationConfig(
+        dataBlocks, parityBlocks, ECReplicationConfig.EcCodec.RS, chunkSize);
+
+    try (OzoneOutputStream out = bucket.createKey(keyName,
+        (long) dataBlocks * chunkSize, repConfig, new HashMap<>())) {
+
+      Assert.assertTrue(out.getOutputStream() instanceof ECKeyOutputStream);
+      ECKeyOutputStream ecKeyOut = (ECKeyOutputStream) out.getOutputStream();
+
+      List<HddsProtos.DatanodeDetailsProto> dns = blkAllocator.getClusterDns();
+
+      // Then let's mark datanodes with closed container
+      List<DatanodeDetails> closedDNs = closedDNIndex
+          .mapToObj(i -> DatanodeDetails.getFromProtoBuf(dns.get(i)))
+          .collect(Collectors.toList());
+      ((MockXceiverClientFactory) factoryStub).mockStorageFailure(closedDNs,
+          new ContainerNotOpenException("Mocked"));
+
+      // Then let's mark failed datanodes
+      List<DatanodeDetails> failedDNs = failedDNIndex
+          .mapToObj(i -> DatanodeDetails.getFromProtoBuf(dns.get(i)))
+          .collect(Collectors.toList());
+      ((MockXceiverClientFactory) factoryStub).setFailedStorages(failedDNs);
+
+      for (int i = 0; i < dataBlocks; i++) {
+        out.write(inputChunks[i % dataBlocks]);
+      }
+
+      // Assert excludeList only includes failedDNs
+      Assert.assertArrayEquals(failedDNs.toArray(new DatanodeDetails[0]),
+          ecKeyOut.getExcludeList().getDatanodes()
+              .toArray(new DatanodeDetails[0]));
+    }
+  }
+
+  @Test
   public void testLargeWriteOfMultipleStripesWithStripeFailure()
       throws IOException {
     close();
@@ -1110,4 +1175,4 @@
     }
     return locationInfoList;
   }
-}
\ No newline at end of file
+}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestRootedOzoneFileSystemWithFSO.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestRootedOzoneFileSystemWithFSO.java
index d824352..7846a01 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestRootedOzoneFileSystemWithFSO.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestRootedOzoneFileSystemWithFSO.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.fs.ozone;
 
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
 import org.junit.Assert;
@@ -224,4 +225,29 @@
     Assert.assertTrue(deletes == prevDeletes + 1);
   }
 
+  /**
+   * Test the consistency of listStatusFSO with TableCache present.
+   */
+  @Test
+  public void testListStatusFSO() throws Exception {
+    // list keys batch size is 1024. Creating keys greater than the
+    // batch size to test batch listing of the keys.
+    int valueGreaterBatchSize = 1200;
+    Path parent = new Path(getBucketPath(), "testListStatusFSO");
+    for (int i = 0; i < valueGreaterBatchSize; i++) {
+      Path key = new Path(parent, "tempKey" + i);
+      ContractTestUtils.touch(getFs(), key);
+      /*
+      To add keys to the cache. listStatusFSO goes through the cache first.
+      The cache is not continuous and may be greater than the batch size.
+      This may cause inconsistency in the listing of keys.
+       */
+      getFs().rename(key, new Path(parent, "key" + i));
+    }
+
+    FileStatus[] fileStatuses = getFs().listStatus(
+        new Path(getBucketPath() + "/testListStatusFSO"));
+    Assert.assertEquals(valueGreaterBatchSize, fileStatuses.length);
+  }
+
 }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestFailoverWithSCMHA.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestFailoverWithSCMHA.java
index b4882ac..906b2aa 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestFailoverWithSCMHA.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestFailoverWithSCMHA.java
@@ -16,10 +16,17 @@
  */
 package org.apache.hadoop.ozone.scm;
 
+import com.google.protobuf.ByteString;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.cli.ContainerOperationClient;
+import org.apache.hadoop.hdds.scm.client.ScmClient;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.balancer.ContainerBalancer;
+import org.apache.hadoop.hdds.scm.container.balancer.ContainerBalancerConfiguration;
+import org.apache.hadoop.hdds.scm.container.balancer.IllegalContainerBalancerStateException;
+import org.apache.hadoop.hdds.scm.container.balancer.InvalidContainerBalancerConfigurationException;
 import org.apache.hadoop.hdds.scm.container.common.helpers.MoveDataNodePair;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
@@ -36,6 +43,7 @@
 import org.apache.hadoop.ozone.MiniOzoneHAClusterImpl;
 import org.apache.ozone.test.GenericTestUtils;
 import org.junit.Assert;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -44,7 +52,9 @@
 import java.io.IOException;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.TimeoutException;
 
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ContainerBalancerConfigurationProto;
 import static org.apache.hadoop.hdds.scm.HddsTestUtils.getContainer;
 import static org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails;
 
@@ -226,6 +236,89 @@
     Assert.assertFalse(inflightMove.containsKey(id));
   }
 
+  /**
+   * Starts ContainerBalancer when the cluster is already balanced.
+   * ContainerBalancer will identify that no unbalanced nodes are present and
+   * exit and stop in the first iteration. We test that ContainerBalancer
+   * persists ContainerBalancerConfigurationProto#shouldRun as false in all
+   * the 3 SCMs when it stops.
+   * @throws IOException
+   * @throws IllegalContainerBalancerStateException
+   * @throws InvalidContainerBalancerConfigurationException
+   * @throws InterruptedException
+   * @throws TimeoutException
+   */
+  @Test
+  public void testContainerBalancerPersistsConfigurationInAllSCMs()
+      throws IOException, IllegalContainerBalancerStateException,
+      InvalidContainerBalancerConfigurationException, InterruptedException,
+      TimeoutException {
+    SCMClientConfig scmClientConfig =
+        conf.getObject(SCMClientConfig.class);
+    scmClientConfig.setRetryInterval(100);
+    scmClientConfig.setMaxRetryTimeout(1500);
+    Assertions.assertEquals(15, scmClientConfig.getRetryCount());
+    conf.setFromObject(scmClientConfig);
+    StorageContainerManager leader = getLeader(cluster);
+    Assertions.assertNotNull(leader);
+
+    ScmClient scmClient = new ContainerOperationClient(conf);
+    // assert that container balancer is not running right now
+    Assertions.assertFalse(scmClient.getContainerBalancerStatus());
+    ContainerBalancerConfiguration balancerConf =
+        conf.getObject(ContainerBalancerConfiguration.class);
+    ContainerBalancer containerBalancer = leader.getContainerBalancer();
+
+    /*
+    Start container balancer. Since this cluster is already balanced,
+    container balancer should exit early, stop, and persist configuration to DB.
+     */
+    containerBalancer.startBalancer(balancerConf);
+
+    // assert that balancer has stopped since the cluster is already balanced
+    GenericTestUtils.waitFor(() -> !containerBalancer.isBalancerRunning(),
+        10, 500);
+    Assertions.assertFalse(containerBalancer.isBalancerRunning());
+
+    ByteString byteString =
+        leader.getScmMetadataStore().getStatefulServiceConfigTable().get(
+            containerBalancer.getServiceName());
+    ContainerBalancerConfigurationProto proto =
+        ContainerBalancerConfigurationProto.parseFrom(byteString);
+    GenericTestUtils.waitFor(() -> !proto.getShouldRun(), 5, 50);
+
+    long leaderTermIndex =
+        leader.getScmHAManager().getRatisServer().getSCMStateMachine()
+        .getLastAppliedTermIndex().getIndex();
+
+    /*
+    Fetch persisted configuration to verify that `shouldRun` is set to false.
+     */
+    for (StorageContainerManager scm : cluster.getStorageContainerManagers()) {
+      if (!scm.checkLeader()) {
+        // Wait and retry for follower to update transactions to leader
+        // snapshot index.
+        // Timeout error if follower does not load update within 3s
+        GenericTestUtils.waitFor(() -> scm.getScmHAManager().getRatisServer()
+            .getSCMStateMachine().getLastAppliedTermIndex()
+            .getIndex() >= leaderTermIndex, 100, 3000);
+        ContainerBalancer followerBalancer = scm.getContainerBalancer();
+        GenericTestUtils.waitFor(
+            () -> !followerBalancer.isBalancerRunning(), 50, 5000);
+        GenericTestUtils.waitFor(() -> !followerBalancer.shouldRun(), 100,
+            5000);
+      }
+      scm.getStatefulServiceStateManager().readConfiguration(
+          containerBalancer.getServiceName());
+      byteString =
+          scm.getScmMetadataStore().getStatefulServiceConfigTable().get(
+              containerBalancer.getServiceName());
+      ContainerBalancerConfigurationProto protobuf =
+          ContainerBalancerConfigurationProto.parseFrom(byteString);
+      Assertions.assertFalse(protobuf.getShouldRun());
+    }
+  }
+
   static StorageContainerManager getLeader(MiniOzoneHAClusterImpl impl) {
     for (StorageContainerManager scm : impl.getStorageContainerManagers()) {
       if (scm.checkLeader()) {