HDDS-3072. SCM scrub pipeline should be started after coming out of safe mode. (#605)


(cherry picked from commit 252f56d63d157eeb8b0da05e5fbb535783468b99)
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
index 8e4ec6a..b8f0fb6 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
@@ -111,10 +111,12 @@
         continue;
       }
 
-      try {
-        pipelineManager.scrubPipeline(type, factor);
-      } catch (IOException e) {
-        LOG.error("Error while scrubbing pipelines {}", e);
+      if (!pipelineManager.getSafeModeStatus()) {
+        try {
+          pipelineManager.scrubPipeline(type, factor);
+        } catch (IOException e) {
+          LOG.error("Error while scrubbing pipelines {}", e);
+        }
       }
 
       while (true) {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
index 635e032..68d66ff 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
@@ -112,4 +112,17 @@
   default void waitPipelineReady(PipelineID pipelineID, long timeout)
       throws IOException {
   }
+
+  /**
+   * Set SafeMode status.
+   *
+   * @param safeModeStatus
+   */
+  void setSafeModeStatus(boolean safeModeStatus);
+
+  /**
+   * Get SafeMode status.
+   * @return boolean
+   */
+  boolean getSafeModeStatus();
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
index eb2e0d6..0989d34 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
@@ -54,6 +54,7 @@
 import java.util.Set;
 import java.util.Collection;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
@@ -85,6 +86,8 @@
   // Pipeline Manager MXBean
   private ObjectName pmInfoBean;
 
+  private final AtomicBoolean isInSafeMode;
+
   public SCMPipelineManager(Configuration conf, NodeManager nodeManager,
       EventPublisher eventPublisher)
       throws IOException {
@@ -127,6 +130,9 @@
         HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL,
         HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL_DEFAULT,
         TimeUnit.MILLISECONDS);
+    this.isInSafeMode = new AtomicBoolean(conf.getBoolean(
+        HddsConfigKeys.HDDS_SCM_SAFEMODE_ENABLED,
+        HddsConfigKeys.HDDS_SCM_SAFEMODE_ENABLED_DEFAULT));
   }
 
   public PipelineStateManager getStateManager() {
@@ -414,7 +420,7 @@
             .toEpochMilli() >= pipelineScrubTimeoutInMills)
         .collect(Collectors.toList());
     for (Pipeline p : needToSrubPipelines) {
-      LOG.info("srubbing pipeline: id: " + p.getId().toString() +
+      LOG.info("Scrubbing pipeline: id: " + p.getId().toString() +
           " since it stays at ALLOCATED stage for " +
           Duration.between(currentTime, p.getCreationTimestamp()).toMinutes() +
           " mins.");
@@ -618,4 +624,15 @@
   protected NodeManager getNodeManager() {
     return nodeManager;
   }
+
+  @Override
+  public void setSafeModeStatus(boolean safeModeStatus) {
+    this.isInSafeMode.set(safeModeStatus);
+  }
+
+  @Override
+  public boolean getSafeModeStatus() {
+    return this.isInSafeMode.get();
+  }
+
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SCMSafeModeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SCMSafeModeManager.java
index 90fce3f..ca484c8 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SCMSafeModeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SCMSafeModeManager.java
@@ -134,7 +134,6 @@
         exitRules.put(ATLEAST_ONE_DATANODE_REPORTED_PIPELINE_EXIT_RULE,
             oneReplicaPipelineSafeModeRule);
       }
-      emitSafeModeStatus();
       boolean createPipelineInSafemode = conf.getBoolean(
           HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION,
           HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION_DEFAULT);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SafeModeHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SafeModeHandler.java
index 2fbe893..50095ab 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SafeModeHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SafeModeHandler.java
@@ -21,7 +21,6 @@
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.scm.block.BlockManager;
 import org.apache.hadoop.hdds.scm.container.ReplicationManager;
-import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
 import org.apache.hadoop.hdds.scm.server.SCMClientProtocolServer;
 import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager.SafeModeStatus;
@@ -30,8 +29,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -94,8 +91,8 @@
    * Set SafeMode status based on
    * {@link org.apache.hadoop.hdds.scm.events.SCMEvents#SAFE_MODE_STATUS}.
    *
-   * Inform BlockManager, ScmClientProtocolServer and replicationAcitivity
-   * status about safeMode status.
+   * Inform BlockManager, ScmClientProtocolServer, ScmPipeline Manager and
+   * Replication Manager status about safeMode status.
    *
    * @param safeModeStatus
    * @param publisher
@@ -114,8 +111,9 @@
         } catch (InterruptedException e) {
           Thread.currentThread().interrupt();
         }
+        scmPipelineManager.setSafeModeStatus(isInSafeMode.get());
         replicationManager.start();
-        cleanupPipelines();
+        scmPipelineManager.triggerPipelineCreation();
       });
 
       safeModeExitThread.setDaemon(true);
@@ -124,21 +122,6 @@
 
   }
 
-  private void cleanupPipelines() {
-    List<Pipeline> pipelineList = scmPipelineManager.getPipelines();
-    pipelineList.forEach((pipeline) -> {
-      try {
-        if (pipeline.getPipelineState() == Pipeline.PipelineState.ALLOCATED &&
-            pipeline.isAllocationTimeout()) {
-          scmPipelineManager.finalizeAndDestroyPipeline(pipeline, false);
-        }
-      } catch (IOException ex) {
-        LOG.error("Finalize and destroy pipeline failed for pipeline "
-            + pipeline.toString(), ex);
-      }
-    });
-  }
-
   public boolean getSafeModeStatus() {
     return isInSafeMode.get();
   }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 6063a62..1334293 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -358,6 +358,9 @@
     eventQueue.addHandler(SCMEvents.PIPELINE_ACTIONS, pipelineActionHandler);
     eventQueue.addHandler(SCMEvents.PIPELINE_REPORT, pipelineReportHandler);
     eventQueue.addHandler(SCMEvents.SAFE_MODE_STATUS, safeModeHandler);
+
+    // Emit initial safe mode status, as now handlers are registered.
+    scmSafeModeManager.emitSafeModeStatus();
     registerMXBean();
     registerMetricsSource(this);
   }