blob: be4cc635c46ea836b3fcdfd97a4aa13544dd94d7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gearpump.streaming.dsl.plan
import java.time.Instant
import akka.actor.ActorSystem
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.{TestUtil, UserConfig}
import org.apache.gearpump.streaming.dsl.api.functions.ReduceFunction
import org.apache.gearpump.streaming.partitioner.{CoLocationPartitioner, GroupByPartitioner}
import org.apache.gearpump.streaming.dsl.plan.PlannerSpec._
import org.apache.gearpump.streaming.dsl.plan.functions.{FlatMapper, FoldRunner}
import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction
import org.apache.gearpump.streaming.sink.DataSink
import org.apache.gearpump.streaming.source.DataSource
import org.apache.gearpump.streaming.MockUtil
import org.apache.gearpump.streaming.dsl.window.api.GlobalWindows
import org.apache.gearpump.streaming.task.{Task, TaskContext}
import org.apache.gearpump.util.Graph
import org.scalatest.mock.MockitoSugar
import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
import scala.concurrent.Await
import scala.concurrent.duration.Duration
class PlannerSpec extends FlatSpec with Matchers with BeforeAndAfterAll with MockitoSugar {
implicit var system: ActorSystem = _
override def beforeAll(): Unit = {
system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
}
override def afterAll(): Unit = {
system.terminate()
Await.result(system.whenTerminated, Duration.Inf)
}
"Planner" should "chain operations" in {
val graph = Graph.empty[Op, OpEdge]
val sourceOp = DataSourceOp(new AnySource)
val groupBy = (any: Any) => any
val groupByOp = GroupByOp(groupBy)
val windowOp = WindowOp(GlobalWindows())
val flatMapOp = TransformOp[Any, Any](anyFlatMapper)
val reduceOp = TransformOp[Any, Option[Any]](anyReducer)
val processorOp = new ProcessorOp[AnyTask]
val sinkOp = DataSinkOp(new AnySink)
val directEdge = Direct
val shuffleEdge = Shuffle
graph.addVertex(sourceOp)
graph.addVertex(groupByOp)
graph.addEdge(sourceOp, shuffleEdge, groupByOp)
graph.addVertex(windowOp)
graph.addEdge(groupByOp, directEdge, windowOp)
graph.addVertex(flatMapOp)
graph.addEdge(windowOp, directEdge, flatMapOp)
graph.addVertex(reduceOp)
graph.addEdge(flatMapOp, directEdge, reduceOp)
graph.addVertex(processorOp)
graph.addEdge(reduceOp, directEdge, processorOp)
graph.addVertex(sinkOp)
graph.addEdge(processorOp, directEdge, sinkOp)
implicit val system = MockUtil.system
val planner = new Planner
val plan = planner.plan(graph)
.mapVertex(_.description)
plan.getVertices.toSet should contain theSameElementsAs
Set("source.globalWindows", "groupBy.globalWindows.flatMap.reduce", "processor", "sink")
plan.outgoingEdgesOf("source.globalWindows").iterator.next()._2 shouldBe
a[GroupByPartitioner[_, _]]
plan.outgoingEdgesOf("groupBy.globalWindows.flatMap.reduce").iterator.next()._2 shouldBe
a[CoLocationPartitioner]
plan.outgoingEdgesOf("processor").iterator.next()._2 shouldBe a[CoLocationPartitioner]
}
}
object PlannerSpec {
private val anyFlatMapper = new FlatMapper[Any, Any](
FlatMapFunction(Option(_)), "flatMap")
private val anyReducer = new FoldRunner[Any, Option[Any]](
ReduceFunction((left: Any, right: Any) => (left, right)), "reduce")
class AnyTask(context: TaskContext, config: UserConfig) extends Task(context, config)
class AnySource extends DataSource {
override def open(context: TaskContext, startTime: Instant): Unit = {}
override def read(): Message = Message("any")
override def close(): Unit = {}
override def getWatermark: Instant = Instant.now()
}
class AnySink extends DataSink {
override def open(context: TaskContext): Unit = {}
override def write(message: Message): Unit = {}
override def close(): Unit = {}
}
}