blob: 4f2b642884d8b488e629c11afb3db41426aa5dcd [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gearpump.services
import scala.concurrent.duration._
import scala.util.{Success, Try}
import akka.actor.ActorRef
import akka.http.scaladsl.model.headers.`Cache-Control`
import akka.http.scaladsl.testkit.{RouteTestTimeout, ScalatestRouteTest}
import akka.testkit.TestActor.{AutoPilot, KeepRunning}
import akka.testkit.{TestKit, TestProbe}
import com.typesafe.config.{Config, ConfigFactory}
import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
import upickle.default.read
import org.apache.gearpump.cluster.AppMasterToMaster.GeneralAppMasterSummary
import org.apache.gearpump.cluster.ClientToMaster.{GetLastFailure, QueryAppMasterConfig, QueryHistoryMetrics, ResolveAppId}
import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMasterDataDetailRequest, AppMasterDataRequest}
import org.apache.gearpump.cluster.MasterToClient._
import org.apache.gearpump.cluster.{ApplicationStatus, TestUtil}
import org.apache.gearpump.jarstore.JarStoreClient
import org.apache.gearpump.streaming.executor.Executor.{ExecutorConfig, ExecutorSummary, GetExecutorSummary, QueryExecutorConfig}
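// NOTE: Do not remove the import below, even if it is reported as unused.
// Presumably it brings the implicit upickle readers needed by `read` into scope (TODO confirm).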
import org.apache.gearpump.services.util.UpickleUtil._
/**
 * Route-level tests for the REST endpoints exposed by `AppMasterService`.
 *
 * Both the master and the application master are replaced by [[akka.testkit.TestProbe]]s
 * whose auto-pilots answer the requests issued while handling a route with canned replies,
 * so every test only needs to fire an HTTP request at the route and inspect the response.
 */
class AppMasterServiceSpec extends FlatSpec with ScalatestRouteTest
  with Matchers with BeforeAndAfterAll {

  override def testConfig: Config = TestUtil.UI_CONFIG

  /** Timeout applied to every route check of this spec. */
  private implicit val customTimeout: RouteTestTimeout = RouteTestTimeout(15.seconds)

  val mockAppMaster: TestProbe = TestProbe()
  val failure: LastFailure = LastFailure(System.currentTimeMillis(), "Some error")
  val jarStoreClient: JarStoreClient = new JarStoreClient(system.settings.config, system)

  // Canned replies of the probe that plays the application master.
  mockAppMaster.setAutoPilot {
    new AutoPilot {
      def run(sender: ActorRef, msg: Any): AutoPilot = {
        msg match {
          case AppMasterDataDetailRequest(appId) =>
            sender ! GeneralAppMasterSummary(appId)
          case QueryHistoryMetrics(path, _, _, _) =>
            sender ! HistoryMetrics(path, List.empty[HistoryMetricsItem])
          case GetLastFailure(_) =>
            sender ! failure
          case GetExecutorSummary(0) =>
            sender ! ExecutorSummary.empty
          case QueryExecutorConfig(0) =>
            sender ! ExecutorConfig(system.settings.config)
          case _ =>
            // Leave anything else unanswered; without this case an unexpected message
            // would raise a MatchError inside the probe.
            ()
        }
        KeepRunning
      }
    }
  }

  val mockMaster: TestProbe = TestProbe()

  // Canned replies of the probe that plays the master. Application id 0 resolves to
  // the application master probe above.
  mockMaster.setAutoPilot {
    new AutoPilot {
      def run(sender: ActorRef, msg: Any): AutoPilot = {
        msg match {
          case ResolveAppId(0) =>
            sender ! ResolveAppIdResult(Success(mockAppMaster.ref))
          case AppMasterDataRequest(_, _) =>
            sender ! AppMasterData(ApplicationStatus.ACTIVE)
          case QueryAppMasterConfig(_) =>
            // NOTE(review): the null config looks deliberate, presumably to exercise how
            // the service renders a missing config - confirm before changing it.
            sender ! AppMasterConfig(null)
          case _ =>
            // Leave anything else unanswered; without this case an unexpected message
            // would raise a MatchError inside the probe.
            ()
        }
        KeepRunning
      }
    }
  }

  private def master: ActorRef = mockMaster.ref

  /** Builds a fresh route under test, wired to the mocked master. */
  private def appMasterRoute = new AppMasterService(master, jarStoreClient, system).route

  /** Fails the running test unless `body` can be parsed by `ConfigFactory.parseString`. */
  private def assertParsableAsConfig(body: String): Unit = {
    val parsed = Try(ConfigFactory.parseString(body))
    assert(parsed.isSuccess, s"response body cannot be parsed as a config: $body")
  }

  "AppMasterService" should "return a JSON structure for GET request when detail = false" in {
    Get(s"/api/$REST_VERSION/appmaster/0?detail=false") ~> appMasterRoute ~> check {
      // Throws, and thereby fails the test, if the body is not a serialized AppMasterData.
      read[AppMasterData](responseAs[String])

      // The response is expected to carry "Cache-Control: no-cache, max-age=0".
      // Matching on the Option reports a missing header as a regular test failure.
      header[`Cache-Control`].map(_.value()) shouldBe Some("no-cache, max-age=0")
    }

    Get(s"/api/$REST_VERSION/appmaster/0?detail=true") ~> appMasterRoute ~> check {
      // Only checks that a body comes back; its content is not inspected here.
      responseAs[String] should not be empty
    }
  }

  "MetricsQueryService" should "return history metrics" in {
    (Get(s"/api/$REST_VERSION/appmaster/0/metrics/processor") ~> appMasterRoute) ~> check {
      assertParsableAsConfig(responseAs[String])
    }
  }

  "AppMaster" should "return latest error" in {
    (Get(s"/api/$REST_VERSION/appmaster/0/errors") ~> appMasterRoute) ~> check {
      read[LastFailure](responseAs[String]) shouldBe failure
    }
  }

  "ConfigQueryService" should "return config for application" in {
    (Get(s"/api/$REST_VERSION/appmaster/0/config") ~> appMasterRoute) ~> check {
      assertParsableAsConfig(responseAs[String])
    }
  }

  it should "return config for executor" in {
    (Get(s"/api/$REST_VERSION/appmaster/0/executor/0/config") ~> appMasterRoute) ~> check {
      assertParsableAsConfig(responseAs[String])
    }
  }

  it should "return executor summary" in {
    (Get(s"/api/$REST_VERSION/appmaster/0/executor/0") ~> appMasterRoute) ~> check {
      val executorSummary = read[ExecutorSummary](responseAs[String])
      executorSummary.id shouldBe 0
    }
  }

  /** Shuts down the actor system backing the route tests once all tests have run. */
  override def afterAll(): Unit = {
    TestKit.shutdownActorSystem(system)
  }
}