// blob: 07e44c1a6fe0eb19f1ce54f5020a6879cba0a8fa [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gearpump.services
import java.io.File
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Success, Try}
import akka.actor.ActorRef
import akka.http.scaladsl.marshalling.Marshal
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.headers.`Cache-Control`
import akka.http.scaladsl.testkit.{RouteTestTimeout, ScalatestRouteTest}
import akka.stream.scaladsl.{FileIO, Source}
import akka.testkit.TestActor.{AutoPilot, KeepRunning}
import akka.testkit.TestProbe
import com.typesafe.config.{Config, ConfigFactory}
import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
import org.apache.gearpump.cluster.AppMasterToMaster.{GetAllWorkers, GetMasterData, GetWorkerData, MasterData, WorkerData}
import org.apache.gearpump.cluster.ClientToMaster.{QueryHistoryMetrics, QueryMasterConfig, ResolveWorkerId, SubmitApplication}
import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMastersData, AppMastersDataRequest, WorkerList}
import org.apache.gearpump.cluster.MasterToClient._
import org.apache.gearpump.cluster.TestUtil
import org.apache.gearpump.cluster.worker.{WorkerId, WorkerSummary}
import org.apache.gearpump.jarstore.JarStoreClient
import org.apache.gearpump.services.MasterService.{BuiltinPartitioners, SubmitApplicationRequest}
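// NOTE: Do not remove this import, even if a tool reports it as unused. It presumably supplies
// the implicit upickle readers/writers needed by `read`/`write` in this spec (TODO confirm).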
import org.apache.gearpump.services.util.UpickleUtil._
import org.apache.gearpump.streaming.ProcessorDescription
import org.apache.gearpump.util.Graph
/**
 * Route-level tests for `MasterService`.
 *
 * The master and one worker are stood in for by `TestProbe`s. Each probe has an auto-pilot
 * that answers the messages matched below with a canned reply, so the tests only exercise
 * the REST routes and the JSON (de)serialization of their responses.
 */
class MasterServiceSpec extends FlatSpec with ScalatestRouteTest
  with Matchers with BeforeAndAfterAll {

  import upickle.default.{read, write}

  override def testConfig: Config = TestUtil.UI_CONFIG

  val workerId = 0
  val mockWorker = TestProbe()
  val jarStoreClient = new JarStoreClient(system.settings.config, system)

  // Both are `def`s on purpose: `mockMaster` is initialized further down, and a fresh
  // route is built for every request.
  private def master = mockMaster.ref

  private def masterRoute = new MasterService(master, jarStoreClient, system).route

  // Canned replies of the fake worker.
  mockWorker.setAutoPilot {
    new AutoPilot {
      def run(sender: ActorRef, msg: Any): AutoPilot = {
        msg match {
          case GetWorkerData(id) =>
            // Echo the requested id back; the worker list test asserts on the "active" state.
            sender ! WorkerData(WorkerSummary.empty.copy(state = "active", workerId = id))
          case QueryHistoryMetrics(path, _, _, _) =>
            sender ! HistoryMetrics(path, List.empty[HistoryMetricsItem])
          case _ =>
            // No canned reply for this message. Falling through (instead of throwing a
            // MatchError inside the probe's actor) keeps the auto-pilot installed.
            ()
        }
        KeepRunning
      }
    }
  }

  val mockMaster = TestProbe()

  // Canned replies of the fake master.
  mockMaster.setAutoPilot {
    new AutoPilot {
      def run(sender: ActorRef, msg: Any): AutoPilot = {
        msg match {
          case GetMasterData =>
            sender ! MasterData(null)
          case AppMastersDataRequest =>
            sender ! AppMastersData(List.empty[AppMasterData])
          case GetAllWorkers =>
            sender ! WorkerList(List(WorkerId(0, 0L)))
          case ResolveWorkerId(WorkerId(0, 0L)) =>
            // Only the single worker advertised by GetAllWorkers can be resolved.
            sender ! ResolveWorkerIdResult(Success(mockWorker.ref))
          case QueryHistoryMetrics(path, _, _, _) =>
            sender ! HistoryMetrics(path, List.empty[HistoryMetricsItem])
          case QueryMasterConfig =>
            sender ! MasterConfig(null)
          case _: SubmitApplication =>
            sender ! SubmitApplicationResult(Success(0))
          case _ =>
            // No canned reply for this message. Falling through (instead of throwing a
            // MatchError inside the probe's actor) keeps the auto-pilot installed.
            ()
        }
        KeepRunning
      }
    }
  }

  it should "return master info when asked" in {
    implicit val customTimeout: RouteTestTimeout = RouteTestTimeout(15.seconds)
    (Get(s"/api/$REST_VERSION/master") ~> masterRoute) ~> check {
      // Checks the type: the body must deserialize as MasterData.
      val content = responseAs[String]
      read[MasterData](content)

      // Checks the header, which should contain the no-cache directives.
      // Cache-Control:no-cache, max-age=0
      // Comparing the Option reports a missing header as a regular test failure.
      header[`Cache-Control`].map(_.value()) shouldBe Some("no-cache, max-age=0")
    }
    mockMaster.expectMsg(GetMasterData)
  }

  it should "return a json structure of appMastersData for GET request" in {
    implicit val customTimeout: RouteTestTimeout = RouteTestTimeout(15.seconds)
    (Get(s"/api/$REST_VERSION/master/applist") ~> masterRoute) ~> check {
      // Checks the type: the body must deserialize as AppMastersData.
      read[AppMastersData](responseAs[String])
    }
    mockMaster.expectMsg(AppMastersDataRequest)
  }

  it should "return a json structure of worker data for GET request" in {
    implicit val customTimeout: RouteTestTimeout = RouteTestTimeout(25.seconds)
    Get(s"/api/$REST_VERSION/master/workerlist") ~> masterRoute ~> check {
      // Checks the type: the body must deserialize as a list of WorkerSummary.
      val workerListJson = responseAs[String]
      val workers = read[List[WorkerSummary]](workerListJson)
      assert(workers.nonEmpty)
      workers.foreach { worker =>
        worker.state shouldBe "active"
      }
    }
    // The probes must have seen this exchange: list workers, resolve, query worker data.
    mockMaster.expectMsg(GetAllWorkers)
    mockMaster.expectMsgType[ResolveWorkerId]
    mockWorker.expectMsgType[GetWorkerData]
  }

  it should "return config for master" in {
    implicit val customTimeout: RouteTestTimeout = RouteTestTimeout(15.seconds)
    (Get(s"/api/$REST_VERSION/master/config") ~> masterRoute) ~> check {
      // The body only has to be parseable by Typesafe Config.
      val responseBody = responseAs[String]
      val parsed = Try(ConfigFactory.parseString(responseBody))
      assert(parsed.isSuccess)
    }
    mockMaster.expectMsg(QueryMasterConfig)
  }

  "submit invalid application" should "return an error" in {
    implicit val routeTestTimeout: RouteTestTimeout = RouteTestTimeout(30.second)
    // "foo" is a relative path that this test never creates.
    // NOTE(review): presumably it does not exist in the working directory - confirm.
    val bogusFile = new File("foo")
    val request = entity(bogusFile)
    Post(s"/api/$REST_VERSION/master/submitapp", request) ~> masterRoute ~> check {
      response.status.intValue shouldBe 500
    }
  }

  /**
   * Wraps `file` in a multipart form upload.
   *
   * The form has a single body part named "file" whose content is streamed from disk in
   * chunks and whose "filename" attribute is the file's name.
   *
   * @param file the file to upload; its length is read eagerly, its content lazily
   * @param ec   execution context used for marshalling
   * @return the marshalled request entity
   */
  private def entity(file: File)(implicit ec: ExecutionContext): Future[RequestEntity] = {
    val fileEntity = HttpEntity(
      MediaTypes.`application/octet-stream`,
      file.length(),
      FileIO.fromPath(file.toPath, chunkSize = 100000))
    val filePart = Multipart.FormData.BodyPart("file", fileEntity, Map("filename" -> file.getName))
    val form = Multipart.FormData(Source.single(filePart))
    Marshal(form).to[RequestEntity]
  }

  "MetricsQueryService" should "return history metrics" in {
    implicit val customTimeout: RouteTestTimeout = RouteTestTimeout(15.seconds)
    (Get(s"/api/$REST_VERSION/master/metrics/master") ~> masterRoute) ~> check {
      // The body only has to be parseable by Typesafe Config.
      val responseBody = responseAs[String]
      val parsed = Try(ConfigFactory.parseString(responseBody))
      assert(parsed.isSuccess)
    }
  }

  // NOTE(review): the title says "appId > 0", but the mock master replies with app id 0 and
  // the assertion below accepts any id >= 0. The title is kept as is so that name-based test
  // filters keep working.
  "submitDag" should "submit a SubmitApplicationRequest and get an appId > 0" in {
    // Brings the `~` / `~>` edge-building syntax into scope.
    import org.apache.gearpump.util.Graph._

    val processors = Map(
      0 -> ProcessorDescription(0, "A", parallelism = 1),
      1 -> ProcessorDescription(1, "B", parallelism = 1)
    )
    val dag = Graph(0 ~ "partitioner" ~> 1)
    val jsonValue = write(SubmitApplicationRequest("complexdag", processors, dag, null))
    val payload = HttpEntity(ContentTypes.`application/json`, jsonValue)

    Post(s"/api/$REST_VERSION/master/submitdag", payload) ~> masterRoute ~> check {
      val responseBody = responseAs[String]
      val submitApplicationResultValue = read[SubmitApplicationResultValue](responseBody)
      assert(submitApplicationResultValue.appId >= 0, "invalid appid")
    }
  }

  "MasterService" should "return Gearpump built-in partitioner list" in {
    (Get(s"/api/$REST_VERSION/master/partitioners") ~> masterRoute) ~> check {
      val responseBody = responseAs[String]
      val partitioners = read[BuiltinPartitioners](responseBody)
      assert(partitioners.partitioners.nonEmpty, "invalid response")
    }
  }
}