blob: 3f23fa98831107f6e6ed243501f9b2e69a9008f1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gearpump.streaming.dsl.plan
import java.time.Instant
import akka.actor.ActorSystem
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.{TestUtil, UserConfig}
import org.apache.gearpump.streaming.dsl.api.functions.ReduceFunction
import org.apache.gearpump.streaming.partitioner.CoLocationPartitioner
import org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitioner
import org.apache.gearpump.streaming.dsl.plan.PlannerSpec._
import org.apache.gearpump.streaming.dsl.plan.functions.{FlatMapper, Reducer}
import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction
import org.apache.gearpump.streaming.dsl.window.api.GroupByFn
import org.apache.gearpump.streaming.sink.DataSink
import org.apache.gearpump.streaming.source.DataSource
import org.apache.gearpump.streaming.{MockUtil, Processor}
import org.apache.gearpump.streaming.task.{Task, TaskContext}
import org.apache.gearpump.util.Graph
import org.scalatest.mock.MockitoSugar
import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
import scala.concurrent.Await
import scala.concurrent.duration.Duration
/**
 * Exercises `Planner.plan` on a small operator graph and checks which vertices
 * survive planning and which partitioner is attached to the resulting edges.
 */
class PlannerSpec extends FlatSpec with Matchers with BeforeAndAfterAll with MockitoSugar {

  // Suite-level actor system: assigned in beforeAll, shut down in afterAll.
  implicit var system: ActorSystem = _

  override def beforeAll(): Unit = {
    system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
  }

  override def afterAll(): Unit = {
    // `system` is still null if beforeAll failed before the assignment; skip the
    // shutdown in that case rather than throwing a NullPointerException.
    Option(system).foreach { running =>
      running.terminate()
      Await.result(running.whenTerminated, Duration.Inf)
    }
  }

  "Planner" should "chain operations" in {
    val graph = Graph.empty[Op, OpEdge]

    // Six operators: source -> groupBy -> flatMap -> reduce -> processor -> sink.
    val sourceOp = DataSourceOp(new AnySource)
    val groupByOp = GroupByOp(new AnyGroupByFn)
    val flatMapOp = ChainableOp[Any, Any](anyFlatMapper)
    val reduceOp = ChainableOp[Any, Any](anyReducer)
    val processorOp = new ProcessorOp[AnyTask]
    val sinkOp = DataSinkOp(new AnySink)

    val directEdge = Direct
    val shuffleEdge = Shuffle

    // Only the source -> groupBy edge is a Shuffle; every other edge is Direct.
    graph.addVertex(sourceOp)
    graph.addVertex(groupByOp)
    graph.addEdge(sourceOp, shuffleEdge, groupByOp)
    graph.addVertex(flatMapOp)
    graph.addEdge(groupByOp, directEdge, flatMapOp)
    graph.addVertex(reduceOp)
    graph.addEdge(flatMapOp, directEdge, reduceOp)
    graph.addVertex(processorOp)
    graph.addEdge(reduceOp, directEdge, processorOp)
    graph.addVertex(sinkOp)
    graph.addEdge(processorOp, directEdge, sinkOp)

    // This local implicit must keep the name `system`: it shadows the suite-level
    // implicit, so exactly one ActorSystem is eligible for implicit resolution.
    // NOTE(review): the system created in beforeAll is therefore not the one passed
    // to the planner here - confirm that this is intended.
    implicit val system: ActorSystem = MockUtil.system

    val planner = new Planner
    val plan = planner.plan(graph)
      .mapVertex(_.description)

    // The two ChainableOps must not show up as vertices of their own.
    plan.vertices.toSet should contain theSameElementsAs
      Set("source", "groupBy", "processor", "sink")

    plan.outgoingEdgesOf("source").iterator.next()._2 shouldBe a[GroupByPartitioner[_, _]]
    plan.outgoingEdgesOf("groupBy").iterator.next()._2 shouldBe a[CoLocationPartitioner]
    plan.outgoingEdgesOf("processor").iterator.next()._2 shouldBe a[CoLocationPartitioner]
  }
}
/** Stub operators, functions and tasks used as fixtures by the PlannerSpec test class. */
object PlannerSpec {

  /** Parallelism given to every processor built by [[AnyGroupByFn]]. */
  private val anyParallelism: Int = 1

  /** Flat-map fixture labelled "flatMap"; its function is `Option(_)`. */
  private val anyFlatMapper: FlatMapper[Any, Any] = new FlatMapper[Any, Any](
    FlatMapFunction(Option(_)), "flatMap")

  /** Reduce fixture labelled "reduce"; it pairs both operands into a tuple. */
  private val anyReducer: Reducer[Any] = new Reducer[Any](
    ReduceFunction((left: Any, right: Any) => (left, right)), "reduce")

  /** Task that adds nothing on top of [[Task]]. */
  class AnyTask(context: TaskContext, config: UserConfig) extends Task(context, config)

  /** Group-by function that keys each message by its payload. */
  class AnyGroupByFn extends GroupByFn[Any, Any] {

    override def groupBy(message: Message): Any = message.msg

    /**
     * Builds a processor of [[AnyTask]] from `description` and the fixture parallelism.
     * The `parallelism` and `userConfig` arguments are not used.
     */
    override def getProcessor(
        parallelism: Int,
        description: String,
        userConfig: UserConfig)(implicit system: ActorSystem): Processor[_ <: Task] =
      Processor[AnyTask](anyParallelism, description)
  }

  /** Source whose every read yields `Message("any")`; open and close do nothing. */
  class AnySource extends DataSource {

    override def open(context: TaskContext, startTime: Instant): Unit = ()

    override def read(): Message = Message("any")

    override def close(): Unit = ()

    // The watermark is simply the current wall-clock instant.
    override def getWatermark: Instant = Instant.now()
  }

  /** Sink that discards every message; open and close do nothing. */
  class AnySink extends DataSink {

    override def open(context: TaskContext): Unit = ()

    override def write(message: Message): Unit = ()

    override def close(): Unit = ()
  }
}