| // Copyright 2017 Twitter. All rights reserved. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| package com.twitter.heron.streamlet.impl; |
| |
| import java.util.Arrays; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Set; |
| import java.util.function.Consumer; |
| |
| import org.junit.Before; |
| import org.junit.Test; |
| |
| import com.twitter.heron.api.topology.TopologyBuilder; |
| import com.twitter.heron.streamlet.Context; |
| import com.twitter.heron.streamlet.KVStreamlet; |
| import com.twitter.heron.streamlet.KeyValue; |
| import com.twitter.heron.streamlet.SerializableTransformer; |
| import com.twitter.heron.streamlet.Streamlet; |
| import com.twitter.heron.streamlet.Window; |
| import com.twitter.heron.streamlet.WindowConfig; |
| import com.twitter.heron.streamlet.impl.streamlets.FilterStreamlet; |
| import com.twitter.heron.streamlet.impl.streamlets.FlatMapStreamlet; |
| import com.twitter.heron.streamlet.impl.streamlets.JoinStreamlet; |
| import com.twitter.heron.streamlet.impl.streamlets.MapStreamlet; |
| import com.twitter.heron.streamlet.impl.streamlets.MapToKVStreamlet; |
| import com.twitter.heron.streamlet.impl.streamlets.ReduceByKeyAndWindowStreamlet; |
| import com.twitter.heron.streamlet.impl.streamlets.ReduceByWindowStreamlet; |
| import com.twitter.heron.streamlet.impl.streamlets.SupplierStreamlet; |
| import com.twitter.heron.streamlet.impl.streamlets.TransformStreamlet; |
| import com.twitter.heron.streamlet.impl.streamlets.UnionStreamlet; |
| |
| import static org.junit.Assert.*; |
| |
| /** |
| * Unit tests for {@link StreamletImpl} |
| */ |
| public class StreamletImplTest { |
| |
| private <T> boolean isFullyBuilt(BaseStreamletImpl<T> streamlet) { |
| if (!streamlet.isBuilt()) { |
| return false; |
| } |
| for (BaseStreamletImpl<?> child : streamlet.getChildren()) { |
| if (!isFullyBuilt(child)) { |
| return false; |
| } |
| } |
| return true; |
| } |
| |
| @Before |
| public void setUp() { |
| } |
| |
| @Test |
| public void testBasicParams() throws Exception { |
| Streamlet<Double> sample = StreamletImpl.createSupplierStreamlet(() -> Math.random()); |
| sample.setName("MyStreamlet"); |
| sample.setNumPartitions(20); |
| assertEquals("MyStreamlet", sample.getName()); |
| assertEquals(20, sample.getNumPartitions()); |
| sample.setName("AnotherName"); |
| assertEquals("AnotherName", sample.getName()); |
| sample.setNumPartitions(10); |
| assertEquals(10, sample.getNumPartitions()); |
| StreamletImpl<Double> bStreamlet = (StreamletImpl<Double>) sample; |
| assertFalse(bStreamlet.isBuilt()); |
| assertEquals(bStreamlet.getChildren().size(), 0); |
| } |
| |
| @Test |
| public void testSupplierStreamlet() throws Exception { |
| Streamlet<Double> streamlet = StreamletImpl.createSupplierStreamlet(() -> Math.random()); |
| assertTrue(streamlet instanceof SupplierStreamlet); |
| } |
| |
| @Test |
| @SuppressWarnings("unchecked") |
| public void testMapStreamlet() throws Exception { |
| Streamlet<Double> baseStreamlet = StreamletImpl.createSupplierStreamlet(() -> Math.random()); |
| Streamlet<Double> streamlet = baseStreamlet.setNumPartitions(20).map((num) -> num * 10); |
| assertTrue(streamlet instanceof MapStreamlet); |
| MapStreamlet<Double, Double> mStreamlet = (MapStreamlet<Double, Double>) streamlet; |
| assertEquals(20, mStreamlet.getNumPartitions()); |
| SupplierStreamlet<Double> supplierStreamlet = (SupplierStreamlet<Double>) baseStreamlet; |
| assertEquals(supplierStreamlet.getChildren().size(), 1); |
| assertEquals(supplierStreamlet.getChildren().get(0), streamlet); |
| } |
| |
| @Test |
| @SuppressWarnings("unchecked") |
| public void testMapToKVStreamlet() throws Exception { |
| Streamlet<Double> baseStreamlet = StreamletImpl.createSupplierStreamlet(() -> Math.random()); |
| KVStreamlet<Double, Double> streamlet = baseStreamlet.setNumPartitions(20) |
| .mapToKV((num) -> new KeyValue<>(num, num)); |
| assertTrue(streamlet instanceof MapToKVStreamlet); |
| MapToKVStreamlet<Double, Double, Double> mStreamlet = |
| (MapToKVStreamlet<Double, Double, Double>) streamlet; |
| assertEquals(20, mStreamlet.getNumPartitions()); |
| SupplierStreamlet<Double> supplierStreamlet = (SupplierStreamlet<Double>) baseStreamlet; |
| assertEquals(supplierStreamlet.getChildren().size(), 1); |
| assertEquals(supplierStreamlet.getChildren().get(0), streamlet); |
| } |
| |
| @Test |
| @SuppressWarnings("unchecked") |
| public void testFlatMapStreamlet() throws Exception { |
| Streamlet<Double> baseStreamlet = StreamletImpl.createSupplierStreamlet(() -> Math.random()); |
| Streamlet<Double> streamlet = baseStreamlet.setNumPartitions(20) |
| .flatMap((num) -> Arrays.asList(num * 10)); |
| assertTrue(streamlet instanceof FlatMapStreamlet); |
| FlatMapStreamlet<Double, Double> mStreamlet = (FlatMapStreamlet<Double, Double>) streamlet; |
| assertEquals(20, mStreamlet.getNumPartitions()); |
| SupplierStreamlet<Double> supplierStreamlet = (SupplierStreamlet<Double>) baseStreamlet; |
| assertEquals(supplierStreamlet.getChildren().size(), 1); |
| assertEquals(supplierStreamlet.getChildren().get(0), streamlet); |
| } |
| |
| @Test |
| @SuppressWarnings("unchecked") |
| public void testFilterStreamlet() throws Exception { |
| Streamlet<Double> baseStreamlet = StreamletImpl.createSupplierStreamlet(() -> Math.random()); |
| Streamlet<Double> streamlet = baseStreamlet.setNumPartitions(20).filter((num) -> num != 0); |
| assertTrue(streamlet instanceof FilterStreamlet); |
| FilterStreamlet<Double> mStreamlet = (FilterStreamlet<Double>) streamlet; |
| assertEquals(20, mStreamlet.getNumPartitions()); |
| SupplierStreamlet<Double> supplierStreamlet = (SupplierStreamlet<Double>) baseStreamlet; |
| assertEquals(supplierStreamlet.getChildren().size(), 1); |
| assertEquals(supplierStreamlet.getChildren().get(0), streamlet); |
| } |
| |
| @Test |
| @SuppressWarnings("unchecked") |
| public void testRepartitionStreamlet() throws Exception { |
| Streamlet<Double> baseStreamlet = StreamletImpl.createSupplierStreamlet(() -> Math.random()); |
| Streamlet<Double> streamlet = baseStreamlet.setNumPartitions(20).repartition(40); |
| assertTrue(streamlet instanceof MapStreamlet); |
| MapStreamlet<Double, Double> mStreamlet = (MapStreamlet<Double, Double>) streamlet; |
| assertEquals(40, mStreamlet.getNumPartitions()); |
| SupplierStreamlet<Double> supplierStreamlet = (SupplierStreamlet<Double>) baseStreamlet; |
| assertEquals(20, supplierStreamlet.getNumPartitions()); |
| assertEquals(supplierStreamlet.getChildren().size(), 1); |
| assertEquals(supplierStreamlet.getChildren().get(0), streamlet); |
| } |
| |
| @Test |
| @SuppressWarnings("unchecked") |
| public void testCloneStreamlet() throws Exception { |
| Streamlet<Double> baseStreamlet = StreamletImpl.createSupplierStreamlet(() -> Math.random()); |
| List<Streamlet<Double>> streamlets = baseStreamlet.setNumPartitions(20).clone(2); |
| assertEquals(streamlets.size(), 2); |
| assertTrue(streamlets.get(0) instanceof MapStreamlet); |
| assertTrue(streamlets.get(1) instanceof MapStreamlet); |
| SupplierStreamlet<Double> supplierStreamlet = (SupplierStreamlet<Double>) baseStreamlet; |
| assertEquals(20, supplierStreamlet.getNumPartitions()); |
| assertEquals(supplierStreamlet.getChildren().size(), 2); |
| assertEquals(supplierStreamlet.getChildren(), streamlets); |
| } |
| |
| @Test |
| @SuppressWarnings("unchecked") |
| public void testUnionStreamlet() throws Exception { |
| Streamlet<Double> baseStreamlet1 = StreamletImpl.createSupplierStreamlet(() -> Math.random()); |
| Streamlet<Double> baseStreamlet2 = StreamletImpl.createSupplierStreamlet(() -> Math.random()); |
| Streamlet<Double> streamlet = baseStreamlet1.union(baseStreamlet2); |
| assertTrue(streamlet instanceof UnionStreamlet); |
| SupplierStreamlet<Double> supplierStreamlet1 = (SupplierStreamlet<Double>) baseStreamlet1; |
| SupplierStreamlet<Double> supplierStreamlet2 = (SupplierStreamlet<Double>) baseStreamlet2; |
| assertEquals(supplierStreamlet1.getChildren().size(), 1); |
| assertEquals(supplierStreamlet1.getChildren().get(0), streamlet); |
| assertEquals(supplierStreamlet2.getChildren().size(), 1); |
| assertEquals(supplierStreamlet2.getChildren().get(0), streamlet); |
| } |
| |
| @Test |
| @SuppressWarnings("unchecked") |
| public void testReduceByWindowStreamlet() throws Exception { |
| Streamlet<Double> baseStreamlet = StreamletImpl.createSupplierStreamlet(() -> Math.random()); |
| KVStreamlet<Window, Double> streamlet = |
| baseStreamlet.reduceByWindow(WindowConfig.TumblingCountWindow(10), (x, y) -> x + y); |
| assertTrue(streamlet instanceof ReduceByWindowStreamlet); |
| SupplierStreamlet<Double> supplierStreamlet = (SupplierStreamlet<Double>) baseStreamlet; |
| assertEquals(supplierStreamlet.getChildren().size(), 1); |
| assertEquals(supplierStreamlet.getChildren().get(0), streamlet); |
| } |
| |
| @Test |
| @SuppressWarnings("unchecked") |
| public void testTransformStreamlet() throws Exception { |
| Streamlet<Double> baseStreamlet = StreamletImpl.createSupplierStreamlet(() -> Math.random()); |
| Streamlet<Double> streamlet = |
| baseStreamlet.transform(new SerializableTransformer<Double, Double>() { |
| @Override |
| public void setup(Context context) { |
| |
| } |
| |
| @Override |
| public void transform(Double aDouble, Consumer<Double> consumer) { |
| consumer.accept(aDouble); |
| } |
| |
| @Override |
| public void cleanup() { |
| |
| } |
| }); |
| assertTrue(streamlet instanceof TransformStreamlet); |
| SupplierStreamlet<Double> supplierStreamlet = (SupplierStreamlet<Double>) baseStreamlet; |
| assertEquals(supplierStreamlet.getChildren().size(), 1); |
| assertEquals(supplierStreamlet.getChildren().get(0), streamlet); |
| } |
| |
| @Test |
| @SuppressWarnings("unchecked") |
| public void testSimpleBuild() throws Exception { |
| Streamlet<String> baseStreamlet = StreamletImpl.createSupplierStreamlet(() -> "sa re ga ma"); |
| baseStreamlet.flatMap(x -> Arrays.asList(x.split(" "))) |
| .mapToKV(x -> new KeyValue<>(x, 1)) |
| .reduceByKeyAndWindow(WindowConfig.TumblingCountWindow(10), (x, y) -> x + y); |
| SupplierStreamlet<String> supplierStreamlet = (SupplierStreamlet<String>) baseStreamlet; |
| assertFalse(supplierStreamlet.isBuilt()); |
| TopologyBuilder builder = new TopologyBuilder(); |
| Set<String> stageNames = new HashSet<>(); |
| supplierStreamlet.build(builder, stageNames); |
| assertTrue(isFullyBuilt(supplierStreamlet)); |
| assertEquals(supplierStreamlet.getChildren().size(), 1); |
| assertTrue(supplierStreamlet.getChildren().get(0) instanceof FlatMapStreamlet); |
| FlatMapStreamlet<String, String> fStreamlet = |
| (FlatMapStreamlet<String, String>) supplierStreamlet.getChildren().get(0); |
| assertEquals(fStreamlet.getChildren().size(), 1); |
| assertTrue(fStreamlet.getChildren().get(0) instanceof MapToKVStreamlet); |
| MapToKVStreamlet<String, String, Integer> mkvStreamlet = |
| (MapToKVStreamlet<String, String, Integer>) fStreamlet.getChildren().get(0); |
| assertEquals(mkvStreamlet.getChildren().size(), 1); |
| assertTrue(mkvStreamlet.getChildren().get(0) instanceof ReduceByKeyAndWindowStreamlet); |
| ReduceByKeyAndWindowStreamlet<String, Integer> rStreamlet = |
| (ReduceByKeyAndWindowStreamlet<String, Integer>) mkvStreamlet.getChildren().get(0); |
| assertEquals(rStreamlet.getChildren().size(), 0); |
| } |
| |
| @Test |
| @SuppressWarnings("unchecked") |
| public void testComplexBuild() throws Exception { |
| // First source |
| Streamlet<String> baseStreamlet1 = StreamletImpl.createSupplierStreamlet(() -> "sa re ga ma"); |
| KVStreamlet<String, Integer> leftStream = |
| baseStreamlet1.flatMap(x -> Arrays.asList(x.split(" "))) |
| .mapToKV(x -> new KeyValue<>(x, 1)); |
| |
| // Second source |
| Streamlet<String> baseStreamlet2 = StreamletImpl.createSupplierStreamlet(() -> "I Love You"); |
| KVStreamlet<String, Integer> rightStream = |
| baseStreamlet2.flatMap(x -> Arrays.asList(x.split(" "))) |
| .mapToKV(x -> new KeyValue<>(x, 1)); |
| |
| // join |
| leftStream.join(rightStream, WindowConfig.TumblingCountWindow(10), (x, y) -> x + y); |
| |
| SupplierStreamlet<String> supplierStreamlet1 = (SupplierStreamlet<String>) baseStreamlet1; |
| SupplierStreamlet<String> supplierStreamlet2 = (SupplierStreamlet<String>) baseStreamlet2; |
| assertFalse(supplierStreamlet1.isBuilt()); |
| assertFalse(supplierStreamlet2.isBuilt()); |
| TopologyBuilder builder = new TopologyBuilder(); |
| Set<String> stageNames = new HashSet<>(); |
| supplierStreamlet1.build(builder, stageNames); |
| assertTrue(supplierStreamlet1.isBuilt()); |
| assertFalse(isFullyBuilt(supplierStreamlet1)); |
| |
| supplierStreamlet2.build(builder, stageNames); |
| assertTrue(isFullyBuilt(supplierStreamlet1)); |
| assertTrue(isFullyBuilt(supplierStreamlet2)); |
| |
| // go over all stuff |
| assertEquals(supplierStreamlet1.getChildren().size(), 1); |
| assertTrue(supplierStreamlet1.getChildren().get(0) instanceof FlatMapStreamlet); |
| FlatMapStreamlet<String, String> fStreamlet = |
| (FlatMapStreamlet<String, String>) supplierStreamlet1.getChildren().get(0); |
| assertEquals(fStreamlet.getChildren().size(), 1); |
| assertTrue(fStreamlet.getChildren().get(0) instanceof MapToKVStreamlet); |
| MapToKVStreamlet<String, String, Integer> mkvStreamlet = |
| (MapToKVStreamlet<String, String, Integer>) fStreamlet.getChildren().get(0); |
| assertEquals(mkvStreamlet.getChildren().size(), 1); |
| assertTrue(mkvStreamlet.getChildren().get(0) instanceof JoinStreamlet); |
| JoinStreamlet<String, Integer, Integer, Integer> jStreamlet = |
| (JoinStreamlet<String, Integer, Integer, Integer>) mkvStreamlet.getChildren().get(0); |
| assertEquals(jStreamlet.getChildren().size(), 0); |
| |
| assertEquals(supplierStreamlet2.getChildren().size(), 1); |
| assertTrue(supplierStreamlet2.getChildren().get(0) instanceof FlatMapStreamlet); |
| fStreamlet = |
| (FlatMapStreamlet<String, String>) supplierStreamlet2.getChildren().get(0); |
| assertEquals(fStreamlet.getChildren().size(), 1); |
| assertTrue(fStreamlet.getChildren().get(0) instanceof MapToKVStreamlet); |
| mkvStreamlet = |
| (MapToKVStreamlet<String, String, Integer>) fStreamlet.getChildren().get(0); |
| assertEquals(mkvStreamlet.getChildren().size(), 1); |
| assertTrue(mkvStreamlet.getChildren().get(0) instanceof JoinStreamlet); |
| jStreamlet = |
| (JoinStreamlet<String, Integer, Integer, Integer>) mkvStreamlet.getChildren().get(0); |
| assertEquals(jStreamlet.getChildren().size(), 0); |
| } |
| } |