| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.gearpump.streaming.task |
| |
| import akka.actor.{ExtendedActorSystem, Props} |
| import akka.testkit._ |
| import com.typesafe.config.{Config, ConfigFactory} |
| import org.mockito.Mockito.{mock, times, verify, when} |
| import org.scalatest.{BeforeAndAfterEach, Matchers, WordSpec} |
| |
| import org.apache.gearpump.Message |
| import org.apache.gearpump.cluster.{MasterHarness, TestUtil, UserConfig} |
| import org.apache.gearpump.streaming.partitioner.{HashPartitioner, Partitioner} |
| import org.apache.gearpump.serializer.{FastKryoSerializer, SerializationFramework} |
| import org.apache.gearpump.streaming.AppMasterToExecutor.{ChangeTask, MsgLostException, StartTask, TaskChanged, TaskRegistered} |
| import org.apache.gearpump.streaming.task.TaskActorSpec.TestTask |
| import org.apache.gearpump.streaming.{DAG, LifeTime, ProcessorDescription} |
| import org.apache.gearpump.util.Graph._ |
| import org.apache.gearpump.util.{Graph, Util} |
| |
| class TaskActorSpec extends WordSpec with Matchers with BeforeAndAfterEach with MasterHarness { |
| protected override def config: Config = { |
| ConfigFactory.parseString( |
| """ akka.loggers = ["akka.testkit.TestEventListener"] |
| | akka.test.filter-leeway = 20000 |
| """.stripMargin). |
| withFallback(TestUtil.DEFAULT_CONFIG) |
| } |
| |
| val appId = 0 |
| val task1 = ProcessorDescription(id = 0, taskClass = classOf[TestTask].getName, parallelism = 1) |
| val task2 = ProcessorDescription(id = 1, taskClass = classOf[TestTask].getName, parallelism = 1) |
| val dag: DAG = DAG(Graph(task1 ~ Partitioner[HashPartitioner] ~> task2)) |
| val taskId1 = TaskId(0, 0) |
| val taskId2 = TaskId(1, 0) |
| val executorId1 = 1 |
| val executorId2 = 2 |
| |
| var mockMaster: TestProbe = null |
| var taskContext1: TaskContextData = null |
| |
| var mockSerializerPool: SerializationFramework = null |
| |
| override def beforeEach(): Unit = { |
| startActorSystem() |
| mockMaster = TestProbe()(getActorSystem) |
| |
| mockSerializerPool = mock(classOf[SerializationFramework]) |
| val serializer = new FastKryoSerializer(getActorSystem.asInstanceOf[ExtendedActorSystem]) |
| when(mockSerializerPool.get()).thenReturn(serializer) |
| |
| taskContext1 = TaskContextData(executorId1, appId, |
| "appName", mockMaster.ref, 1, |
| LifeTime.Immortal, |
| Subscriber.of(processorId = 0, dag)) |
| } |
| |
| "TaskActor" should { |
| "register itself to AppMaster when started" in { |
| val mockTask = mock(classOf[TaskWrapper]) |
| val testActor = TestActorRef[TaskActor](Props( |
| new TaskActor(taskId1, taskContext1, UserConfig.empty, |
| mockTask, mockSerializerPool)))(getActorSystem) |
| testActor ! TaskRegistered(taskId1, 0, Util.randInt()) |
| testActor ! StartTask(taskId1) |
| |
| implicit val system = getActorSystem |
| val ack = Ack(taskId2, 100, 99, testActor.underlyingActor.sessionId) |
| EventFilter[MsgLostException](occurrences = 1) intercept { |
| testActor ! ack |
| } |
| } |
| |
| "respond to ChangeTask" in { |
| val mockTask = mock(classOf[TaskWrapper]) |
| val testActor = TestActorRef[TaskActor](Props(new TaskActor(taskId1, taskContext1, |
| UserConfig.empty, mockTask, mockSerializerPool)))(getActorSystem) |
| testActor ! TaskRegistered(taskId1, 0, Util.randInt()) |
| testActor ! StartTask(taskId1) |
| mockMaster.expectMsgType[GetUpstreamMinClock] |
| |
| mockMaster.send(testActor, ChangeTask(taskId1, 1, LifeTime.Immortal, List.empty[Subscriber])) |
| mockMaster.expectMsgType[TaskChanged] |
| } |
| |
| "handle received message correctly" in { |
| val mockTask = mock(classOf[TaskWrapper]) |
| val msg = Message("test") |
| |
| val testActor = TestActorRef[TaskActor](Props(new TaskActor(taskId1, taskContext1, |
| UserConfig.empty, mockTask, mockSerializerPool)))(getActorSystem) |
| testActor.tell(TaskRegistered(taskId1, 0, Util.randInt()), mockMaster.ref) |
| testActor.tell(StartTask(taskId1), mockMaster.ref) |
| |
| testActor.tell(msg, testActor) |
| |
| verify(mockTask, times(1)).onNext(msg) |
| } |
| } |
| |
| override def afterEach(): Unit = { |
| shutdownActorSystem() |
| } |
| } |
| |
| object TaskActorSpec { |
| class TestTask |
| } |