blob: b0e2101da83fdcca7715c8d0a555bb2b870df97a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gearpump.services
import scala.concurrent.duration._
import scala.util.{Success, Try}
import akka.actor.ActorRef
import akka.http.scaladsl.model.headers.`Cache-Control`
import akka.http.scaladsl.testkit.{RouteTestTimeout, ScalatestRouteTest}
import akka.testkit.TestActor.{AutoPilot, KeepRunning}
import akka.testkit.{TestKit, TestProbe}
import com.typesafe.config.{Config, ConfigFactory}
import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
import org.apache.gearpump.cluster.AppMasterToMaster.{GetWorkerData, WorkerData}
import org.apache.gearpump.cluster.ClientToMaster.{QueryHistoryMetrics, QueryWorkerConfig, ResolveWorkerId}
import org.apache.gearpump.cluster.MasterToClient.{HistoryMetrics, HistoryMetricsItem, ResolveWorkerIdResult, WorkerConfig}
import org.apache.gearpump.cluster.TestUtil
import org.apache.gearpump.cluster.worker.{WorkerId, WorkerSummary}
// NOTE: This cannot be removed!!!
import org.apache.gearpump.services.util.UpickleUtil._
/**
 * Route-level tests for [[WorkerService]].
 *
 * Neither a real master nor a real worker is started. Both are replaced by [[TestProbe]]s that
 * are driven by auto-pilots: the master probe resolves every worker id to the worker probe, and
 * the worker probe replies to the queries handled below with canned, empty responses.
 */
class WorkerServiceSpec
  extends FlatSpec with ScalatestRouteTest with Matchers with BeforeAndAfterAll {

  override def testConfig: Config = TestUtil.DEFAULT_CONFIG

  protected def actorRefFactory = system

  // Picked up implicitly by every route invocation in this spec.
  private implicit val customTimeout: RouteTestTimeout = RouteTestTimeout(15.seconds)

  val mockWorker: TestProbe = TestProbe()
  val mockMaster: TestProbe = TestProbe()

  protected def master: ActorRef = mockMaster.ref

  // A fresh service (and route) is built on every access.
  protected def workerRoute = new WorkerService(master, system).route

  mockWorker.setAutoPilot {
    new AutoPilot {
      def run(sender: ActorRef, msg: Any): AutoPilot = {
        msg match {
          case GetWorkerData(_) =>
            sender ! WorkerData(WorkerSummary.empty)
          case QueryWorkerConfig(_) =>
            // The reply deliberately carries no config object.
            sender ! WorkerConfig(null)
          case QueryHistoryMetrics(path, _, _, _) =>
            sender ! HistoryMetrics(path, List.empty[HistoryMetricsItem])
          case _ =>
            // Leave anything else unanswered instead of throwing a MatchError in the probe.
            ()
        }
        KeepRunning
      }
    }
  }

  mockMaster.setAutoPilot {
    new AutoPilot {
      def run(sender: ActorRef, msg: Any): AutoPilot = {
        msg match {
          case ResolveWorkerId(_) =>
            // Every worker id resolves to the single mocked worker.
            sender ! ResolveWorkerIdResult(Success(mockWorker.ref))
          case _ =>
            // Leave anything else unanswered instead of throwing a MatchError in the probe.
            ()
        }
        KeepRunning
      }
    }
  }

  /**
   * Fails the running test unless `body` can be parsed by `ConfigFactory.parseString`.
   *
   * @param body the raw response entity text
   */
  private def assertParsable(body: String): Unit = {
    val parsed = Try(ConfigFactory.parseString(body))
    assert(parsed.isSuccess, s"response body could not be parsed: $body")
  }

  "ConfigQueryService" should "return config for worker" in {
    val workerId = WorkerId.render(WorkerId(0, 0L))
    (Get(s"/api/$REST_VERSION/worker/$workerId/config") ~> workerRoute) ~> check {
      assertParsable(responseAs[String])
    }
  }

  it should "return WorkerData" in {
    val workerId = WorkerId.render(WorkerId(1, 0L))
    (Get(s"/api/$REST_VERSION/worker/$workerId") ~> workerRoute) ~> check {
      assertParsable(responseAs[String])

      // The response must carry the no-cache header:
      // Cache-Control: no-cache, max-age=0
      val cacheControl = header[`Cache-Control`].map(_.value())
      assert(cacheControl == Some("no-cache, max-age=0"))
    }
  }

  "MetricsQueryService" should "return history metrics" in {
    val workerId = WorkerId.render(WorkerId(0, 0L))
    (Get(s"/api/$REST_VERSION/worker/$workerId/metrics/worker") ~> workerRoute) ~> check {
      assertParsable(responseAs[String])
    }
  }

  override def afterAll(): Unit = {
    TestKit.shutdownActorSystem(system)
  }
}