Implement FCPSchedulerServer (#5030)
diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/SchedulerServer.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/FPCSchedulerServer.scala
similarity index 64%
rename from core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/SchedulerServer.scala
rename to core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/FPCSchedulerServer.scala
index 841b139..874362f 100644
--- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/SchedulerServer.scala
+++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/FPCSchedulerServer.scala
@@ -18,12 +18,14 @@
package org.apache.openwhisk.core.scheduler
import akka.actor.ActorSystem
+import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.model.headers.BasicHttpCredentials
import akka.http.scaladsl.server.Route
import org.apache.openwhisk.common.{Logging, TransactionId}
import org.apache.openwhisk.http.BasicRasService
import org.apache.openwhisk.http.ErrorResponse.terminate
+import spray.json.DefaultJsonProtocol._
import spray.json._
import scala.concurrent.ExecutionContext
@@ -32,7 +34,7 @@
* Implements web server to handle certain REST API calls.
* Currently provides a health ping route, only.
*/
-class SchedulerServer(scheduler: SchedulerCore, systemUsername: String, systemPassword: String)(
+class FPCSchedulerServer(scheduler: SchedulerCore, systemUsername: String, systemPassword: String)(
implicit val ec: ExecutionContext,
implicit val actorSystem: ActorSystem,
implicit val logger: Logging)
@@ -41,10 +43,28 @@
override def routes(implicit transid: TransactionId): Route = {
super.routes ~ extractCredentials {
case Some(BasicHttpCredentials(username, password)) if username == systemUsername && password == systemPassword =>
- (path("disable") & post) {
+ (path("state") & get) {
+ complete {
+ scheduler.getState.map {
+ case (list, creationCount) =>
+ (list
+ .map(scheduler => scheduler._1.asString -> scheduler._2.toString)
+ .toMap
+ ++ Map("creationCount" -> creationCount.toString)).toJson.asJsObject
+ }
+ }
+ } ~ (path("disable") & post) {
logger.warn(this, "Scheduler is disabled")
scheduler.disable()
complete("scheduler disabled")
+ } ~ (path(FPCSchedulerServer.queuePathPrefix / "total") & get) {
+ complete {
+ scheduler.getQueueSize.map(_.toString)
+ }
+ } ~ (path(FPCSchedulerServer.queuePathPrefix / "status") & get) {
+ complete {
+ scheduler.getQueueStatusData.map(s => s.toJson)
+ }
}
case _ =>
implicit val jsonPrettyResponsePrinter = PrettyPrinter
@@ -53,21 +73,16 @@
}
}
-object SchedulerServer {
+object FPCSchedulerServer {
- val schedulerUsername = {
- val source = scala.io.Source.fromFile("/conf/schedulerauth.username")
- try source.mkString.replaceAll("\r|\n", "")
- finally source.close()
- }
- val schedulerPassword = {
- val source = scala.io.Source.fromFile("/conf/schedulerauth.password")
- try source.mkString.replaceAll("\r|\n", "")
- finally source.close()
- }
+ // TODO: TBD, after FPCScheduler is ready, can read the credentials from pureconfig
+ val schedulerUsername = "admin"
+ val schedulerPassword = "admin"
+
+ val queuePathPrefix = "queue"
def instance(scheduler: SchedulerCore)(implicit ec: ExecutionContext,
actorSystem: ActorSystem,
logger: Logging): BasicRasService =
- new SchedulerServer(scheduler, schedulerUsername, schedulerPassword)
+ new FPCSchedulerServer(scheduler, schedulerUsername, schedulerPassword)
}
diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala
index 9fc793b..6bb4311 100644
--- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala
+++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala
@@ -81,21 +81,20 @@
val durationChecker = "" // TODO: TBD
override def getState: Future[(List[(SchedulerInstanceId, Int)], Int)] = {
- Future.successful((List((schedulerId, 0)), 0)) // TODO: TBD
+ Future.successful((List((schedulerId, 0)), 0)) // TODO: TBD, after etcdClient is ready, can implement it
}
override def getQueueSize: Future[Int] = {
- Future.successful(0) // TODO: TBD
+ Future.successful(0) // TODO: TBD, after queueManager is ready, can implement it
}
- override def getQueueStatusData: Future[List[String]] = {
- Future.successful(List("")) // TODO: TBD
+ override def getQueueStatusData: Future[List[StatusData]] = {
+ Future.successful(List(StatusData("ns", "fqn", 0, "Running", "data"))) // TODO: TBD, after queueManager is ready, can implement it
}
- // other components don't need to shutdown gracefully
override def disable(): Unit = {
logging.info(this, s"Gracefully shutting down the scheduler")
- // TODO: TBD, gracefully shut down the container manager and queue manager
+ // TODO: TBD, after containerManager and queueManager are ready, can implement it
}
private def getUserLimit(invocationNamespace: String): Future[Int] = {
@@ -160,7 +159,7 @@
def getQueueSize: Future[Int]
- def getQueueStatusData: Future[List[String]] // TODO: Change to the real data class other than just string
+ def getQueueStatusData: Future[List[StatusData]]
def disable(): Unit
}
@@ -253,7 +252,7 @@
val httpsConfig =
if (Scheduler.protocol == "https") Some(loadConfigOrThrow[HttpsConfig]("whisk.controller.https")) else None
- BasicHttpService.startHttpService(SchedulerServer.instance(scheduler).route, port, httpsConfig)(
+ BasicHttpService.startHttpService(FPCSchedulerServer.instance(scheduler).route, port, httpsConfig)(
actorSystem,
ActorMaterializer.create(actorSystem))
diff --git a/tests/build.gradle b/tests/build.gradle
index 57c3bd7..49fac12 100644
--- a/tests/build.gradle
+++ b/tests/build.gradle
@@ -325,6 +325,7 @@
doLast{
Project common = project(":common:scala")
Project controller = project(":core:controller")
+ Project scheduler = project(":core:scheduler")
Project invoker = project(":core:invoker")
Properties wskProps = loadWhiskProps()
@@ -335,6 +336,8 @@
copyAndRenameMeasurementFile(covLogs, 'controller', "common", common)
copyAndRenameMeasurementFile(covLogs, 'controller', "controller", controller)
+ copyAndRenameMeasurementFile(covLogs, 'scheduler', "common", common)
+ copyAndRenameMeasurementFile(covLogs, 'scheduler', "scheduler", scheduler)
copyAndRenameMeasurementFile(covLogs, 'invoker', "common", common)
copyAndRenameMeasurementFile(covLogs, 'invoker', "invoker", invoker)
}
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/FPCSchedulerServerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/FPCSchedulerServerTests.scala
new file mode 100644
index 0000000..0dab4f4
--- /dev/null
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/FPCSchedulerServerTests.scala
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.scheduler
+
+import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
+import akka.http.scaladsl.model.StatusCodes._
+import akka.http.scaladsl.model.headers.BasicHttpCredentials
+import akka.http.scaladsl.server.Route
+import akka.http.scaladsl.testkit.ScalatestRouteTest
+import common.StreamLogging
+import org.apache.openwhisk.common.TransactionId
+import org.apache.openwhisk.core.connector.StatusData
+import org.apache.openwhisk.core.entity.SchedulerInstanceId
+import org.junit.runner.RunWith
+import org.scalamock.scalatest.MockFactory
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FlatSpec, Matchers}
+import spray.json.DefaultJsonProtocol._
+import spray.json._
+
+import scala.concurrent.Future
+
+/**
+ * Tests SchedulerServer API.
+ */
+@RunWith(classOf[JUnitRunner])
+class FPCSchedulerServerTests
+ extends FlatSpec
+ with BeforeAndAfterEach
+ with BeforeAndAfterAll
+ with ScalatestRouteTest
+ with Matchers
+ with StreamLogging
+ with MockFactory {
+
+ def transid() = TransactionId("tid")
+
+ val systemUsername = "username"
+ val systemPassword = "password"
+
+ val queues = List((SchedulerInstanceId("0"), 2), (SchedulerInstanceId("1"), 3))
+ val creationCount = 1
+ val testQueueSize = 2
+ val statusDatas = List(
+ StatusData("testns1", "testaction1", 10, "Running", "RunningData"),
+ StatusData("testns2", "testaction2", 5, "Running", "RunningData"))
+
+ // Create scheduler
+ val scheduler = new TestScheduler(queues, creationCount, testQueueSize, statusDatas)
+ val server = new FPCSchedulerServer(scheduler, systemUsername, systemPassword)
+
+ override protected def afterEach(): Unit = scheduler.reset()
+
+ /** FPCSchedulerServer API tests */
+ behavior of "FPCSchedulerServer API"
+
+ // POST /disable
+ it should "disable scheduler" in {
+ implicit val tid = transid()
+ val validCredentials = BasicHttpCredentials(systemUsername, systemPassword)
+ Post(s"/disable") ~> addCredentials(validCredentials) ~> Route.seal(server.routes(tid)) ~> check {
+ status should be(OK)
+ scheduler.shutdownCount shouldBe 1
+ }
+ }
+
+ // GET /state
+ it should "get scheduler state" in {
+ implicit val tid = transid()
+ val validCredentials = BasicHttpCredentials(systemUsername, systemPassword)
+ Get(s"/state") ~> addCredentials(validCredentials) ~> Route.seal(server.routes(tid)) ~> check {
+ status should be(OK)
+ responseAs[JsObject] shouldBe (queues.map(s => s._1.asString -> s._2.toString).toMap ++ Map(
+ "creationCount" -> creationCount.toString)).toJson
+ }
+ }
+
+ // GET /queue/total
+ it should "get total queue" in {
+ implicit val tid = transid()
+ val validCredentials = BasicHttpCredentials(systemUsername, systemPassword)
+ Get(s"/queue/total") ~> addCredentials(validCredentials) ~> Route.seal(server.routes(tid)) ~> check {
+ status should be(OK)
+ responseAs[String] shouldBe testQueueSize.toString
+ }
+ }
+
+ // GET /queue/status
+ it should "get all queue status" in {
+ implicit val tid = transid()
+ val validCredentials = BasicHttpCredentials(systemUsername, systemPassword)
+ Get(s"/queue/status") ~> addCredentials(validCredentials) ~> Route.seal(server.routes(tid)) ~> check {
+ status should be(OK)
+ responseAs[List[JsObject]] shouldBe statusDatas.map(_.toJson)
+ }
+ }
+
+ // POST /disable with invalid credential
+ it should "not call scheduler api with invalid credential" in {
+ implicit val tid = transid()
+ val invalidCredentials = BasicHttpCredentials("invaliduser", "invalidpass")
+ Post(s"/disable") ~> addCredentials(invalidCredentials) ~> Route.seal(server.routes(tid)) ~> check {
+ status should be(Unauthorized)
+ scheduler.shutdownCount shouldBe 0
+ }
+ }
+
+ // POST /disable with empty credential
+ it should "not call scheduler api with empty credential" in {
+ implicit val tid = transid()
+ Post(s"/disable") ~> Route.seal(server.routes(tid)) ~> check {
+ status should be(Unauthorized)
+ scheduler.shutdownCount shouldBe 0
+ }
+ }
+
+}
+
+class TestScheduler(schedulerStates: List[(SchedulerInstanceId, Int)],
+ creationCount: Int,
+ queueSize: Int,
+ statusDatas: List[StatusData])
+ extends SchedulerCore {
+ var shutdownCount = 0
+
+ override def getState: Future[(List[(SchedulerInstanceId, Int)], Int)] =
+ Future.successful(schedulerStates, creationCount)
+
+ override def getQueueSize: Future[Int] = Future.successful(queueSize)
+
+ override def getQueueStatusData: Future[List[StatusData]] = Future.successful(statusDatas)
+
+ override def disable(): Unit = shutdownCount += 1
+
+ def reset(): Unit = {
+ shutdownCount = 0
+ }
+}