blob: def9d443703e011f641ce9c5566f626de48a9ac8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gearpump.streaming.appmaster
import akka.actor.ActorSystem
import org.apache.gearpump.cluster.scheduler.{Resource, ResourceRequest}
import org.apache.gearpump.cluster.worker.WorkerId
import org.apache.gearpump.cluster.{AppJar, TestUtil}
import org.apache.gearpump.jarstore.FilePath
import org.apache.gearpump.streaming.partitioner.{HashPartitioner, Partitioner}
import org.apache.gearpump.streaming.appmaster.TaskSchedulerSpec.{TestTask1, TestTask2}
import org.apache.gearpump.streaming.task.TaskId
import org.apache.gearpump.streaming.{DAG, ProcessorDescription, _}
import org.apache.gearpump.util.Graph
import org.apache.gearpump.util.Graph._
import org.scalatest.{Matchers, WordSpec}
import scala.concurrent.{Await, Future}
class JarSchedulerSpec extends WordSpec with Matchers {
val mockJar1 = AppJar("jar1", FilePath("path"))
val mockJar2 = AppJar("jar2", FilePath("path"))
val task1 = ProcessorDescription(id = 0, taskClass = classOf[TestTask1].getName, parallelism = 1,
jar = mockJar1)
val task2 = ProcessorDescription(id = 1, taskClass = classOf[TestTask2].getName, parallelism = 1,
jar = mockJar1)
val task3 = ProcessorDescription(id = 2, taskClass = classOf[TestTask2].getName, parallelism = 2,
jar = mockJar2)
val dag = DAG(Graph(task1 ~ Partitioner[HashPartitioner] ~> task2))
import scala.concurrent.duration._
"JarScheduler" should {
"schedule tasks depends on app jar" in {
val system = ActorSystem("JarSchedulerSpec")
implicit val dispatcher = system.dispatcher
val manager = new JarScheduler(0, "APP", TestUtil.DEFAULT_CONFIG, system)
manager.setDag(dag, Future {
0L
})
val requests = Array(ResourceRequest(Resource(2), WorkerId.unspecified))
val result = Await.result(manager.getResourceRequestDetails(), 15.seconds)
assert(result.length == 1)
assert(result.head.jar == mockJar1)
assert(result.head.requests.deep == requests.deep)
val tasks = Await.result(manager.scheduleTask(mockJar1, WorkerId(0, 0L), 0,
Resource(2)), 15.seconds)
assert(tasks.contains(TaskId(0, 0)))
assert(tasks.contains(TaskId(1, 0)))
val newDag = replaceDAG(dag, 1, task3, 1)
manager.setDag(newDag, Future {
0
})
val requestDetails = Await.result(manager.getResourceRequestDetails().
map(_.sortBy(_.jar.name)), 15.seconds)
assert(requestDetails.length == 2)
assert(requestDetails.last.jar == mockJar2)
assert(requestDetails.last.requests.deep == requests.deep)
system.terminate()
Await.result(system.whenTerminated, Duration.Inf)
}
}
def replaceDAG(
dag: DAG, oldProcessorId: ProcessorId, newProcessor: ProcessorDescription, newVersion: Int)
: DAG = {
val oldProcessorLife = LifeTime(dag.processors(oldProcessorId).life.birth,
newProcessor.life.birth)
val newProcessorMap = dag.processors ++
Map(oldProcessorId -> dag.processors(oldProcessorId).copy(life = oldProcessorLife),
newProcessor.id -> newProcessor)
val newGraph = dag.graph.subGraph(oldProcessorId).
replaceVertex(oldProcessorId, newProcessor.id).addGraph(dag.graph)
new DAG(newVersion, newProcessorMap, newGraph)
}
}