// blob: 94feae4d6ae6c3b817290f5a73789e28b894e379 (source-viewer residue; not part of the original file)
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gearpump.streaming.dsl.plan.functions
import java.time.Instant
import akka.actor.ActorSystem
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.{TestUtil, UserConfig}
import org.apache.gearpump.streaming.MockUtil
import org.apache.gearpump.streaming.dsl.CollectionDataSource
import org.apache.gearpump.streaming.source.DataSourceTask
import org.apache.gearpump.streaming.Constants._
import org.apache.gearpump.streaming.dsl.task.{CountTriggerTask, TransformTask}
import org.apache.gearpump.streaming.dsl.window.api.CountWindow
import org.apache.gearpump.streaming.dsl.window.impl.GroupAlsoByWindow
import org.mockito.ArgumentCaptor
import org.mockito.Matchers._
import org.mockito.Mockito._
import org.scalatest.{Matchers, WordSpec}
import org.scalatest.mock.MockitoSugar
import scala.concurrent.Await
import scala.concurrent.duration.Duration
/**
 * Specification for the `SingleInputFunction` building blocks (`AndThen`,
 * `FlatMapFunction`, `ReduceFunction`, `EmitFunction`) and for the tasks that
 * run them (`DataSourceTask`, `CountTriggerTask`, `TransformTask`).
 *
 * Collaborators are Mockito mocks; the element types `R`, `S` and `T` come
 * from the companion object.
 */
class SingleInputFunctionSpec extends WordSpec with Matchers with MockitoSugar {

  import org.apache.gearpump.streaming.dsl.plan.functions.SingleInputFunctionSpec._

  "AndThen" should {

    // These mocks are shared by every test in this group, so stubbing done in
    // one test is still in place when the following tests run.
    val first = mock[SingleInputFunction[R, S]]
    val second = mock[SingleInputFunction[S, T]]
    val andThen = new AndThen(first, second)

    "chain first and second functions when processing input value" in {
      val input = mock[R]
      val firstOutput = mock[S]
      val secondOutput = mock[T]
      when(first.process(input)).thenReturn(Some(firstOutput))
      when(second.process(firstOutput)).thenReturn(Some(secondOutput))

      andThen.process(input).toList shouldBe List(secondOutput)
    }

    "return chained description" in {
      when(first.description).thenReturn("first")
      when(second.description).thenReturn("second")

      andThen.description shouldBe "first.second"
    }

    "return either first result or second on finish" in {
      val firstResult = mock[S]
      val processedFirst = mock[T]
      val secondResult = mock[T]

      // Case 1: `first` yields a value on finish, which is run through `second.process`.
      when(first.finish()).thenReturn(Some(firstResult))
      when(second.process(firstResult)).thenReturn(Some(processedFirst))
      andThen.finish().toList shouldBe List(processedFirst)

      // Case 2: `first` yields nothing on finish, so the result of `second.finish` is returned.
      when(first.finish()).thenReturn(None)
      when(second.finish()).thenReturn(Some(secondResult))
      andThen.finish().toList shouldBe List(secondResult)
    }

    "clear both states on clearState" in {
      andThen.clearState()

      verify(first).clearState()
      verify(second).clearState()
    }

    "return AndThen on andThen" in {
      val third = mock[SingleInputFunction[T, Any]]

      andThen.andThen[Any](third) shouldBe an [AndThen[_, _, _]]
    }
  }

  "FlatMapFunction" should {

    // The mocked user function is shared by every test in this group.
    val flatMap = mock[R => TraversableOnce[S]]
    val flatMapFunction = new FlatMapFunction[R, S](flatMap, "flatMap")

    "call flatMap function when processing input value" in {
      val input = mock[R]
      flatMapFunction.process(input)

      verify(flatMap).apply(input)
    }

    "return passed in description" in {
      flatMapFunction.description shouldBe "flatMap"
    }

    "return None on finish" in {
      flatMapFunction.finish() shouldBe List.empty[S]
    }

    "do nothing on clearState" in {
      flatMapFunction.clearState()

      verifyZeroInteractions(flatMap)
    }

    "return AndThen on andThen" in {
      val other = mock[SingleInputFunction[S, T]]

      flatMapFunction.andThen[T](other) shouldBe an [AndThen[_, _, _]]
    }
  }

  "ReduceFunction" should {

    // Each test builds its own mock and ReduceFunction, so no state is carried
    // over between the tests of this group.

    "call reduce function when processing input value" in {
      val reduce = mock[(T, T) => T]
      val reduceFunction = new ReduceFunction[T](reduce, "reduce")
      val input1 = mock[T]
      val input2 = mock[T]
      val output = mock[T]
      when(reduce.apply(input1, input2)).thenReturn(output, output)

      // Processing emits nothing; the reduced value only shows up on finish.
      reduceFunction.process(input1) shouldBe List.empty[T]
      reduceFunction.process(input2) shouldBe List.empty[T]
      reduceFunction.finish() shouldBe List(output)

      // After clearState only the value processed since the last clear is returned.
      reduceFunction.clearState()
      reduceFunction.process(input1) shouldBe List.empty[T]
      reduceFunction.clearState()
      reduceFunction.process(input2) shouldBe List.empty[T]
      reduceFunction.finish() shouldBe List(input2)
    }

    "return passed in description" in {
      val reduce = mock[(T, T) => T]
      val reduceFunction = new ReduceFunction[T](reduce, "reduce")

      reduceFunction.description shouldBe "reduce"
    }

    "return None on finish" in {
      val reduce = mock[(T, T) => T]
      val reduceFunction = new ReduceFunction[T](reduce, "reduce")

      reduceFunction.finish() shouldBe List.empty[T]
    }

    "do nothing on clearState" in {
      val reduce = mock[(T, T) => T]
      val reduceFunction = new ReduceFunction[T](reduce, "reduce")
      reduceFunction.clearState()

      verifyZeroInteractions(reduce)
    }

    "return AndThen on andThen" in {
      val reduce = mock[(T, T) => T]
      val reduceFunction = new ReduceFunction[T](reduce, "reduce")
      val other = mock[SingleInputFunction[T, Any]]

      reduceFunction.andThen[Any](other) shouldBe an[AndThen[_, _, _]]
    }
  }

  "EmitFunction" should {

    // The mocked emit callback is shared by every test in this group.
    val emit = mock[T => Unit]
    val emitFunction = new EmitFunction[T](emit)

    "emit input value when processing input value" in {
      val input = mock[T]

      emitFunction.process(input) shouldBe List.empty[Unit]
      verify(emit).apply(input)
    }

    "return empty description" in {
      emitFunction.description shouldBe ""
    }

    "return None on finish" in {
      emitFunction.finish() shouldBe List.empty[Unit]
    }

    "do nothing on clearState" in {
      emitFunction.clearState()

      verifyZeroInteractions(emit)
    }

    "throw exception on andThen" in {
      val other = mock[SingleInputFunction[Unit, Any]]

      intercept[UnsupportedOperationException] {
        emitFunction.andThen(other)
      }
    }
  }

  "andThen" should {
    "chain multiple single input function" in {
      val split = new FlatMapFunction[String, String](line => line.split("\\s"), "split")
      val filter = new FlatMapFunction[String, String](word =>
        if (word.isEmpty) None else Some(word), "filter")
      val map = new FlatMapFunction[String, Int](word => Some(1), "map")
      val sum = new ReduceFunction[Int]({ (left, right) => left + right }, "sum")

      // Deliberately not named `all`, which would shadow ScalaTest's `Matchers.all`.
      val pipeline = split.andThen(filter).andThen(map).andThen(sum)
      pipeline.description shouldBe "split.filter.map.sum"

      // 5 + 4 + 3 + 2 + 1 = 15 words; the empty tokens produced by the leading
      // and trailing newlines are dropped by `filter`.
      val data =
        """
          |five four three two one
          |five four three two
          |five four three
          |five four
          |five
          |""".stripMargin

      // force eager evaluation
      pipeline.process(data).toList

      // A single check covers both "produced something" and "last value is the total".
      val result = pipeline.finish().toList
      result.lastOption shouldBe Some(15)
    }
  }

  "Source" should {
    "iterate over input source and apply attached operator" in {
      val taskContext = MockUtil.mockTaskContext
      implicit val actorSystem = MockUtil.system

      val data = "one two three".split("\\s")
      val dataSource = new CollectionDataSource[String](data)
      val conf = UserConfig.empty.withValue(GEARPUMP_STREAMING_SOURCE, dataSource)

      // Source with no transformer
      val source = new DataSourceTask[String, String](
        taskContext, conf)
      source.onStart(Instant.EPOCH)
      source.onNext(Message("next"))
      data.foreach { s =>
        verify(taskContext, times(1)).output(MockUtil.argMatch[Message](
          message => message.msg == s))
      }

      // Source with transformer
      val anotherTaskContext = MockUtil.mockTaskContext
      val double = new FlatMapFunction[String, String](word => List(word, word), "double")
      val another = new DataSourceTask(anotherTaskContext,
        conf.withValue(GEARPUMP_STREAMING_OPERATOR, double))
      another.onStart(Instant.EPOCH)
      another.onNext(Message("next"))
      data.foreach { s =>
        verify(anotherTaskContext, times(2)).output(MockUtil.argMatch[Message](
          message => message.msg == s))
      }
    }
  }

  "CountTriggerTask" should {
    "group input by groupBy Function and " +
      "apply attached operator for each group" in {
      val words = "1 2 2 3 3 3".split("\\s+")
      val concat = new ReduceFunction[String]({ (left, right) =>
        left + right
      }, "concat")

      implicit val system: ActorSystem = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
      // Always shut the actor system down, even when a verification below
      // fails; otherwise a failing test leaks it.
      try {
        val config = UserConfig.empty.withValue[SingleInputFunction[String, String]](
          GEARPUMP_STREAMING_OPERATOR, concat)
        val taskContext = MockUtil.mockTaskContext
        val groupBy =
          GroupAlsoByWindow((input: String) => input, CountWindow.apply(1).accumulating)
        val task = new CountTriggerTask[String, String](groupBy, taskContext, config)
        task.onStart(Instant.EPOCH)

        val messageCaptor = ArgumentCaptor.forClass(classOf[Message])
        words.foreach { word =>
          task.onNext(Message(word))
        }

        // One output is expected per input message.
        verify(taskContext, times(words.length)).output(messageCaptor.capture())

        import scala.collection.JavaConverters._

        val values = messageCaptor.getAllValues.asScala.map(input => input.msg.asInstanceOf[String])
        values.mkString(",") shouldBe "1,2,22,3,33,333"
      } finally {
        system.terminate()
        Await.result(system.whenTerminated, Duration.Inf)
      }
    }
  }

  // NOTE(review): this group is labelled "MergeTask" but the code under test is
  // a TransformTask; the label is kept so the reported test name does not change.
  "MergeTask" should {
    "accept two stream and apply the attached operator" in {
      // Source with transformer
      val taskContext = MockUtil.mockTaskContext
      val conf = UserConfig.empty
      val double = new FlatMapFunction[String, String](word => List(word, word), "double")
      val task = new TransformTask[String, String](Some(double), taskContext, conf)
      task.onStart(Instant.EPOCH)

      val data = "1 2 2 3 3 3".split("\\s+")
      data.foreach { input =>
        task.onNext(Message(input))
      }

      // `double` emits every input twice.
      verify(taskContext, times(data.length * 2)).output(anyObject())
    }
  }
}
/**
 * Element types used by the spec: `R`, `S` and `T` appear as the input and
 * output type parameters of the functions under test.
 */
object SingleInputFunctionSpec {

  // Each alias is plain AnyRef; the distinct names only label the position of
  // a value in a chain of functions (e.g. R => S => T).
  type R = AnyRef

  type S = AnyRef

  type T = AnyRef
}