blob: 030d4bbc9aa00e9d5e95a9535a731ee0622516c9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gearpump.services
import akka.actor.ActorRef
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.testkit.{RouteTestTimeout, ScalatestRouteTest}
import akka.testkit.TestActor.{KeepRunning, AutoPilot}
import akka.testkit.{TestKit, TestProbe}
import com.typesafe.config.{ConfigFactory, Config}
import org.apache.gearpump.cluster.AppMasterToMaster.{WorkerData, GetWorkerData}
import org.apache.gearpump.cluster.ClientToMaster._
import org.apache.gearpump.cluster.MasterToClient.ResolveWorkerIdResult
import org.apache.gearpump.cluster.TestUtil
import org.apache.gearpump.cluster.worker.{WorkerId, WorkerSummary}
import org.scalatest.{BeforeAndAfterAll, Matchers, FlatSpec}
import scala.concurrent.duration._
import scala.util.Success
/**
 * Route-level tests for [[SupervisorService]].
 *
 * The master, the supervisor and a worker are replaced by [[akka.testkit.TestProbe]]s whose
 * auto-pilots send canned replies. Every endpoint is exercised twice: once against a service
 * built with a live supervisor reference (`supervisorRoute`) and once against a service built
 * with a `null` supervisor (`nullRoute`).
 */
class SupervisorServiceSpec
  extends FlatSpec with ScalatestRouteTest with Matchers with BeforeAndAfterAll {

  override def testConfig: Config = TestUtil.DEFAULT_CONFIG

  protected def actorRefFactory = system

  // One explicitly typed timeout shared by every request in this suite, instead of an
  // untyped implicit re-declared inside each test.
  private implicit val routeTimeout: RouteTestTimeout = RouteTestTimeout(15.seconds)

  private val mockSupervisor = TestProbe()
  private val supervisor = mockSupervisor.ref

  private val mockMaster = TestProbe()
  protected def master = mockMaster.ref

  private val mockWorker = TestProbe()

  /** Service under test, wired to the supervisor probe. */
  protected def supervisorRoute = new SupervisorService(master, supervisor, system).route

  /** Service under test, constructed with a `null` supervisor reference. */
  protected def nullRoute = new SupervisorService(master, null, system).route

  /**
   * Builds an auto-pilot that answers each message covered by `replyFor` with the value the
   * partial function yields, and keeps running afterwards.
   *
   * Messages outside the domain of `replyFor` get no reply but do not stop the pilot. A bare
   * `match` without a default case would instead throw a `MatchError` inside the probe's actor.
   *
   * @param replyFor maps an incoming message to the reply sent back to its sender
   * @return an auto-pilot that never stops on its own
   */
  private def replyingPilot(replyFor: PartialFunction[Any, Any]): AutoPilot = new AutoPilot {
    def run(sender: ActorRef, msg: Any): AutoPilot = {
      replyFor.lift(msg).foreach(reply => sender ! reply)
      KeepRunning
    }
  }

  // Supervisor stand-in: acknowledges every add/remove request as successful.
  mockSupervisor.setAutoPilot(replyingPilot {
    case AddWorker(_) => CommandResult(success = true)
    case RemoveWorker(_) => CommandResult(success = true)
  })

  // Worker stand-in: reports an empty summary regardless of the requested worker id.
  mockWorker.setAutoPilot(replyingPilot {
    case GetWorkerData(_) => WorkerData(WorkerSummary.empty)
  })

  // Master stand-in: resolves every worker id to the worker probe.
  mockMaster.setAutoPilot(replyingPilot {
    case ResolveWorkerId(_) => ResolveWorkerIdResult(Success(mockWorker.ref))
  })

  behavior of "SupervisorService"

  it should "get supervisor path" in {
    val uri = s"/api/$REST_VERSION/supervisor"

    (Get(uri) ~> supervisorRoute) ~> check {
      val body = ConfigFactory.parseString(responseAs[String])
      body.getString("path") shouldBe supervisor.path.toString
    }

    (Get(uri) ~> nullRoute) ~> check {
      val body = ConfigFactory.parseString(responseAs[String])
      body.getIsNull("path") shouldBe true
    }
  }

  it should "write status" in {
    val uri = s"/api/$REST_VERSION/supervisor/status"

    (Post(uri) ~> supervisorRoute) ~> check {
      val body = ConfigFactory.parseString(responseAs[String])
      body.getBoolean("enabled") shouldBe true
    }

    (Post(uri) ~> nullRoute) ~> check {
      val body = ConfigFactory.parseString(responseAs[String])
      body.getBoolean("enabled") shouldBe false
    }
  }

  it should "add worker" in {
    val uri = s"/api/$REST_VERSION/supervisor/addworker/1"

    (Post(uri) ~> supervisorRoute) ~> check {
      val body = ConfigFactory.parseString(responseAs[String])
      body.getBoolean("success") shouldBe true
    }

    // With a null supervisor the request is expected to fail server-side.
    (Post(uri) ~> nullRoute) ~> check {
      status shouldBe StatusCodes.InternalServerError
    }
  }

  it should "remove worker" in {
    val uri = s"/api/$REST_VERSION/supervisor/removeworker/${WorkerId.render(WorkerId(1, 0L))}"

    (Post(uri) ~> supervisorRoute) ~> check {
      val body = ConfigFactory.parseString(responseAs[String])
      body.getBoolean("success") shouldBe true
    }

    // With a null supervisor the request is expected to fail server-side.
    (Post(uri) ~> nullRoute) ~> check {
      status shouldBe StatusCodes.InternalServerError
    }
  }

  /** Shuts down the actor system used by the probes and routes once the suite has finished. */
  override def afterAll(): Unit = {
    TestKit.shutdownActorSystem(system)
  }
}