// blob: b8437327ff95fc0c0928597e22dd3127a79ed82a [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.heron.streamlet.scala.impl
import scala.util.Random
import org.junit.Assert.{assertEquals, assertTrue}
import org.apache.heron.resource.{
TestBasicBolt,
TestBolt,
TestWindowBolt
}
import org.apache.heron.streamlet.{
IStreamletBasicOperator,
IStreamletRichOperator,
IStreamletWindowOperator,
WindowConfig
}
import org.apache.heron.streamlet.impl.streamlets.{
ConsumerStreamlet,
CustomStreamlet,
FilterStreamlet,
FlatMapStreamlet,
LogStreamlet,
JoinStreamlet,
MapStreamlet,
ReduceByKeyAndWindowStreamlet,
RemapStreamlet,
TransformStreamlet,
SinkStreamlet,
UnionStreamlet
}
import org.apache.heron.streamlet.scala.{Builder, Streamlet}
import org.apache.heron.streamlet.scala.common.{
BaseFunSuite,
TestIncrementSerializableTransformer,
TestListBufferSink
}
/**
* Tests for Scala Streamlet Implementation functionality
*/
class StreamletImplTest extends BaseFunSuite {
val builder = Builder.newBuilder
// Name and partition count set on a source, and on a streamlet mapped from it,
// must be reported back unchanged by the corresponding getters.
test(
  "StreamletImpl should support setting name and number of partitions per streamlet") {
  val source = builder
    .newSource(() => Math.random)
    .setName("Supplier_Streamlet_1")
    .setNumPartitions(20)

  // The source reports exactly what was configured on it.
  assertTrue(source.isInstanceOf[Streamlet[Double]])
  assertEquals("Supplier_Streamlet_1", source.getName)
  assertEquals(20, source.getNumPartitions)

  val scaled = source
    .map[Double]((num: Double) => num * 10)
    .setName("Map_Streamlet_1")
    .setNumPartitions(5)

  // The mapped streamlet carries its own, independent configuration.
  assertTrue(scaled.isInstanceOf[Streamlet[Double]])
  assertEquals("Map_Streamlet_1", scaled.getName)
  assertEquals(5, scaled.getNumPartitions)
}
// map: the source must end up with exactly one child, a MapStreamlet that
// carries the configured name and has no children of its own.
test("StreamletImpl should support map transformation") {
  val source = builder
    .newSource(() => Math.random)
    .setName("Supplier_Streamlet_1")
    .setNumPartitions(20)

  source
    .map[String]((num: Double) => (num * 10).toString)
    .setName("Map_Streamlet_1")
    .setNumPartitions(5)

  // Inspect the resulting graph through the implementation class.
  val sourceImpl = source.asInstanceOf[StreamletImpl[Double]]
  assertEquals(1, sourceImpl.getChildren.size)

  val onlyChild = sourceImpl.getChildren(0)
  assertTrue(onlyChild.isInstanceOf[MapStreamlet[_, _]])

  val mapped = onlyChild.asInstanceOf[MapStreamlet[Double, String]]
  assertEquals("Map_Streamlet_1", mapped.getName)
  assertEquals(0, mapped.getChildren.size())
}
// flatMap: the source must end up with exactly one child, a FlatMapStreamlet
// that carries the configured name and has no children of its own.
test("StreamletImpl should support flatMap transformation") {
  val source = builder
    .newSource(() => Math.random)
    .setName("Supplier_Streamlet_1")
    .setNumPartitions(20)

  source
    .flatMap[String]((num: Double) => List((num * 10).toString))
    .setName("FlatMap_Streamlet_1")
    .setNumPartitions(5)

  val sourceImpl = source.asInstanceOf[StreamletImpl[Double]]
  assertEquals(1, sourceImpl.getChildren.size)

  val onlyChild = sourceImpl.getChildren(0)
  assertTrue(onlyChild.isInstanceOf[FlatMapStreamlet[_, _]])

  val flatMapped = onlyChild.asInstanceOf[FlatMapStreamlet[Double, String]]
  assertEquals("FlatMap_Streamlet_1", flatMapped.getName)
  assertEquals(0, flatMapped.getChildren.size())
}
// filter: the source must end up with exactly one child, a FilterStreamlet
// that carries the configured name and has no children of its own.
test("StreamletImpl should support filter transformation") {
  val source = builder
    .newSource(() => Math.random)
    .setName("Supplier_Streamlet_1")
    .setNumPartitions(20)

  source
    .filter((num: Double) => num > 10)
    .setName("Filter_Streamlet_1")
    .setNumPartitions(5)

  val sourceImpl = source.asInstanceOf[StreamletImpl[Double]]
  assertEquals(1, sourceImpl.getChildren.size)

  val onlyChild = sourceImpl.getChildren(0)
  assertTrue(onlyChild.isInstanceOf[FilterStreamlet[_]])

  val filtered = onlyChild.asInstanceOf[FilterStreamlet[Double]]
  assertEquals("Filter_Streamlet_1", filtered.getName)
  assertEquals(0, filtered.getChildren.size())
}
// repartition(n): the source keeps its own partition count, while its single
// child (expected to be a MapStreamlet) reports the requested count.
test("StreamletImpl should support repartition transformation") {
  val source = builder
    .newSource(() => "aa bb cc dd ee")
    .setName("Supplier_Streamlet_1")
    .setNumPartitions(5)

  source
    .repartition(10)
    .setName("Repartitioned_Streamlet_1")

  // Repartitioning must not alter the upstream streamlet.
  assertEquals(5, source.getNumPartitions)

  val sourceImpl = source.asInstanceOf[StreamletImpl[String]]
  assertEquals(1, sourceImpl.getChildren.size)

  val onlyChild = sourceImpl.getChildren(0)
  assertTrue(onlyChild.isInstanceOf[MapStreamlet[_, _]])

  val repartitioned = onlyChild.asInstanceOf[MapStreamlet[String, String]]
  assertEquals("Repartitioned_Streamlet_1", repartitioned.getName)
  assertEquals(0, repartitioned.getChildren.size())
  assertEquals(10, repartitioned.getNumPartitions)
}
// repartition(n, fn): the source keeps its own partition count, while its
// single child (expected to be a RemapStreamlet) reports the requested count.
test(
  "StreamletImpl should support repartition transformation with partition function") {
  val supplierStreamlet = builder
    .newSource(() => "aa bb cc dd ee")
    .setName("Supplier_Streamlet_1")
    .setNumPartitions(5)

  // Routes an element to a single target partition. Per the Heron Streamlet
  // API, the second argument is the number of downstream partitions and the
  // returned indices must fall in [0, numPartitions). The previous version
  // called `.toInt` on the element, which throws NumberFormatException for the
  // non-numeric strings this source emits, and added the partition count to it
  // rather than producing an index in range.
  def partitionFunction(element: String, numPartitions: Int): Seq[Int] =
    Seq(Math.floorMod(element.hashCode, numPartitions))

  supplierStreamlet
    .repartition(10, partitionFunction)
    .setName("Repartitioned_Streamlet_1")

  // Repartitioning must not alter the upstream streamlet.
  assertEquals(5, supplierStreamlet.getNumPartitions)

  val supplierStreamletImpl =
    supplierStreamlet.asInstanceOf[StreamletImpl[String]]
  assertEquals(1, supplierStreamletImpl.getChildren.size)
  assertTrue(
    supplierStreamletImpl
      .getChildren(0)
      .isInstanceOf[RemapStreamlet[_]])

  val repartitionedStreamlet = supplierStreamletImpl
    .getChildren(0)
    .asInstanceOf[RemapStreamlet[String]]
  assertEquals("Repartitioned_Streamlet_1", repartitionedStreamlet.getName)
  assertEquals(0, repartitionedStreamlet.getChildren.size())
  assertEquals(10, repartitionedStreamlet.getNumPartitions)
}
// union: both input streamlets must report the same kind of single child,
// checked for each input by verifySupplierStreamlet.
test("StreamletImpl should support union transformation") {
  val firstSource = builder
    .newSource(() => "aa bb cc dd ee")
    .setName("Supplier_Streamlet_1")
    .setNumPartitions(2)
  val secondSource = builder
    .newSource(() => "fff ggg hhh")
    .setName("Supplier_Streamlet_2")
    .setNumPartitions(3)

  firstSource
    .union(secondSource)
    .setName("Union_Streamlet_1")
    .setNumPartitions(4)

  // Same checks, same order as before: left input first, then right input.
  Seq(firstSource, secondSource).foreach { input =>
    verifySupplierStreamlet(input)
  }
}
// consume: the source gains a single ConsumerStreamlet child that has no name
// (getName is null), no children, and reports 20 partitions like the source.
test("StreamletImpl should support consume function") {
  val source = builder
    .newSource(() => Math.random)
    .setName("Supplier_Streamlet_1")
    .setNumPartitions(20)

  // The Boolean result of the comparison is discarded by consume.
  source.consume { num: Double =>
    num > 10
  }

  val sourceImpl = source.asInstanceOf[StreamletImpl[Double]]
  assertEquals(1, sourceImpl.getChildren.size)

  val onlyChild = sourceImpl.getChildren(0)
  assertTrue(onlyChild.isInstanceOf[ConsumerStreamlet[_]])

  val consumer = onlyChild.asInstanceOf[ConsumerStreamlet[Double]]
  assertEquals(null, consumer.getName)
  assertEquals(0, consumer.getChildren.size())
  assertEquals(20, consumer.getNumPartitions)
}
// log: the source gains a single LogStreamlet child that has no name
// (getName is null), no children, and reports 10 partitions like the source.
test("StreamletImpl should support log sink") {
  val source = builder
    .newSource(() => Math.random)
    .setName("Supplier_Streamlet_1")
    .setNumPartitions(10)

  source.log()

  val sourceImpl = source.asInstanceOf[StreamletImpl[Double]]
  assertEquals(1, sourceImpl.getChildren.size)

  val onlyChild = sourceImpl.getChildren(0)
  assertTrue(onlyChild.isInstanceOf[LogStreamlet[_]])

  val logStreamlet = onlyChild.asInstanceOf[LogStreamlet[Double]]
  assertEquals(null, logStreamlet.getName)
  assertEquals(0, logStreamlet.getChildren.size())
  assertEquals(10, logStreamlet.getNumPartitions)
}
// toSink: the source gains a single SinkStreamlet child that has no name
// (getName is null), no children, and reports 10 partitions like the source.
test("StreamletImpl should support custom sink") {
  val source = builder
    .newSource(() => Random.nextInt(10))
    .setName("Supplier_Streamlet_1")
    .setNumPartitions(10)

  source.toSink(new TestListBufferSink())

  val sourceImpl = source.asInstanceOf[StreamletImpl[Int]]
  assertEquals(1, sourceImpl.getChildren.size)

  val onlyChild = sourceImpl.getChildren(0)
  assertTrue(onlyChild.isInstanceOf[SinkStreamlet[_]])

  val sinkStreamlet = onlyChild.asInstanceOf[SinkStreamlet[Int]]
  assertEquals(null, sinkStreamlet.getName)
  assertEquals(0, sinkStreamlet.getChildren.size())
  assertEquals(10, sinkStreamlet.getNumPartitions)
}
// join: both sides of the join must report a single JoinStreamlet child with
// the configured name and partition count (checked by verifyJoinedStreamlet).
test("StreamletImpl should support join transformation") {
  val numbers = builder
    .newSource(() => Random.nextInt(10))
    .setName("Supplier_Streamlet_with_Numbers")
    .setNumPartitions(4)
  val texts = builder
    .newSource(() => Random.nextString(3))
    .setName("Supplier_Streamlet_with_Strings")
    .setNumPartitions(3)

  val joinedName = "Joined_Streamlet_1"
  val joinedPartitions = 2

  numbers
    .join[String, String, String](
      texts,
      (x: Int) => x.toString,
      (y: String) => y,
      WindowConfig.TumblingCountWindow(10),
      (x: Int, y: String) => s"$x$y")
    .setName(joinedName)
    .setNumPartitions(joinedPartitions)

  // Left input first, then right input.
  verifyJoinedStreamlet[Int](numbers,
                             expectedName = joinedName,
                             expectedNumPartitions = joinedPartitions)
  verifyJoinedStreamlet[String](texts,
                                expectedName = joinedName,
                                expectedNumPartitions = joinedPartitions)
}
// clone: asking for three clones returns three streamlets and attaches three
// children to the source (checked by verifyClonedStreamlets).
test("StreamletImpl should support clone operation") {
  val cloneCount = 3
  val source = builder
    .newSource(() => Math.random)
    .setName("Supplier_Streamlet_1")
    .setNumPartitions(5)

  val clones = source.clone(numClones = cloneCount)

  assertEquals(cloneCount, clones.size)
  verifyClonedStreamlets[Double](source, numClones = cloneCount)
}
// transform: builds source -> map -> transform and checks each level of the
// graph for the expected type, name, partition count and child count.
test("StreamletImpl should support transform operation") {
  val transformer = new TestIncrementSerializableTransformer(factor = 100)
  val source = builder
    .newSource(() => Random.nextInt(10))
    .setName("Supplier_Streamlet_1")
    .setNumPartitions(3)

  source
    .map[Int]((num: Int) => num * 10)
    .setName("Map_Streamlet_1")
    .setNumPartitions(2)
    .transform[Int](transformer)
    .setName("Transformer_Streamlet_1")
    .setNumPartitions(7)

  // Level 1: the source has the map streamlet as its only child.
  val sourceImpl = source.asInstanceOf[StreamletImpl[Int]]
  assertEquals(1, sourceImpl.getChildren.size)
  val mapChild = sourceImpl.getChildren(0)
  assertTrue(mapChild.isInstanceOf[MapStreamlet[_, _]])

  val mapped = mapChild.asInstanceOf[MapStreamlet[Int, Int]]
  assertEquals("Map_Streamlet_1", mapped.getName)
  assertEquals(2, mapped.getNumPartitions)
  assertEquals(1, mapped.getChildren.size())

  // Level 2: the map streamlet has the transform streamlet as its only child.
  val transformChild = mapped.getChildren().get(0)
  assertTrue(transformChild.isInstanceOf[TransformStreamlet[_, _]])

  val transformed = transformChild.asInstanceOf[TransformStreamlet[Int, Int]]
  assertEquals("Transformer_Streamlet_1", transformed.getName)
  assertEquals(7, transformed.getNumPartitions)
  assertEquals(0, transformed.getChildren.size())
}
// Test-only operator: a TestBolt that also implements IStreamletRichOperator.
private class MyBoltOperator
    extends TestBolt
    with IStreamletRichOperator[Double, Double]

// applyOperator (rich operator): builds source -> map -> custom operator and
// checks each level for the expected type, name, partitions and child count.
test("StreamletImpl should support applyOperator operation on IStreamletRichOperator") {
  val richOperator = new MyBoltOperator()
  val source = builder
    .newSource(() => Random.nextDouble())
    .setName("Supplier_Streamlet_1")
    .setNumPartitions(3)

  source
    .map[Double]((num: Double) => num * 10)
    .setName("Map_Streamlet_1")
    .setNumPartitions(2)
    .applyOperator(richOperator)
    .setName("Custom_Streamlet_1")
    .setNumPartitions(7)

  // Level 1: the source has the map streamlet as its only child.
  val sourceImpl = source.asInstanceOf[StreamletImpl[Double]]
  assertEquals(1, sourceImpl.getChildren.size)
  val mapChild = sourceImpl.getChildren(0)
  assertTrue(mapChild.isInstanceOf[MapStreamlet[_, _]])

  val mapped = mapChild.asInstanceOf[MapStreamlet[Double, Double]]
  assertEquals("Map_Streamlet_1", mapped.getName)
  assertEquals(2, mapped.getNumPartitions)
  assertEquals(1, mapped.getChildren.size())

  // Level 2: the map streamlet has the custom streamlet as its only child.
  val customChild = mapped.getChildren().get(0)
  assertTrue(customChild.isInstanceOf[CustomStreamlet[_, _]])

  val custom = customChild.asInstanceOf[CustomStreamlet[Double, Double]]
  assertEquals("Custom_Streamlet_1", custom.getName)
  assertEquals(7, custom.getNumPartitions)
  assertEquals(0, custom.getChildren.size())
}
// Test-only operator: a TestBasicBolt that also implements IStreamletBasicOperator.
private class MyBasicBoltOperator
    extends TestBasicBolt
    with IStreamletBasicOperator[Double, Double]

// applyOperator (basic operator): builds source -> map -> custom operator and
// checks each level for the expected type, name, partitions and child count.
test("StreamletImpl should support applyOperator operation on IStreamletBasicOperator") {
  val basicOperator = new MyBasicBoltOperator()
  val source = builder
    .newSource(() => Random.nextDouble())
    .setName("Supplier_Streamlet_1")
    .setNumPartitions(3)

  source
    .map[Double]((num: Double) => num * 10)
    .setName("Map_Streamlet_1")
    .setNumPartitions(2)
    .applyOperator(basicOperator)
    .setName("CustomBasic_Streamlet_1")
    .setNumPartitions(7)

  // Level 1: the source has the map streamlet as its only child.
  val sourceImpl = source.asInstanceOf[StreamletImpl[Double]]
  assertEquals(1, sourceImpl.getChildren.size)
  val mapChild = sourceImpl.getChildren(0)
  assertTrue(mapChild.isInstanceOf[MapStreamlet[_, _]])

  val mapped = mapChild.asInstanceOf[MapStreamlet[Double, Double]]
  assertEquals("Map_Streamlet_1", mapped.getName)
  assertEquals(2, mapped.getNumPartitions)
  assertEquals(1, mapped.getChildren.size())

  // Level 2: the map streamlet has the custom streamlet as its only child.
  val customChild = mapped.getChildren().get(0)
  assertTrue(customChild.isInstanceOf[CustomStreamlet[_, _]])

  val custom = customChild.asInstanceOf[CustomStreamlet[Double, Double]]
  assertEquals("CustomBasic_Streamlet_1", custom.getName)
  assertEquals(7, custom.getNumPartitions)
  assertEquals(0, custom.getChildren.size())
}
// Test-only operator: a TestWindowBolt that also implements IStreamletWindowOperator.
private class MyWindowBoltOperator
    extends TestWindowBolt
    with IStreamletWindowOperator[Double, Double]

// applyOperator (window operator): builds source -> map -> custom operator and
// checks each level for the expected type, name, partitions and child count.
test("StreamletImpl should support applyOperator operation on IStreamletWindowOperator") {
  val windowOperator = new MyWindowBoltOperator()
  val source = builder
    .newSource(() => Random.nextDouble())
    .setName("Supplier_Streamlet_1")
    .setNumPartitions(3)

  source
    .map[Double]((num: Double) => num * 10)
    .setName("Map_Streamlet_1")
    .setNumPartitions(2)
    .applyOperator(windowOperator)
    .setName("CustomWindow_Streamlet_1")
    .setNumPartitions(7)

  // Level 1: the source has the map streamlet as its only child.
  val sourceImpl = source.asInstanceOf[StreamletImpl[Double]]
  assertEquals(1, sourceImpl.getChildren.size)
  val mapChild = sourceImpl.getChildren(0)
  assertTrue(mapChild.isInstanceOf[MapStreamlet[_, _]])

  val mapped = mapChild.asInstanceOf[MapStreamlet[Double, Double]]
  assertEquals("Map_Streamlet_1", mapped.getName)
  assertEquals(2, mapped.getNumPartitions)
  assertEquals(1, mapped.getChildren.size())

  // Level 2: the map streamlet has the custom streamlet as its only child.
  val customChild = mapped.getChildren().get(0)
  assertTrue(customChild.isInstanceOf[CustomStreamlet[_, _]])

  val custom = customChild.asInstanceOf[CustomStreamlet[Double, Double]]
  assertEquals("CustomWindow_Streamlet_1", custom.getName)
  assertEquals(7, custom.getNumPartitions)
  assertEquals(0, custom.getChildren.size())
}
// reduceByKeyAndWindow: the source gains a single ReduceByKeyAndWindowStreamlet
// child with the configured name and partition count and no children.
test("StreamletImpl should support reduce operation") {
  val source = builder
    .newSource(() => Random.nextInt(10))
    .setName("Supplier_Streamlet_1")
    .setNumPartitions(3)

  source
    .reduceByKeyAndWindow[Int, Int](
      (key: Int) => key * 100,
      (value: Int) => 1,
      WindowConfig.TumblingCountWindow(10),
      (x: Int, y: Int) => x + y)
    .setName("Reduce_Streamlet_1")
    .setNumPartitions(5)

  val sourceImpl = source.asInstanceOf[StreamletImpl[Int]]
  assertEquals(1, sourceImpl.getChildren.size)

  val onlyChild = sourceImpl.getChildren(0)
  assertTrue(onlyChild.isInstanceOf[ReduceByKeyAndWindowStreamlet[_, _, _]])

  val reduced =
    onlyChild.asInstanceOf[ReduceByKeyAndWindowStreamlet[Int, Int, Int]]
  assertEquals("Reduce_Streamlet_1", reduced.getName)
  assertEquals(5, reduced.getNumPartitions)
  assertEquals(0, reduced.getChildren.size())
}
/**
  * Asserts that `supplierStreamlet` has exactly `numClones` children and that
  * each of them is a childless MapStreamlet.
  *
  * @param supplierStreamlet the streamlet that was cloned; cast to StreamletImpl
  * @param numClones         the number of children expected on that streamlet
  */
private def verifyClonedStreamlets[R](supplierStreamlet: Streamlet[R],
                                      numClones: Int): Unit = {
  val parentImpl = supplierStreamlet.asInstanceOf[StreamletImpl[R]]
  assertEquals(numClones, parentImpl.getChildren.size)

  // Check the children in index order.
  (0 until numClones).foreach { position =>
    val cloned = parentImpl.getChildren(position)
    assertTrue(cloned.isInstanceOf[MapStreamlet[_, _]])
    assertEquals(0, cloned.asInstanceOf[MapStreamlet[R, R]].getChildren.size())
  }
}
/**
  * Asserts that `supplierStreamlet` has exactly one child, that the child is a
  * JoinStreamlet, and that it reports the expected name and partition count
  * and has no children of its own.
  *
  * @param supplierStreamlet     one of the join inputs; cast to StreamletImpl
  * @param expectedName          the name the join streamlet must report
  * @param expectedNumPartitions the partition count the join streamlet must report
  */
private def verifyJoinedStreamlet[R](supplierStreamlet: Streamlet[R],
                                     expectedName: String,
                                     expectedNumPartitions: Int): Unit = {
  val inputImpl = supplierStreamlet.asInstanceOf[StreamletImpl[R]]
  assertEquals(1, inputImpl.getChildren.size)

  val onlyChild = inputImpl.getChildren(0)
  assertTrue(onlyChild.isInstanceOf[JoinStreamlet[_, _, _, _]])

  // The type arguments are fixed regardless of R; only type-independent
  // getters are called on the result.
  val joined = onlyChild.asInstanceOf[JoinStreamlet[String, Int, String, String]]
  assertEquals(expectedName, joined.getName)
  assertEquals(expectedNumPartitions, joined.getNumPartitions)
  assertEquals(0, joined.getChildren.size())
}
/**
  * Asserts that `supplierStreamlet` has exactly one child, that the child is a
  * UnionStreamlet named "Union_Streamlet_1" with 4 partitions, and that the
  * child has no children of its own.
  *
  * @param supplierStreamlet one of the union inputs; cast to StreamletImpl
  */
private def verifySupplierStreamlet(
    supplierStreamlet: Streamlet[String]): Unit = {
  val inputImpl = supplierStreamlet.asInstanceOf[StreamletImpl[String]]
  assertEquals(1, inputImpl.getChildren.size)

  val onlyChild = inputImpl.getChildren(0)
  assertTrue(onlyChild.isInstanceOf[UnionStreamlet[_]])

  val union = onlyChild.asInstanceOf[UnionStreamlet[String]]
  assertEquals("Union_Streamlet_1", union.getName)
  assertEquals(0, union.getChildren.size())
  assertEquals(4, union.getNumPartitions)
}
}