[LIVY-977][SERVER][CONF] Livy can not be started if HDFS is still in safe mode (#440)

## What changes were proposed in this pull request?

HDFS safe mode is checked when livy session is created. If safe mode is ON, then IllegalStateException is thrown after max retry attempts (configurable) with safe mode interval (configurable) checks are done. If safe mode is OFF, then livy will be able to create session.

https://issues.apache.org/jira/browse/LIVY-977

## How was this patch tested?

Added unit test cases to validate code changes. Also, done manual testing in CDP cluster by creating livy sessions with HDFS safe mode check ON/OFF.
diff --git a/conf/livy.conf.template b/conf/livy.conf.template
index 7566971..e99251d 100644
--- a/conf/livy.conf.template
+++ b/conf/livy.conf.template
@@ -195,3 +195,8 @@
 # Enable to allow custom classpath by proxy user in cluster mode
 # The below configuration parameter is disabled by default.
 # livy.server.session.allow-custom-classpath = true
+
+# value specifies interval to check safe mode in hdfs filesystem
+# livy.server.hdfs.safe-mode.interval = 5
+# value specifies max attempts to retry when safe mode is ON in hdfs filesystem
+# livy.server.hdfs.safe-mode.max.retry.attempts = 10
diff --git a/server/src/main/scala/org/apache/livy/LivyConf.scala b/server/src/main/scala/org/apache/livy/LivyConf.scala
index 31b6872..720aa4e 100644
--- a/server/src/main/scala/org/apache/livy/LivyConf.scala
+++ b/server/src/main/scala/org/apache/livy/LivyConf.scala
@@ -252,6 +252,12 @@
   // how often to check livy session leakage
   val YARN_APP_LEAKAGE_CHECK_INTERVAL = Entry("livy.server.yarn.app-leakage.check-interval", "60s")
 
+  // value specifies interval to check safe mode in hdfs filesystem
+  val HDFS_SAFE_MODE_INTERVAL_IN_SECONDS = Entry("livy.server.hdfs.safe-mode.interval", 5)
+
+  // value specifies max attempts to retry when safe mode is ON in hdfs filesystem
+  val HDFS_SAFE_MODE_MAX_RETRY_ATTEMPTS = Entry("livy.server.hdfs.safe-mode.max.retry.attempts", 12)
+
   // Whether session timeout should be checked, by default it will be checked, which means inactive
   // session will be stopped after "livy.server.session.timeout"
   val SESSION_TIMEOUT_CHECK = Entry("livy.server.session.timeout-check", true)
diff --git a/server/src/main/scala/org/apache/livy/server/recovery/FileSystemStateStore.scala b/server/src/main/scala/org/apache/livy/server/recovery/FileSystemStateStore.scala
index 826a2fb..6fee7f0 100644
--- a/server/src/main/scala/org/apache/livy/server/recovery/FileSystemStateStore.scala
+++ b/server/src/main/scala/org/apache/livy/server/recovery/FileSystemStateStore.scala
@@ -20,6 +20,7 @@
 import java.io.{FileNotFoundException, IOException}
 import java.net.URI
 import java.util
+import java.util.concurrent.TimeUnit
 
 import scala.reflect.ClassTag
 import scala.util.control.NonFatal
@@ -28,6 +29,8 @@
 import org.apache.hadoop.fs._
 import org.apache.hadoop.fs.Options.{CreateOpts, Rename}
 import org.apache.hadoop.fs.permission.{FsAction, FsPermission}
+import org.apache.hadoop.hdfs.DistributedFileSystem
+import org.apache.hadoop.hdfs.protocol.HdfsConstants
 
 import org.apache.livy.{LivyConf, Logging}
 import org.apache.livy.Utils.usingResource
@@ -42,6 +45,8 @@
     this(livyConf, None)
   }
 
+  private val fs = FileSystem.newInstance(livyConf.hadoopConf)
+
   private val fsUri = {
     val fsPath = livyConf.get(LivyConf.RECOVERY_STATE_STORE_URL)
     require(fsPath != null && !fsPath.isEmpty,
@@ -57,6 +62,8 @@
     // Only Livy user should have access to state files.
     fileContext.setUMask(new FsPermission("077"))
 
+    startSafeModeCheck()
+
     // Create state store dir if it doesn't exist.
     val stateStorePath = absPath(".")
     try {
@@ -134,4 +141,42 @@
   }
 
   private def absPath(key: String): Path = new Path(fsUri.getPath(), key)
+
+  /**
+   * Checks whether HDFS is in safe mode.
+   *
+   * Note that DistributedFileSystem is a `@LimitedPrivate` class, which for all practical reasons
+   * makes it more public than not.
+   */
+  def isFsInSafeMode(): Boolean = fs match {
+    case dfs: DistributedFileSystem =>
+      isFsInSafeMode(dfs)
+    case _ =>
+      false
+  }
+
+  def isFsInSafeMode(dfs: DistributedFileSystem): Boolean = {
+    /* true to check only for Active NNs status */
+    dfs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_GET, true)
+  }
+
+  def startSafeModeCheck(): Unit = {
+    // Cannot probe anything while the FS is in safe mode,
+    // so wait for seconds which is configurable
+    val safeModeInterval = livyConf.getInt(LivyConf.HDFS_SAFE_MODE_INTERVAL_IN_SECONDS)
+    val safeModeMaxRetryAttempts = livyConf.getInt(LivyConf.HDFS_SAFE_MODE_MAX_RETRY_ATTEMPTS)
+    for (retryAttempts <- 0 to safeModeMaxRetryAttempts if isFsInSafeMode()) {
+      info("HDFS is still in safe mode. Waiting...")
+      Thread.sleep(TimeUnit.SECONDS.toMillis(safeModeInterval))
+    }
+
+    // if hdfs is still in safe mode
+    // even after max retry attempts
+    // then throw IllegalStateException
+    if (isFsInSafeMode()) {
+      throw new IllegalStateException("Reached max retry attempts for safe mode check " +
+        "in hdfs file system")
+    }
+  }
+
 }
diff --git a/server/src/test/scala/org/apache/livy/server/recovery/FileSystemStateStoreSpec.scala b/server/src/test/scala/org/apache/livy/server/recovery/FileSystemStateStoreSpec.scala
index 082a80a..1ee1a2f 100644
--- a/server/src/test/scala/org/apache/livy/server/recovery/FileSystemStateStoreSpec.scala
+++ b/server/src/test/scala/org/apache/livy/server/recovery/FileSystemStateStoreSpec.scala
@@ -23,10 +23,11 @@
 import org.apache.hadoop.fs._
 import org.apache.hadoop.fs.Options.{CreateOpts, Rename}
 import org.apache.hadoop.fs.permission.FsPermission
+import org.apache.hadoop.hdfs.DistributedFileSystem
 import org.hamcrest.Description
 import org.mockito.ArgumentMatcher
 import org.mockito.Matchers.{any, anyInt, argThat, eq => equal}
-import org.mockito.Mockito.{atLeastOnce, verify, when}
+import org.mockito.Mockito.{atLeastOnce, spy, verify, when}
 import org.mockito.internal.matchers.Equals
 import org.mockito.invocation.InvocationOnMock
 import org.mockito.stubbing.Answer
@@ -53,6 +54,14 @@
       conf
     }
 
+    def makeConfWithTwoSeconds(): LivyConf = {
+      val conf = new LivyConf()
+      conf.set(LivyConf.RECOVERY_STATE_STORE_URL, "file://tmp/")
+      conf.set(LivyConf.HDFS_SAFE_MODE_INTERVAL_IN_SECONDS, new Integer(2))
+      conf.set(LivyConf.HDFS_SAFE_MODE_MAX_RETRY_ATTEMPTS, new Integer(2))
+      conf
+    }
+
     def mockFileContext(rootDirPermission: String): FileContext = {
       val fileContext = mock[FileContext]
       val rootDirStatus = mock[FileStatus]
@@ -188,5 +197,29 @@
 
       verify(fileContext).delete(pathEq("/key"), equal(false))
     }
+
+    it("set safe mode ON and wait") {
+      val fileContext = mockFileContext("700")
+      val provider = spy(new FileSystemStateStore(makeConf(), Some(fileContext)))
+      val dfs = mock[DistributedFileSystem]
+      provider.isFsInSafeMode()
+      assert(!provider.isFsInSafeMode(dfs))
+    }
+
+    it("provider throws IllegalStateException when reaches 'N' " +
+      "max attempts to access HDFS file system") {
+      val provider = new SafeModeTestProvider(makeConfWithTwoSeconds(),
+        Some(mockFileContext("700")))
+      provider.inSafeMode = true
+      intercept[IllegalStateException](provider.startSafeModeCheck())
+    }
   }
+
+  private class SafeModeTestProvider(conf: LivyConf, context: Option[FileContext])
+    extends FileSystemStateStore(conf, context) {
+    var inSafeMode = true
+
+    override def isFsInSafeMode(): Boolean = inSafeMode
+  }
+
 }