blob: e0233f822540ce1447a6be129c361b5e8feb6370 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gearpump.cluster.worker
import akka.actor.{ActorSystem, PoisonPill, Props}
import akka.testkit.TestProbe
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.gearpump.cluster.AppMasterToWorker.{ChangeExecutorResource, LaunchExecutor, ShutdownExecutor}
import org.apache.gearpump.cluster.MasterToWorker.{UpdateResourceFailed, WorkerRegistered}
import org.apache.gearpump.cluster.WorkerToAppMaster.{ExecutorLaunchRejected, ShutdownExecutorFailed, ShutdownExecutorSucceed}
import org.apache.gearpump.cluster.WorkerToMaster.{RegisterNewWorker, RegisterWorker, ResourceUpdate}
import org.apache.gearpump.cluster.master.Master.MasterInfo
import org.apache.gearpump.cluster.scheduler.Resource
import org.apache.gearpump.cluster.{ExecutorJVMConfig, MasterHarness, TestUtil}
import org.apache.gearpump.util.{ActorSystemBooter, ActorUtil, Constants}
import org.scalatest._
import scala.concurrent.Await
import scala.concurrent.duration._
class WorkerSpec extends WordSpec with Matchers with BeforeAndAfterEach with MasterHarness {
override def config: Config = TestUtil.DEFAULT_CONFIG
val appId = 1
val workerId: WorkerId = WorkerId(1, 0L)
val executorId = 1
var masterProxy: TestProbe = null
var mockMaster: TestProbe = null
var client: TestProbe = null
val workerSlots = 50
override def beforeEach(): Unit = {
startActorSystem()
mockMaster = TestProbe()(getActorSystem)
masterProxy = TestProbe()(getActorSystem)
client = TestProbe()(getActorSystem)
}
override def afterEach(): Unit = {
shutdownActorSystem()
}
"The new started worker" should {
"kill itself if no response from Master after registering" in {
val worker = getActorSystem.actorOf(Props(classOf[Worker], mockMaster.ref))
mockMaster watch worker
mockMaster.expectMsg(RegisterNewWorker)
mockMaster.expectTerminated(worker, 60.seconds)
}
}
"Worker" should {
"init its resource from the gearpump config" in {
val config = ConfigFactory.parseString(s"${Constants.GEARPUMP_WORKER_SLOTS} = $workerSlots").
withFallback(TestUtil.DEFAULT_CONFIG)
val workerSystem = ActorSystem("WorkerSystem", config)
val worker = workerSystem.actorOf(Props(classOf[Worker], mockMaster.ref))
mockMaster watch worker
mockMaster.expectMsg(RegisterNewWorker)
worker.tell(WorkerRegistered(workerId, MasterInfo(mockMaster.ref)), mockMaster.ref)
mockMaster.expectMsg(ResourceUpdate(worker, workerId, Resource(workerSlots)))
worker.tell(
UpdateResourceFailed("Test resource update failed", new Exception()), mockMaster.ref)
mockMaster.expectTerminated(worker, 5.seconds)
workerSystem.terminate()
Await.result(workerSystem.whenTerminated, Duration.Inf)
}
}
"Worker" should {
"update its remaining resource when launching and shutting down executors" in {
val worker = getActorSystem.actorOf(Props(classOf[Worker], masterProxy.ref))
masterProxy.expectMsg(RegisterNewWorker)
worker.tell(WorkerRegistered(workerId, MasterInfo(mockMaster.ref)), mockMaster.ref)
mockMaster.expectMsg(ResourceUpdate(worker, workerId, Resource(100)))
val executorName = ActorUtil.actorNameForExecutor(appId, executorId)
// This is an actor path which the ActorSystemBooter will report back to,
// not needed in this test
val reportBack = "dummy"
val executionContext = ExecutorJVMConfig(Array.empty[String],
getActorSystem.settings.config.getString(Constants.GEARPUMP_APPMASTER_ARGS).split(" "),
classOf[ActorSystemBooter].getName, Array(executorName, reportBack), None,
username = "user")
// Test LaunchExecutor
worker.tell(LaunchExecutor(appId, executorId, Resource(101), executionContext),
mockMaster.ref)
mockMaster.expectMsg(ExecutorLaunchRejected("There is no free resource on this machine"))
worker.tell(LaunchExecutor(appId, executorId, Resource(5), executionContext), mockMaster.ref)
mockMaster.expectMsg(ResourceUpdate(worker, workerId, Resource(95)))
worker.tell(ChangeExecutorResource(appId, executorId, Resource(2)), client.ref)
mockMaster.expectMsg(ResourceUpdate(worker, workerId, Resource(98)))
// Test terminationWatch
worker.tell(ShutdownExecutor(appId, executorId, "Test shut down executor"), client.ref)
mockMaster.expectMsg(ResourceUpdate(worker, workerId, Resource(100)))
client.expectMsg(ShutdownExecutorSucceed(1, 1))
worker.tell(ShutdownExecutor(appId, executorId + 1, "Test shut down executor"), client.ref)
client.expectMsg(ShutdownExecutorFailed(
s"Can not find executor ${executorId + 1} for app $appId"))
mockMaster.ref ! PoisonPill
masterProxy.expectMsg(RegisterWorker(workerId))
}
}
}