Handle factor ONE and factor THREE differently during pipeline allocation.
Also take CLOSED pipelines into account during allocation: datanodes that
only belong to CLOSED pipelines are treated as available again.
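
The limits are controlled by two configuration keys (both default to 0,
which disables the corresponding check). Illustrative ozone-site.xml
values, capping each datanode at 2 pipelines and SCM at 20 overall:

  <property>
    <name>ozone.scm.datanode.max.pipeline.engagement</name>
    <value>2</value>
  </property>
  <property>
    <name>ozone.scm.datanode.pipeline.number.limit</name>
    <value>20</value>
  </property>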
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
index fe51f51..05fb0a6 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
@@ -320,6 +320,13 @@
   // Setting to zero by default means this limit doesn't take effect.
   public static final int OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT = 0;
 
+  // Upper limit on how many pipelines can be created in SCM.
+  // Currently intended for test use only.
+  public static final String OZONE_SCM_PIPELINE_NUMBER_LIMIT =
+      "ozone.scm.datanode.pipeline.number.limit";
+  // Setting to zero by default means this limit doesn't take effect.
+  public static final int OZONE_SCM_PIPELINE_NUMBER_LIMIT_DEFAULT = 0;
+
   public static final String
       OZONE_SCM_KEY_VALUE_CONTAINER_DELETION_CHOOSING_POLICY =
       "ozone.scm.keyvalue.container.deletion-choosing.policy";
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index a038047..1fdd8c0 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -838,10 +838,17 @@
     </description>
   </property>
   <property>
     <name>ozone.scm.datanode.max.pipeline.engagement</name>
-    <value>5</value>
+    <value>0</value>
+    <tag>OZONE, SCM, PIPELINE</tag>
+    <description>Max number of pipelines a datanode can be engaged in.
+    </description>
+  </property>
+  <property>
+    <name>ozone.scm.datanode.pipeline.number.limit</name>
+    <value>0</value>
     <tag>OZONE, SCM, PIPELINE</tag>
-    <description>Max number of pipelines per datanode can be engaged in.
+    <description>Upper limit for how many pipelines can be created in SCM.
     </description>
   </property>
   <property>
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
index a0a7222..1230227 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
@@ -188,6 +188,10 @@
           // TODO: #CLUTIL Remove creation logic when all replication types and
           // factors are handled by pipeline creator
           pipeline = pipelineManager.createPipeline(type, factor);
+        } catch (SCMException se) {
+          LOG.warn("Pipeline creation failed for type:{} factor:{}. " +
+              "Datanodes may be exhausted or the pipeline limit reached.",
+              type, factor, se);
+          break;
         } catch (IOException e) {
           LOG.warn("Pipeline creation failed for type:{} factor:{}. Retrying " +
                   "get pipelines call once.", type, factor, e);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
index 6873566..6952f74 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
@@ -96,6 +96,7 @@
           if (scheduler.isClosed()) {
             break;
           }
+
           pipelineManager.createPipeline(type, factor);
         } catch (IOException ioe) {
           break;
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
index df46fad..e41675d 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
@@ -52,6 +52,7 @@
   static final Logger LOG =
       LoggerFactory.getLogger(PipelinePlacementPolicy.class);
   private final NodeManager nodeManager;
+  private final PipelineStateManager stateManager;
   private final Configuration conf;
   private final int heavyNodeCriteria;
 
@@ -59,15 +60,17 @@
    * Constructs a pipeline placement with considering network topology,
    * load balancing and rack awareness.
    *
-   * @param nodeManager Node Manager
+   * @param nodeManager NodeManager
+   * @param stateManager PipelineStateManager
    * @param conf        Configuration
    */
-  public PipelinePlacementPolicy(
-      final NodeManager nodeManager, final Configuration conf) {
+  public PipelinePlacementPolicy(final NodeManager nodeManager,
+      final PipelineStateManager stateManager, final Configuration conf) {
     super(nodeManager, conf);
     this.nodeManager = nodeManager;
     this.conf = conf;
-    heavyNodeCriteria = conf.getInt(
+    this.stateManager = stateManager;
+    this.heavyNodeCriteria = conf.getInt(
         ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT,
         ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT);
   }
@@ -76,16 +79,27 @@
    * Returns true if this node meets the criteria.
    *
    * @param datanodeDetails DatanodeDetails
+   * @param nodesRequired nodes required count
    * @return true if we have enough space.
    */
   @VisibleForTesting
-  boolean meetCriteria(DatanodeDetails datanodeDetails) {
+  boolean meetCriteria(DatanodeDetails datanodeDetails, int nodesRequired) {
     if (heavyNodeCriteria == 0) {
       // no limit applied.
       return true;
     }
+    // Datanodes that only belong to pipelines in certain states (currently
+    // CLOSED) are still considered available for new pipeline allocation,
+    // so the number of such pipelines is deducted from the heaviness
+    // calculation.
+    int pipelineNumDeductible = (int)stateManager.getPipelines(
+        HddsProtos.ReplicationType.RATIS,
+        HddsProtos.ReplicationFactor.valueOf(nodesRequired),
+        Pipeline.PipelineState.CLOSED)
+        .stream().filter(
+            p -> nodeManager.getPipelines(datanodeDetails).contains(p.getId()))
+        .count();
     boolean meet = (nodeManager.getPipelinesCount(datanodeDetails)
-        < heavyNodeCriteria);
+        - pipelineNumDeductible) < heavyNodeCriteria;
     if (!meet) {
      LOG.info("Pipeline Placement: can't place more pipeline on heavy " +
          "datanode: " + datanodeDetails.getUuid().toString() + " Heaviness: " +
@@ -134,13 +148,14 @@
 
     // filter nodes that meet the size and pipeline engagement criteria.
     // Pipeline placement doesn't take node space left into account.
-    List<DatanodeDetails> healthyList = healthyNodes.stream().filter(d ->
-        meetCriteria(d)).collect(Collectors.toList());
+    List<DatanodeDetails> healthyList = healthyNodes.stream()
+        .filter(d -> meetCriteria(d, nodesRequired)).limit(nodesRequired)
+        .collect(Collectors.toList());
 
     if (healthyList.size() < nodesRequired) {
       msg = String.format("Unable to find enough nodes that meet " +
               "the criteria that cannot engage in more than %d pipelines." +
-              " Nodes required: %d Found: %d, healthy nodes count in" +
+              " Nodes required: %d Found: %d, healthy nodes count in " +
               "NodeManager: %d.",
           heavyNodeCriteria, nodesRequired, healthyList.size(),
           initialHealthyNodesCount);
@@ -173,8 +188,7 @@
     // Randomly picks nodes when all nodes are equal or factor is ONE.
     // This happens when network topology is absent or
     // all nodes are on the same rack.
-    if (checkAllNodesAreEqual(nodeManager.getClusterNetworkTopologyMap())
-        || nodesRequired == HddsProtos.ReplicationFactor.ONE.getNumber()) {
+    if (checkAllNodesAreEqual(nodeManager.getClusterNetworkTopologyMap())) {
       return super.getResultSet(nodesRequired, healthyNodes);
     } else {
       // Since topology and rack awareness are available, picks nodes
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
index 6a51957..f6b80ed 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
@@ -20,10 +20,12 @@
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState;
 import org.apache.hadoop.io.MultipleIOException;
@@ -41,14 +43,13 @@
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.ForkJoinWorkerThread;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 /**
  * Implements Api for creating ratis pipelines.
@@ -85,13 +86,54 @@
     this.stateManager = stateManager;
     this.conf = conf;
     this.tlsConfig = tlsConfig;
-    this.placementPolicy = new PipelinePlacementPolicy(nodeManager, conf);
+    this.placementPolicy =
+        new PipelinePlacementPolicy(nodeManager, stateManager, conf);
+  }
+
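+  /**
+   * Picks datanodes for factor ONE allocation: healthy nodes that are not
+   * currently part of any OPEN, DORMANT or ALLOCATED Ratis pipeline of the
+   * given factor. Nodes that appear only in CLOSED pipelines count as
+   * never used.
+   */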
+  private List<DatanodeDetails> pickNodesNeverUsed(ReplicationFactor factor)
+      throws SCMException {
+    Set<DatanodeDetails> dnsUsed = new HashSet<>();
+    stateManager.getPipelines(ReplicationType.RATIS, factor)
+        .stream().filter(
+          p -> p.getPipelineState().equals(PipelineState.OPEN) ||
+              p.getPipelineState().equals(PipelineState.DORMANT) ||
+              p.getPipelineState().equals(PipelineState.ALLOCATED))
+        .forEach(p -> dnsUsed.addAll(p.getNodes()));
+
+    // Get list of healthy nodes
+    List<DatanodeDetails> dns = nodeManager
+        .getNodes(HddsProtos.NodeState.HEALTHY)
+        .parallelStream()
+        .filter(dn -> !dnsUsed.contains(dn))
+        .limit(factor.getNumber())
+        .collect(Collectors.toList());
+    if (dns.size() < factor.getNumber()) {
+      String e = String
+          .format("Cannot create pipeline of factor %d. Unused nodes " +
+                  "found: %d; nodes already used: %d; healthy nodes: %d.",
+              factor.getNumber(),
+              dns.size(), dnsUsed.size(),
+              nodeManager.getNodes(HddsProtos.NodeState.HEALTHY).size());
+      throw new SCMException(e,
+          SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
+    }
+    return dns;
   }
 
   @Override
   public Pipeline create(ReplicationFactor factor) throws IOException {
-    List<DatanodeDetails> dns = placementPolicy.chooseDatanodes(null,
-        null, factor.getNumber(), 0);
+    List<DatanodeDetails> dns;
+
+    switch (factor) {
+    case ONE:
+      dns = pickNodesNeverUsed(ReplicationFactor.ONE);
+      break;
+    case THREE:
+      dns = placementPolicy.chooseDatanodes(null,
+          null, factor.getNumber(), 0);
+      break;
+    default:
+      throw new IllegalStateException("Unknown factor: " + factor.name());
+    }
 
     Pipeline pipeline = Pipeline.newBuilder()
         .setId(PipelineID.randomId())
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java
index 777a0b0..21c4fbf 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java
@@ -64,8 +64,8 @@
       try {
         destroyPipeline(dn, pipeline.getId(), ozoneConf, grpcTlsConfig);
       } catch (IOException e) {
-        LOG.warn("Pipeline destroy failed for pipeline={} dn={}",
-            pipeline.getId(), dn);
+        LOG.warn("Pipeline destroy failed for pipeline={} dn={} exception={}",
+            pipeline.getId(), dn, e.getMessage());
       }
     }
   }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
index a927d56..80c934f 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
@@ -28,6 +28,7 @@
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.server.ServerUtils;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
@@ -54,10 +55,6 @@
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-import static org.apache.hadoop.hdds.scm
-    .ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_DEFAULT;
-import static org.apache.hadoop.hdds.scm
-    .ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_MB;
 import static org.apache.hadoop.ozone.OzoneConsts.SCM_PIPELINE_DB;
 
 /**
@@ -84,6 +81,8 @@
   // Pipeline Manager MXBean
   private ObjectName pmInfoBean;
   private GrpcTlsConfig grpcTlsConfig;
+  private int pipelineNumberLimit;
+  private int heavyNodeCriteria;
 
   public SCMPipelineManager(Configuration conf, NodeManager nodeManager,
       EventPublisher eventPublisher, GrpcTlsConfig grpcTlsConfig)
@@ -97,8 +96,8 @@
     scheduler = new Scheduler("RatisPipelineUtilsThread", false, 1);
     this.backgroundPipelineCreator =
         new BackgroundPipelineCreator(this, scheduler, conf);
-    int cacheSize = conf.getInt(OZONE_SCM_DB_CACHE_SIZE_MB,
-        OZONE_SCM_DB_CACHE_SIZE_DEFAULT);
+    int cacheSize = conf.getInt(ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_MB,
+        ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_DEFAULT);
     final File metaDir = ServerUtils.getScmDbDir(conf);
     final File pipelineDBPath = new File(metaDir, SCM_PIPELINE_DB);
     this.pipelineStore =
@@ -115,6 +114,12 @@
         "SCMPipelineManagerInfo", this);
     initializePipelineState();
     this.grpcTlsConfig = grpcTlsConfig;
+    this.pipelineNumberLimit = conf.getInt(
+        ScmConfigKeys.OZONE_SCM_PIPELINE_NUMBER_LIMIT,
+        ScmConfigKeys.OZONE_SCM_PIPELINE_NUMBER_LIMIT_DEFAULT);
+    this.heavyNodeCriteria = conf.getInt(
+        ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT,
+        ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT);
   }
 
   public PipelineStateManager getStateManager() {
@@ -147,10 +152,33 @@
     }
   }
 
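+  /**
+   * Returns true if creating one more Ratis pipeline of the given factor
+   * would exceed the configured limits: for factor THREE the per-datanode
+   * engagement limit scaled by the healthy node count, otherwise the
+   * global pipeline number limit. CLOSED pipelines are excluded from the
+   * count, since their datanodes can be reused.
+   */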
+  private boolean exceedPipelineNumberLimit(ReplicationFactor factor) {
+    if (heavyNodeCriteria > 0 && factor == ReplicationFactor.THREE) {
+      return (stateManager.getPipelines(ReplicationType.RATIS, factor).size() -
+          stateManager.getPipelines(ReplicationType.RATIS, factor,
+              Pipeline.PipelineState.CLOSED).size()) >= heavyNodeCriteria *
+          nodeManager.getNodeCount(HddsProtos.NodeState.HEALTHY);
+    }
+
+    if (pipelineNumberLimit > 0) {
+      return (stateManager.getPipelines(ReplicationType.RATIS).size() -
+          stateManager.getPipelines(ReplicationType.RATIS,
+              Pipeline.PipelineState.CLOSED).size()) >= pipelineNumberLimit;
+    }
+
+    return false;
+  }
+
   @Override
   public synchronized Pipeline createPipeline(
       ReplicationType type, ReplicationFactor factor) throws IOException {
     lock.writeLock().lock();
+    if (type == ReplicationType.RATIS && exceedPipelineNumberLimit(factor)) {
+      lock.writeLock().unlock();
+      throw new SCMException("Pipeline count has reached the limit: " +
+          pipelineNumberLimit,
+          SCMException.ResultCodes.FAILED_TO_FIND_HEALTHY_NODES);
+    }
     try {
       Pipeline pipeline = pipelineFactory.create(type, factor);
       pipelineStore.put(pipeline.getId().getProtobuf().toByteArray(),
@@ -160,8 +188,6 @@
       metrics.incNumPipelineCreated();
       metrics.createPerPipelineMetrics(pipeline);
       return pipeline;
-    } catch (InsufficientDatanodesException idEx) {
-      throw idEx;
     } catch (IOException ex) {
       metrics.incNumPipelineCreationFailed();
       LOG.error("Pipeline creation failed.", ex);
@@ -173,7 +199,7 @@
 
   @Override
   public Pipeline createPipeline(ReplicationType type, ReplicationFactor factor,
-                                 List<DatanodeDetails> nodes) {
+      List<DatanodeDetails> nodes) {
     // This will mostly be used to create dummy pipeline for SimplePipelines.
     // We don't update the metrics for SimplePipelines.
     lock.writeLock().lock();
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java
index d0f7f6e..1b23036 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java
@@ -124,6 +124,14 @@
   }
 
   /**
+   * Get the number of pipelines created.
+   * @return number of pipelines created
+   */
+  long getNumPipelineCreated() {
+    return numPipelineCreated.value();
+  }
+
+  /**
    * Increments number of failed pipeline creation count.
    */
   void incNumPipelineCreationFailed() {
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
index e200d6f..1e34039 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
@@ -51,8 +51,8 @@
         PIPELINE_PLACEMENT_MAX_NODES_COUNT);
     conf = new OzoneConfiguration();
     conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 5);
-    placementPolicy =
-        new PipelinePlacementPolicy(nodeManager, conf);
+    placementPolicy = new PipelinePlacementPolicy(
+        nodeManager, new PipelineStateManager(conf), conf);
   }
 
   @Test
@@ -128,7 +128,7 @@
   public void testHeavyNodeShouldBeExcluded() throws SCMException{
     List<DatanodeDetails> healthyNodes =
         nodeManager.getNodes(HddsProtos.NodeState.HEALTHY);
-    int nodesRequired = healthyNodes.size()/2;
+    int nodesRequired = HddsProtos.ReplicationFactor.THREE.getNumber();
     // only minority of healthy NODES are heavily engaged in pipelines.
     int minorityHeavy = healthyNodes.size()/2 - 1;
     List<DatanodeDetails> pickedNodes1 = placementPolicy.chooseDatanodes(
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestroy.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestroy.java
index 6ace90c..d0afbbe 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestroy.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestroy.java
@@ -20,6 +20,7 @@
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.ozone.HddsDatanodeService;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
@@ -34,6 +35,7 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
 
 /**
@@ -48,9 +50,12 @@
   public void init(int numDatanodes) throws Exception {
     conf.set(HddsConfigKeys.OZONE_METADATA_DIRS,
         GenericTestUtils.getRandomizedTempPath());
+    conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 2);
+
     cluster = MiniOzoneCluster.newBuilder(conf)
             .setNumDatanodes(numDatanodes)
-            .setHbInterval(1000)
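+            // room for a factor-ONE pipeline per datanode plus a
+            // factor-THREE pipeline per three datanodes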
+            .setPipelineNumber(numDatanodes + numDatanodes/3)
+            .setHbInterval(2000)
             .setHbProcessorInterval(1000)
             .build();
     cluster.waitForClusterToBeReady();
@@ -103,7 +108,7 @@
     } catch (IOException ioe) {
       // As now all datanodes are shutdown, they move to stale state, there
       // will be no sufficient datanodes to create the pipeline.
-      Assert.assertTrue(ioe instanceof InsufficientDatanodesException);
+      Assert.assertTrue(ioe instanceof SCMException);
     }
 
     // make sure pipelines is destroyed
@@ -116,9 +121,13 @@
     for (Pipeline pipeline : pipelines) {
       pipelineManager.finalizeAndDestroyPipeline(pipeline, false);
     }
-    // make sure pipelines is created after node start
-    pipelineManager.triggerPipelineCreation();
-    waitForPipelines(1);
+
+    if (cluster.getStorageContainerManager()
+        .getScmNodeManager().getNodeCount(HddsProtos.NodeState.HEALTHY) > 0) {
+      // make sure pipelines are created after the nodes restart
+      pipelineManager.triggerPipelineCreation();
+      waitForPipelines(1);
+    }
   }
 
   private void waitForPipelines(int numPipelines)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
index 0a8c5ad..7526575 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
@@ -137,52 +137,38 @@
   @Test
   public void testCreatePipelinesDnExclude() throws IOException {
 
-    // We have 10 DNs in MockNodeManager.
-    // Use up first 3 DNs for an open pipeline.
-    List<DatanodeDetails> openPiplineDns = nodeManager.getAllNodes()
-        .subList(0, 3);
+    List<DatanodeDetails> allHealthyNodes =
+        nodeManager.getNodes(HddsProtos.NodeState.HEALTHY);
+    int totalHealthyNodesCount = allHealthyNodes.size();
+
     HddsProtos.ReplicationFactor factor = HddsProtos.ReplicationFactor.THREE;
 
-    Pipeline openPipeline = Pipeline.newBuilder()
-        .setType(HddsProtos.ReplicationType.RATIS)
-        .setFactor(factor)
-        .setNodes(openPiplineDns)
-        .setState(Pipeline.PipelineState.OPEN)
-        .setId(PipelineID.randomId())
-        .build();
+    List<DatanodeDetails> closedPipelineDns = new ArrayList<>();
+    for (int i = 0; i < totalHealthyNodesCount/3; i++) {
+      List<DatanodeDetails> pipelineDns = allHealthyNodes
+          .subList(3 * i, 3 * (i + 1));
 
-    stateManager.addPipeline(openPipeline);
-    nodeManager.addPipeline(openPipeline);
-    for (DatanodeDetails node : openPipeline.getNodes()) {
-      System.out.println("open pipeline contains " + node.getUuid());
+      Pipeline.PipelineState state;
+      if (i % 2 == 0) {
+        state = Pipeline.PipelineState.OPEN;
+      } else {
+        state = Pipeline.PipelineState.CLOSED;
+        closedPipelineDns.addAll(pipelineDns);
+      }
+
+      Pipeline pipeline = Pipeline.newBuilder()
+          .setType(HddsProtos.ReplicationType.RATIS)
+          .setFactor(factor)
+          .setNodes(pipelineDns)
+          .setState(state)
+          .setId(PipelineID.randomId())
+          .build();
+
+      stateManager.addPipeline(pipeline);
+      nodeManager.addPipeline(pipeline);
     }
 
-    // Use up next 3 DNs also for an open pipeline.
-    List<DatanodeDetails> moreOpenPiplineDns = nodeManager.getAllNodes()
-        .subList(3, 6);
-    Pipeline anotherOpenPipeline = Pipeline.newBuilder()
-        .setType(HddsProtos.ReplicationType.RATIS)
-        .setFactor(factor)
-        .setNodes(moreOpenPiplineDns)
-        .setState(Pipeline.PipelineState.OPEN)
-        .setId(PipelineID.randomId())
-        .build();
-    stateManager.addPipeline(anotherOpenPipeline);
-    nodeManager.addPipeline(anotherOpenPipeline);
-
-    // Use up next 3 DNs also for a closed pipeline.
-    List<DatanodeDetails> closedPiplineDns = nodeManager.getAllNodes()
-        .subList(6, 9);
-    Pipeline anotherClosedPipeline = Pipeline.newBuilder()
-        .setType(HddsProtos.ReplicationType.RATIS)
-        .setFactor(factor)
-        .setNodes(closedPiplineDns)
-        .setState(Pipeline.PipelineState.CLOSED)
-        .setId(PipelineID.randomId())
-        .build();
-    stateManager.addPipeline(anotherClosedPipeline);
-    nodeManager.addPipeline(anotherClosedPipeline);
-
     Pipeline pipeline = provider.create(factor);
     Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.RATIS);
     Assert.assertEquals(pipeline.getFactor(), factor);
@@ -194,6 +180,6 @@
     // Since we have only 10 DNs, at least 1 pipeline node should have been
     // from the closed pipeline DN list.
     Assert.assertTrue(pipelineNodes.parallelStream().filter(
-        closedPiplineDns::contains).count() > 0);
+        closedPipelineDns::contains).count() > 0);
   }
 }
\ No newline at end of file
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMRestart.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMRestart.java
index 459a67a..1af2f74 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMRestart.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMRestart.java
@@ -57,8 +57,11 @@
   @BeforeClass
   public static void init() throws Exception {
     conf = new OzoneConfiguration();
+    int numOfNodes = 4;
     cluster = MiniOzoneCluster.newBuilder(conf)
-        .setNumDatanodes(4)
+        .setNumDatanodes(numOfNodes)
+        // allow only one FACTOR THREE pipeline.
+        .setPipelineNumber(numOfNodes + 1)
         .setHbInterval(1000)
         .setHbProcessorInterval(1000)
         .build();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeWithPipelineRules.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeWithPipelineRules.java
index 7cfd555..1caa302 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeWithPipelineRules.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeWithPipelineRules.java
@@ -38,6 +38,7 @@
 import java.util.List;
 import java.util.concurrent.TimeoutException;
 
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
 import static org.junit.Assert.fail;
 
 /**
@@ -62,8 +63,11 @@
         true);
     conf.set(HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT, "10s");
     conf.set(ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_INTERVAL, "10s");
+    conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 1000);
+
     clusterBuilder = MiniOzoneCluster.newBuilder(conf)
         .setNumDatanodes(numDatanodes)
+        .setPipelineNumber(numDatanodes + numDatanodes/3)
         .setHbInterval(1000)
         .setHbProcessorInterval(1000);
 
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
index 19c1406..4fe1701 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
@@ -36,8 +36,6 @@
 import java.util.UUID;
 import java.util.concurrent.TimeoutException;
 
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
-
 /**
  * Interface used for MiniOzoneClusters.
  */
@@ -268,11 +266,10 @@
     protected int numOfDatanodes = 1;
     protected boolean  startDataNodes = true;
     protected CertificateClient certClient;
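+    // 3 is the default; MiniOzoneClusterImpl treats it as "not set" and
+    // derives the SCM pipeline limit from the datanode count instead.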
+    protected int pipelineNumber = 3;
 
     protected Builder(OzoneConfiguration conf) {
       this.conf = conf;
-      // MiniOzoneCluster doesn't have pipeline engagement limit.
-      conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 0);
       setClusterId(UUID.randomUUID().toString());
     }
 
@@ -357,6 +354,16 @@
     }
 
     /**
+     * Sets the upper limit on the total number of pipelines SCM may
+     * create; a value of 0 disables the limit.
+     * @param val number of pipelines
+     * @return MiniOzoneCluster.Builder
+     */
+    public Builder setPipelineNumber(int val) {
+      pipelineNumber = val;
+      return this;
+    }
+
+    /**
      * Sets the number of HeartBeat Interval of Datanodes, the value should be
      * in MilliSeconds.
      *
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
index ac76482..39b2582 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
@@ -494,6 +494,9 @@
           streamBufferMaxSize.get(), streamBufferSizeUnit.get());
       conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, blockSize.get(),
           streamBufferSizeUnit.get());
+      // MiniOzoneCluster should have a global pipeline upper limit. The
+      // builder default of 3 means the limit was not set explicitly, in
+      // which case it falls back to twice the datanode count.
+      conf.setInt(ScmConfigKeys.OZONE_SCM_PIPELINE_NUMBER_LIMIT,
+          pipelineNumber == 3 ? 2 * numOfDatanodes : pipelineNumber);
       configureTrace();
     }
 
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/Test2WayCommitInRatis.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/Test2WayCommitInRatis.java
index cf570d2..ea648c9 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/Test2WayCommitInRatis.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/Test2WayCommitInRatis.java
@@ -81,6 +81,7 @@
     conf.setQuietMode(false);
     cluster = MiniOzoneCluster.newBuilder(conf)
         .setNumDatanodes(7)
+        .setPipelineNumber(10)
         .setBlockSize(blockSize)
         .setChunkSize(chunkSize)
         .setStreamBufferFlushSize(flushSize)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java
index 8649837..9088497 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java
@@ -21,6 +21,7 @@
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.XceiverClientManager;
 import org.apache.hadoop.hdds.scm.XceiverClientMetrics;
 import org.apache.hadoop.hdds.scm.XceiverClientRatis;
@@ -89,8 +90,10 @@
     conf.setQuietMode(false);
     conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 4,
         StorageUnit.MB);
+    conf.setInt(ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 3);
+
     cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(7)
-        .setBlockSize(blockSize).setChunkSize(chunkSize)
+        .setPipelineNumber(10).setBlockSize(blockSize).setChunkSize(chunkSize)
         .setStreamBufferFlushSize(flushSize)
         .setStreamBufferMaxSize(maxFlushSize)
         .setStreamBufferSizeUnit(StorageUnit.BYTES).build();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitWatcher.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitWatcher.java
index ea51900..ff9fad4 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitWatcher.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitWatcher.java
@@ -96,6 +96,7 @@
         StorageUnit.MB);
     cluster = MiniOzoneCluster.newBuilder(conf)
         .setNumDatanodes(7)
+        .setPipelineNumber(10)
         .setBlockSize(blockSize)
         .setChunkSize(chunkSize)
         .setStreamBufferFlushSize(flushSize)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerReplicationEndToEnd.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerReplicationEndToEnd.java
index 0886d26..865e0b5 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerReplicationEndToEnd.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerReplicationEndToEnd.java
@@ -54,8 +54,7 @@
 import java.util.function.Predicate;
 
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
 
 /**
  * Tests delete key operation with a slow follower in the datanode
@@ -99,10 +98,12 @@
         1000, TimeUnit.SECONDS);
     conf.setLong("hdds.scm.replication.thread.interval",
         containerReportInterval);
+    conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 2);
 
     conf.setQuietMode(false);
     cluster =
-        MiniOzoneCluster.newBuilder(conf).setNumDatanodes(4).setHbInterval(200)
+        MiniOzoneCluster.newBuilder(conf).setNumDatanodes(4)
+            .setPipelineNumber(6).setHbInterval(200)
             .build();
     cluster.waitForClusterToBeReady();
     cluster.getStorageContainerManager().getReplicationManager().start();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java
index 19a1707..37b8a5f 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java
@@ -52,8 +52,7 @@
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED;
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL;
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.*;
 
 /**
  * Tests the containerStateMachine failure handling.
@@ -82,7 +81,7 @@
     baseDir.mkdirs();
 
     conf.setBoolean(HDDS_BLOCK_TOKEN_ENABLED, true);
-  //  conf.setBoolean(OZONE_SECURITY_ENABLED_KEY, true);
+    //  conf.setBoolean(OZONE_SECURITY_ENABLED_KEY, true);
     conf.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL, 200,
         TimeUnit.MILLISECONDS);
     conf.setTimeDuration(HDDS_COMMAND_STATUS_REPORT_INTERVAL, 200,
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestDeleteWithSlowFollower.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestDeleteWithSlowFollower.java
index 30c2624..00556a8 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestDeleteWithSlowFollower.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestDeleteWithSlowFollower.java
@@ -80,6 +80,7 @@
   private static String bucketName;
   private static String path;
   private static XceiverClientManager xceiverClientManager;
+  private static final int FACTOR_THREE_PIPELINE_COUNT = 1;
 
   /**
    * Create a MiniDFSCluster for testing.
@@ -111,10 +112,12 @@
         1000, TimeUnit.SECONDS);
     conf.setTimeDuration(OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL,
         1, TimeUnit.SECONDS);
-
     conf.setQuietMode(false);
-    cluster =
-        MiniOzoneCluster.newBuilder(conf).setNumDatanodes(3).setHbInterval(100)
+    int numOfDatanodes = 3;
+    cluster = MiniOzoneCluster.newBuilder(conf)
+            .setNumDatanodes(numOfDatanodes)
+            .setPipelineNumber(numOfDatanodes + FACTOR_THREE_PIPELINE_COUNT)
+            .setHbInterval(100)
             .build();
     cluster.waitForClusterToBeReady();
     //the easiest way to create an open container is creating a key
@@ -176,7 +179,7 @@
         cluster.getStorageContainerManager().getPipelineManager()
             .getPipelines(HddsProtos.ReplicationType.RATIS,
                 HddsProtos.ReplicationFactor.THREE);
-    Assert.assertTrue(pipelineList.size() == 1);
+    Assert.assertEquals(FACTOR_THREE_PIPELINE_COUNT, pipelineList.size());
     Pipeline pipeline = pipelineList.get(0);
     for (HddsDatanodeService dn : cluster.getHddsDatanodes()) {
       if (ContainerTestHelper.isRatisFollower(dn, pipeline)) {
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java
index edb796b..0368323 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java
@@ -23,6 +23,7 @@
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
@@ -97,6 +98,7 @@
         1, TimeUnit.SECONDS);
     conf.setBoolean(
         OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_KEY, true);
+    conf.setInt(ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 2);
 
     conf.setQuietMode(false);
     conf.setClass(NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
@@ -105,7 +107,7 @@
         Collections.singleton(HddsUtils.getHostName(conf))).get(0),
         "/rack1");
     cluster = MiniOzoneCluster.newBuilder(conf)
-        .setNumDatanodes(10).build();
+        .setNumDatanodes(10).setPipelineNumber(15).build();
     cluster.waitForClusterToBeReady();
     //the easiest way to create an open container is creating a key
     client = OzoneClientFactory.getClient(conf);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestHybridPipelineOnDatanode.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestHybridPipelineOnDatanode.java
index 47a716e..84649e3 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestHybridPipelineOnDatanode.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestHybridPipelineOnDatanode.java
@@ -67,7 +67,8 @@
   @BeforeClass
   public static void init() throws Exception {
     conf = new OzoneConfiguration();
-    cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(3).build();
+    cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(3)
+        .setPipelineNumber(5).build();
     cluster.waitForClusterToBeReady();
     //the easiest way to create an open container is creating a key
     client = OzoneClientFactory.getClient(conf);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java
index fa8a289..666264c 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java
@@ -81,6 +81,7 @@
         StorageUnit.MB);
     cluster = MiniOzoneCluster.newBuilder(conf)
         .setNumDatanodes(3)
+        .setPipelineNumber(5)
         .setBlockSize(blockSize)
         .setChunkSize(chunkSize)
         .setStreamBufferFlushSize(flushSize)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestMultiBlockWritesWithDnFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestMultiBlockWritesWithDnFailures.java
index 9666247..6dbae6a 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestMultiBlockWritesWithDnFailures.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestMultiBlockWritesWithDnFailures.java
@@ -47,8 +47,7 @@
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
 
 /**
  * Tests MultiBlock Writes with Dn failures by Ozone Client.
@@ -87,10 +86,13 @@
     conf.setTimeDuration(
         OzoneConfigKeys.DFS_RATIS_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_KEY,
         1, TimeUnit.SECONDS);
+    conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 2);
 
     conf.setQuietMode(false);
     cluster = MiniOzoneCluster.newBuilder(conf)
-        .setNumDatanodes(datanodes).build();
+        .setNumDatanodes(datanodes)
+        .setPipelineNumber(0)
+        .build();
     cluster.waitForClusterToBeReady();
     //the easiest way to create an open container is creating a key
     client = OzoneClientFactory.getClient(conf);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java
index 5f6d494..9e7e3c0 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java
@@ -91,6 +91,7 @@
     conf.setQuietMode(false);
     cluster = MiniOzoneCluster.newBuilder(conf)
         .setNumDatanodes(7)
+        .setPipelineNumber(10)
         .setBlockSize(blockSize)
         .setChunkSize(chunkSize)
         .setStreamBufferFlushSize(flushSize)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
index d91f739..4710ada 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
@@ -164,6 +164,7 @@
   static void startCluster(OzoneConfiguration conf) throws Exception {
     cluster = MiniOzoneCluster.newBuilder(conf)
         .setNumDatanodes(3)
+        .setPipelineNumber(10)
         .setScmId(scmId)
         .build();
     cluster.waitForClusterToBeReady();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java
index 9b59349..fa89f5b 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java
@@ -53,8 +53,7 @@
 import java.util.concurrent.TimeoutException;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
 
 /**
  * This class verifies the watchForCommit Handling by xceiverClient.
@@ -92,10 +91,12 @@
     conf.setTimeDuration(
         OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_RETRY_INTERVAL_KEY,
         1, TimeUnit.SECONDS);
+    conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 5);
 
     conf.setQuietMode(false);
     cluster = MiniOzoneCluster.newBuilder(conf)
         .setNumDatanodes(7)
+        .setPipelineNumber(10)
         .setBlockSize(blockSize)
         .setChunkSize(chunkSize)
         .setStreamBufferFlushSize(flushSize)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java
index b676e1c..763f639 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java
@@ -52,6 +52,8 @@
 import java.util.List;
 import java.util.concurrent.TimeoutException;
 
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
+
 /**
  * Test container closing.
  */
@@ -73,8 +75,11 @@
   public static void init() throws Exception {
     conf = new OzoneConfiguration();
     conf.set(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT, "1");
+    conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 2);
+
     cluster = MiniOzoneCluster.newBuilder(conf)
         .setNumDatanodes(10)
+        .setPipelineNumber(15)
         .build();
     cluster.waitForClusterToBeReady();
     //the easiest way to create an open container is creating a key
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMContainerPlacementPolicyMetrics.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMContainerPlacementPolicyMetrics.java
index 536d807..191589a 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMContainerPlacementPolicyMetrics.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMContainerPlacementPolicyMetrics.java
@@ -82,6 +82,7 @@
         "/rack1");
     cluster = MiniOzoneCluster.newBuilder(conf)
         .setNumDatanodes(4)
+        .setPipelineNumber(10)
         .build();
     cluster.waitForClusterToBeReady();
     metrics = getMetrics(SCMContainerPlacementMetrics.class.getSimpleName());
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestQueryNode.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestQueryNode.java
index c9b8c89..618212a 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestQueryNode.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestQueryNode.java
@@ -16,6 +16,7 @@
  */
 package org.apache.hadoop.ozone.scm.node;
 
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -77,9 +78,11 @@
     conf.setTimeDuration(HDDS_NODE_REPORT_INTERVAL, 1, SECONDS);
     conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, SECONDS);
     conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 6, SECONDS);
+    conf.setInt(ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 3);
 
     cluster = MiniOzoneCluster.newBuilder(conf)
         .setNumDatanodes(numOfDatanodes)
+        .setPipelineNumber(numOfDatanodes + numOfDatanodes/2)
         .build();
     cluster.waitForClusterToBeReady();
     scmClient = new ContainerOperationClient(cluster
diff --git a/hadoop-ozone/ozonefs/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFsHAURLs.java b/hadoop-ozone/ozonefs/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFsHAURLs.java
index ab35191..bd7173c 100644
--- a/hadoop-ozone/ozonefs/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFsHAURLs.java
+++ b/hadoop-ozone/ozonefs/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFsHAURLs.java
@@ -99,6 +99,7 @@
     conf.setTimeDuration(
         OMConfigKeys.OZONE_OM_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_KEY,
         LEADER_ELECTION_TIMEOUT, TimeUnit.MILLISECONDS);
+    conf.setInt(ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 3);
 
     OMStorage omStore = new OMStorage(conf);
     omStore.setClusterId(clusterId);
@@ -108,6 +109,8 @@
 
     // Start the cluster
     cluster = MiniOzoneCluster.newHABuilder(conf)
+        .setNumDatanodes(7)
+        .setPipelineNumber(10)
         .setClusterId(clusterId)
         .setScmId(scmId)
         .setOMServiceId(omServiceId)
diff --git a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestDataValidate.java b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestDataValidate.java
index fdcb822..eb19fe7 100644
--- a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestDataValidate.java
+++ b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestDataValidate.java
@@ -42,7 +42,7 @@
   static void startCluster(OzoneConfiguration conf) throws Exception {
     conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "5000ms");
     cluster = MiniOzoneCluster.newBuilder(conf)
-        .setNumDatanodes(5).build();
+        .setNumDatanodes(5).setPipelineNumber(8).build();
     cluster.waitForClusterToBeReady();
   }
 
diff --git a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithPipelineDestroy.java b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithPipelineDestroy.java
index 13ecab6..cc922f2 100644
--- a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithPipelineDestroy.java
+++ b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithPipelineDestroy.java
@@ -53,6 +53,7 @@
       .setHbProcessorInterval(1000)
       .setHbInterval(1000)
       .setNumDatanodes(3)
+      .setPipelineNumber(8)
       .build();
     cluster.waitForClusterToBeReady();
   }