Implement FCPSchedulerServer (#5030)

diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/SchedulerServer.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/FPCSchedulerServer.scala
similarity index 64%
rename from core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/SchedulerServer.scala
rename to core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/FPCSchedulerServer.scala
index 841b139..874362f 100644
--- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/SchedulerServer.scala
+++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/FPCSchedulerServer.scala
@@ -18,12 +18,14 @@
 package org.apache.openwhisk.core.scheduler
 
 import akka.actor.ActorSystem
+import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
 import akka.http.scaladsl.model.StatusCodes
 import akka.http.scaladsl.model.headers.BasicHttpCredentials
 import akka.http.scaladsl.server.Route
 import org.apache.openwhisk.common.{Logging, TransactionId}
 import org.apache.openwhisk.http.BasicRasService
 import org.apache.openwhisk.http.ErrorResponse.terminate
+import spray.json.DefaultJsonProtocol._
 import spray.json._
 
 import scala.concurrent.ExecutionContext
@@ -32,7 +34,7 @@
  * Implements web server to handle certain REST API calls.
  * Currently provides a health ping route, only.
  */
-class SchedulerServer(scheduler: SchedulerCore, systemUsername: String, systemPassword: String)(
+class FPCSchedulerServer(scheduler: SchedulerCore, systemUsername: String, systemPassword: String)(
   implicit val ec: ExecutionContext,
   implicit val actorSystem: ActorSystem,
   implicit val logger: Logging)
@@ -41,10 +43,28 @@
   override def routes(implicit transid: TransactionId): Route = {
     super.routes ~ extractCredentials {
       case Some(BasicHttpCredentials(username, password)) if username == systemUsername && password == systemPassword =>
-        (path("disable") & post) {
+        (path("state") & get) {
+          complete {
+            scheduler.getState.map {
+              case (list, creationCount) =>
+                (list
+                  .map(scheduler => scheduler._1.asString -> scheduler._2.toString)
+                  .toMap
+                  ++ Map("creationCount" -> creationCount.toString)).toJson.asJsObject
+            }
+          }
+        } ~ (path("disable") & post) {
           logger.warn(this, "Scheduler is disabled")
           scheduler.disable()
           complete("scheduler disabled")
+        } ~ (path(FPCSchedulerServer.queuePathPrefix / "total") & get) {
+          complete {
+            scheduler.getQueueSize.map(_.toString)
+          }
+        } ~ (path(FPCSchedulerServer.queuePathPrefix / "status") & get) {
+          complete {
+            scheduler.getQueueStatusData.map(s => s.toJson)
+          }
         }
       case _ =>
         implicit val jsonPrettyResponsePrinter = PrettyPrinter
@@ -53,21 +73,16 @@
   }
 }
 
-object SchedulerServer {
+object FPCSchedulerServer {
 
-  val schedulerUsername = {
-    val source = scala.io.Source.fromFile("/conf/schedulerauth.username")
-    try source.mkString.replaceAll("\r|\n", "")
-    finally source.close()
-  }
-  val schedulerPassword = {
-    val source = scala.io.Source.fromFile("/conf/schedulerauth.password")
-    try source.mkString.replaceAll("\r|\n", "")
-    finally source.close()
-  }
+  // TODO: TBD, after FPCScheduler is ready, can read the credentials from pureconfig
+  val schedulerUsername = "admin"
+  val schedulerPassword = "admin"
+
+  val queuePathPrefix = "queue"
 
   def instance(scheduler: SchedulerCore)(implicit ec: ExecutionContext,
                                          actorSystem: ActorSystem,
                                          logger: Logging): BasicRasService =
-    new SchedulerServer(scheduler, schedulerUsername, schedulerPassword)
+    new FPCSchedulerServer(scheduler, schedulerUsername, schedulerPassword)
 }
diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala
index 9fc793b..6bb4311 100644
--- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala
+++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala
@@ -81,21 +81,20 @@
   val durationChecker = "" // TODO: TBD
 
   override def getState: Future[(List[(SchedulerInstanceId, Int)], Int)] = {
-    Future.successful((List((schedulerId, 0)), 0)) // TODO: TBD
+    Future.successful((List((schedulerId, 0)), 0)) // TODO: TBD, after etcdClient is ready, can implement it
   }
 
   override def getQueueSize: Future[Int] = {
-    Future.successful(0) // TODO: TBD
+    Future.successful(0) // TODO: TBD, after queueManager is ready, can implement it
   }
 
-  override def getQueueStatusData: Future[List[String]] = {
-    Future.successful(List("")) // TODO: TBD
+  override def getQueueStatusData: Future[List[StatusData]] = {
+    Future.successful(List(StatusData("ns", "fqn", 0, "Running", "data"))) // TODO: TBD, after queueManager is ready, can implement it
   }
 
-  // other components don't need to shutdown gracefully
   override def disable(): Unit = {
     logging.info(this, s"Gracefully shutting down the scheduler")
-    // TODO: TBD, gracefully shut down the container manager and queue manager
+    // TODO: TBD, after containerManager and queueManager are ready, can implement it
   }
 
   private def getUserLimit(invocationNamespace: String): Future[Int] = {
@@ -160,7 +159,7 @@
 
   def getQueueSize: Future[Int]
 
-  def getQueueStatusData: Future[List[String]] // TODO: Change to the real data class other than just string
+  def getQueueStatusData: Future[List[StatusData]]
 
   def disable(): Unit
 }
@@ -253,7 +252,7 @@
         val httpsConfig =
           if (Scheduler.protocol == "https") Some(loadConfigOrThrow[HttpsConfig]("whisk.controller.https")) else None
 
-        BasicHttpService.startHttpService(SchedulerServer.instance(scheduler).route, port, httpsConfig)(
+        BasicHttpService.startHttpService(FPCSchedulerServer.instance(scheduler).route, port, httpsConfig)(
           actorSystem,
           ActorMaterializer.create(actorSystem))
 
diff --git a/tests/build.gradle b/tests/build.gradle
index 57c3bd7..49fac12 100644
--- a/tests/build.gradle
+++ b/tests/build.gradle
@@ -325,6 +325,7 @@
     doLast{
         Project common = project(":common:scala")
         Project controller = project(":core:controller")
+        Project scheduler = project(":core:scheduler")
         Project invoker = project(":core:invoker")
 
         Properties wskProps = loadWhiskProps()
@@ -335,6 +336,8 @@
 
         copyAndRenameMeasurementFile(covLogs, 'controller', "common", common)
         copyAndRenameMeasurementFile(covLogs, 'controller', "controller", controller)
+        copyAndRenameMeasurementFile(covLogs, 'scheduler', "common", common)
+        copyAndRenameMeasurementFile(covLogs, 'scheduler', "scheduler", scheduler)
         copyAndRenameMeasurementFile(covLogs, 'invoker', "common", common)
         copyAndRenameMeasurementFile(covLogs, 'invoker', "invoker", invoker)
     }
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/FPCSchedulerServerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/FPCSchedulerServerTests.scala
new file mode 100644
index 0000000..0dab4f4
--- /dev/null
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/FPCSchedulerServerTests.scala
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.scheduler
+
+import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
+import akka.http.scaladsl.model.StatusCodes._
+import akka.http.scaladsl.model.headers.BasicHttpCredentials
+import akka.http.scaladsl.server.Route
+import akka.http.scaladsl.testkit.ScalatestRouteTest
+import common.StreamLogging
+import org.apache.openwhisk.common.TransactionId
+import org.apache.openwhisk.core.connector.StatusData
+import org.apache.openwhisk.core.entity.SchedulerInstanceId
+import org.junit.runner.RunWith
+import org.scalamock.scalatest.MockFactory
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FlatSpec, Matchers}
+import spray.json.DefaultJsonProtocol._
+import spray.json._
+
+import scala.concurrent.Future
+
+/**
+ * Tests SchedulerServer API.
+ */
+@RunWith(classOf[JUnitRunner])
+class FPCSchedulerServerTests
+    extends FlatSpec
+    with BeforeAndAfterEach
+    with BeforeAndAfterAll
+    with ScalatestRouteTest
+    with Matchers
+    with StreamLogging
+    with MockFactory {
+
+  def transid() = TransactionId("tid")
+
+  val systemUsername = "username"
+  val systemPassword = "password"
+
+  val queues = List((SchedulerInstanceId("0"), 2), (SchedulerInstanceId("1"), 3))
+  val creationCount = 1
+  val testQueueSize = 2
+  val statusDatas = List(
+    StatusData("testns1", "testaction1", 10, "Running", "RunningData"),
+    StatusData("testns2", "testaction2", 5, "Running", "RunningData"))
+
+  // Create scheduler
+  val scheduler = new TestScheduler(queues, creationCount, testQueueSize, statusDatas)
+  val server = new FPCSchedulerServer(scheduler, systemUsername, systemPassword)
+
+  override protected def afterEach(): Unit = scheduler.reset()
+
+  /** FPCSchedulerServer API tests */
+  behavior of "FPCSchedulerServer API"
+
+  // POST /disable
+  it should "disable scheduler" in {
+    implicit val tid = transid()
+    val validCredentials = BasicHttpCredentials(systemUsername, systemPassword)
+    Post(s"/disable") ~> addCredentials(validCredentials) ~> Route.seal(server.routes(tid)) ~> check {
+      status should be(OK)
+      scheduler.shutdownCount shouldBe 1
+    }
+  }
+
+  // GET /state
+  it should "get scheduler state" in {
+    implicit val tid = transid()
+    val validCredentials = BasicHttpCredentials(systemUsername, systemPassword)
+    Get(s"/state") ~> addCredentials(validCredentials) ~> Route.seal(server.routes(tid)) ~> check {
+      status should be(OK)
+      responseAs[JsObject] shouldBe (queues.map(s => s._1.asString -> s._2.toString).toMap ++ Map(
+        "creationCount" -> creationCount.toString)).toJson
+    }
+  }
+
+  // GET /queue/total
+  it should "get total queue" in {
+    implicit val tid = transid()
+    val validCredentials = BasicHttpCredentials(systemUsername, systemPassword)
+    Get(s"/queue/total") ~> addCredentials(validCredentials) ~> Route.seal(server.routes(tid)) ~> check {
+      status should be(OK)
+      responseAs[String] shouldBe testQueueSize.toString
+    }
+  }
+
+  // GET /queue/status
+  it should "get all queue status" in {
+    implicit val tid = transid()
+    val validCredentials = BasicHttpCredentials(systemUsername, systemPassword)
+    Get(s"/queue/status") ~> addCredentials(validCredentials) ~> Route.seal(server.routes(tid)) ~> check {
+      status should be(OK)
+      responseAs[List[JsObject]] shouldBe statusDatas.map(_.toJson)
+    }
+  }
+
+  // POST /disable with invalid credential
+  it should "not call scheduler api with invalid credential" in {
+    implicit val tid = transid()
+    val invalidCredentials = BasicHttpCredentials("invaliduser", "invalidpass")
+    Post(s"/disable") ~> addCredentials(invalidCredentials) ~> Route.seal(server.routes(tid)) ~> check {
+      status should be(Unauthorized)
+      scheduler.shutdownCount shouldBe 0
+    }
+  }
+
+  // POST /disable with empty credential
+  it should "not call scheduler api with empty credential" in {
+    implicit val tid = transid()
+    Post(s"/disable") ~> Route.seal(server.routes(tid)) ~> check {
+      status should be(Unauthorized)
+      scheduler.shutdownCount shouldBe 0
+    }
+  }
+
+}
+
+class TestScheduler(schedulerStates: List[(SchedulerInstanceId, Int)],
+                    creationCount: Int,
+                    queueSize: Int,
+                    statusDatas: List[StatusData])
+    extends SchedulerCore {
+  var shutdownCount = 0
+
+  override def getState: Future[(List[(SchedulerInstanceId, Int)], Int)] =
+    Future.successful(schedulerStates, creationCount)
+
+  override def getQueueSize: Future[Int] = Future.successful(queueSize)
+
+  override def getQueueStatusData: Future[List[StatusData]] = Future.successful(statusDatas)
+
+  override def disable(): Unit = shutdownCount += 1
+
+  def reset(): Unit = {
+    shutdownCount = 0
+  }
+}