| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.gearpump.services |
| |
| import java.io.File |
| import scala.concurrent.duration._ |
| import scala.concurrent.{ExecutionContext, Future} |
| import scala.util.{Success, Try} |
| |
| import akka.actor.ActorRef |
| import akka.http.scaladsl.marshalling.Marshal |
| import akka.http.scaladsl.model._ |
| import akka.http.scaladsl.model.headers.`Cache-Control` |
| import akka.http.scaladsl.testkit.{RouteTestTimeout, ScalatestRouteTest} |
| import akka.stream.scaladsl.{FileIO, Source} |
| import akka.testkit.TestActor.{AutoPilot, KeepRunning} |
| import akka.testkit.TestProbe |
| import com.typesafe.config.{Config, ConfigFactory} |
| import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} |
| |
| import org.apache.gearpump.cluster.AppMasterToMaster.{GetAllWorkers, GetMasterData, GetWorkerData, MasterData, WorkerData} |
| import org.apache.gearpump.cluster.ClientToMaster.{QueryHistoryMetrics, QueryMasterConfig, ResolveWorkerId, SubmitApplication} |
| import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMastersData, AppMastersDataRequest, WorkerList} |
| import org.apache.gearpump.cluster.MasterToClient._ |
| import org.apache.gearpump.cluster.TestUtil |
| import org.apache.gearpump.cluster.worker.{WorkerId, WorkerSummary} |
| import org.apache.gearpump.jarstore.JarStoreClient |
| import org.apache.gearpump.services.MasterService.{BuiltinPartitioners, SubmitApplicationRequest} |
| // NOTE: This cannot be removed!!! |
| import org.apache.gearpump.services.util.UpickleUtil._ |
| import org.apache.gearpump.streaming.ProcessorDescription |
| import org.apache.gearpump.util.Graph |
| |
/**
 * Route-level tests for [[MasterService]].
 *
 * The master and a single worker are replaced by `TestProbe`s whose auto-pilots reply to
 * the cluster messages sent by the routes, so each test drives a request through
 * `MasterService.route` and inspects the HTTP response (body, status, headers) and, where
 * relevant, the messages the probes received.
 */
class MasterServiceSpec extends FlatSpec with ScalatestRouteTest
  with Matchers with BeforeAndAfterAll {
  import upickle.default.{read, write}

  override def testConfig: Config = TestUtil.UI_CONFIG

  // Not referenced by the tests below; the worker auto-pilot echoes back whatever id
  // arrives in the GetWorkerData message instead.
  val workerId = 0

  /** Stand-in worker: reports itself as "active" and returns empty history metrics. */
  val mockWorker: TestProbe = TestProbe()
  mockWorker.setAutoPilot {
    new AutoPilot {
      def run(sender: ActorRef, msg: Any): AutoPilot = {
        msg match {
          case GetWorkerData(id) =>
            sender ! WorkerData(WorkerSummary.empty.copy(state = "active", workerId = id))
          case QueryHistoryMetrics(path, _, _, _) =>
            sender ! HistoryMetrics(path, List.empty[HistoryMetricsItem])
          case _ =>
            // Unknown message: send no reply. Without this fallback a MatchError would be
            // thrown inside the probe's actor and the auto-pilot would be lost for the
            // remaining tests.
            ()
        }
        KeepRunning
      }
    }
  }

  /** Stand-in master: answers each query used by the routes with a canned response. */
  val mockMaster: TestProbe = TestProbe()
  mockMaster.setAutoPilot {
    new AutoPilot {
      def run(sender: ActorRef, msg: Any): AutoPilot = {
        msg match {
          case GetMasterData =>
            sender ! MasterData(null)
          case AppMastersDataRequest =>
            sender ! AppMastersData(List.empty[AppMasterData])
          case GetAllWorkers =>
            sender ! WorkerList(List(WorkerId(0, 0L)))
          case ResolveWorkerId(WorkerId(0, 0L)) =>
            // Only the worker advertised by GetAllWorkers above can be resolved.
            sender ! ResolveWorkerIdResult(Success(mockWorker.ref))
          case QueryHistoryMetrics(path, _, _, _) =>
            sender ! HistoryMetrics(path, List.empty[HistoryMetricsItem])
          case QueryMasterConfig =>
            sender ! MasterConfig(null)
          case _: SubmitApplication =>
            sender ! SubmitApplicationResult(Success(0))
          case _ =>
            // Unknown message: send no reply, but keep the probe (and its auto-pilot) alive.
            ()
        }
        KeepRunning
      }
    }
  }

  val jarStoreClient = new JarStoreClient(system.settings.config, system)

  private def master = mockMaster.ref

  // Built per request so every test gets a fresh MasterService wired to the mock master.
  private def masterRoute = new MasterService(master, jarStoreClient, system).route

  it should "return master info when asked" in {
    implicit val customTimeout: RouteTestTimeout = RouteTestTimeout(15.seconds)
    Get(s"/api/$REST_VERSION/master") ~> masterRoute ~> check {
      // Checks the type: the body must deserialize as MasterData.
      val content = responseAs[String]
      read[MasterData](content)

      // Checks the header, which should disable caching:
      // Cache-Control: no-cache, max-age=0
      header[`Cache-Control`].map(_.value()) shouldBe Some("no-cache, max-age=0")
    }

    mockMaster.expectMsg(GetMasterData)
  }

  it should "return a json structure of appMastersData for GET request" in {
    implicit val customTimeout: RouteTestTimeout = RouteTestTimeout(15.seconds)
    Get(s"/api/$REST_VERSION/master/applist") ~> masterRoute ~> check {
      // Checks the type: the body must deserialize as AppMastersData.
      read[AppMastersData](responseAs[String])
    }
    mockMaster.expectMsg(AppMastersDataRequest)
  }

  it should "return a json structure of worker data for GET request" in {
    implicit val customTimeout: RouteTestTimeout = RouteTestTimeout(25.seconds)
    Get(s"/api/$REST_VERSION/master/workerlist") ~> masterRoute ~> check {
      // Checks the type: the body must deserialize as a list of WorkerSummary.
      val workerListJson = responseAs[String]
      val workers = read[List[WorkerSummary]](workerListJson)
      workers should not be empty
      workers.foreach { worker =>
        worker.state shouldBe "active"
      }
    }
    // The route lists the workers, resolves each id to an actor, then queries that actor.
    mockMaster.expectMsg(GetAllWorkers)
    mockMaster.expectMsgType[ResolveWorkerId]
    mockWorker.expectMsgType[GetWorkerData]
  }

  it should "return config for master" in {
    implicit val customTimeout: RouteTestTimeout = RouteTestTimeout(15.seconds)
    Get(s"/api/$REST_VERSION/master/config") ~> masterRoute ~> check {
      // The body only has to be parseable as a Typesafe config document.
      val responseBody = responseAs[String]
      val parsed = Try(ConfigFactory.parseString(responseBody))
      assert(parsed.isSuccess)
    }
    mockMaster.expectMsg(QueryMasterConfig)
  }

  "submit invalid application" should "return an error" in {
    implicit val routeTestTimeout: RouteTestTimeout = RouteTestTimeout(30.seconds)
    // "foo" is a relative path that the test never creates.
    val tempfile = new File("foo")
    val request = entity(tempfile)

    Post(s"/api/$REST_VERSION/master/submitapp", request) ~> masterRoute ~> check {
      response.status.intValue shouldBe 500
    }
  }

  /**
   * Wraps `file` in a multipart/form-data request entity with a single part named "file".
   *
   * @param file file whose content is streamed as the part body
   * @param ec   execution context used to marshal the form
   * @return the marshalled request entity
   */
  private def entity(file: File)(implicit ec: ExecutionContext): Future[RequestEntity] = {
    val fileEntity = HttpEntity(MediaTypes.`application/octet-stream`, file.length(),
      FileIO.fromPath(file.toPath, chunkSize = 100000))

    val filePart = Multipart.FormData.BodyPart(
      "file",
      fileEntity,
      Map("filename" -> file.getName))

    Marshal(Multipart.FormData(Source.single(filePart))).to[RequestEntity]
  }

  "MetricsQueryService" should "return history metrics" in {
    implicit val customTimeout: RouteTestTimeout = RouteTestTimeout(15.seconds)
    Get(s"/api/$REST_VERSION/master/metrics/master") ~> masterRoute ~> check {
      // The body only has to be parseable as a Typesafe config document.
      val responseBody = responseAs[String]
      val parsed = Try(ConfigFactory.parseString(responseBody))
      assert(parsed.isSuccess)
    }
  }

  // The mock master answers SubmitApplication with appId 0, hence ">= 0" rather than "> 0".
  "submitDag" should "submit a SubmitApplicationRequest and get an appId >= 0" in {
    import org.apache.gearpump.util.Graph._
    val processors = Map(
      0 -> ProcessorDescription(0, "A", parallelism = 1),
      1 -> ProcessorDescription(1, "B", parallelism = 1)
    )
    val dag = Graph(0 ~ "partitioner" ~> 1)
    val jsonValue = write(SubmitApplicationRequest("complexdag", processors, dag, null))
    val requestEntity = HttpEntity(ContentTypes.`application/json`, jsonValue)

    Post(s"/api/$REST_VERSION/master/submitdag", requestEntity) ~> masterRoute ~> check {
      val responseBody = responseAs[String]
      val submitApplicationResultValue = read[SubmitApplicationResultValue](responseBody)
      assert(submitApplicationResultValue.appId >= 0, "invalid appid")
    }
  }

  "MasterService" should "return Gearpump built-in partitioner list" in {
    Get(s"/api/$REST_VERSION/master/partitioners") ~> masterRoute ~> check {
      val responseBody = responseAs[String]
      val partitioners = read[BuiltinPartitioners](responseBody)
      assert(partitioners.partitioners.length > 0, "invalid response")
    }
  }
}