| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.heron.streamlet.impl; |
| |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.function.Consumer; |
| import java.util.function.Function; |
| |
| import org.junit.Test; |
| |
| import org.apache.heron.api.grouping.ShuffleStreamGrouping; |
| import org.apache.heron.api.topology.TopologyBuilder; |
| import org.apache.heron.api.utils.Utils; |
| import org.apache.heron.common.basics.ByteAmount; |
| import org.apache.heron.resource.TestBasicBolt; |
| import org.apache.heron.resource.TestBolt; |
| import org.apache.heron.resource.TestSpout; |
| import org.apache.heron.resource.TestWindowBolt; |
| import org.apache.heron.streamlet.Builder; |
| import org.apache.heron.streamlet.Config; |
| import org.apache.heron.streamlet.Context; |
| import org.apache.heron.streamlet.IStreamletBasicOperator; |
| import org.apache.heron.streamlet.IStreamletRichOperator; |
| import org.apache.heron.streamlet.IStreamletWindowOperator; |
| import org.apache.heron.streamlet.KVStreamlet; |
| import org.apache.heron.streamlet.KeyedWindow; |
| import org.apache.heron.streamlet.SerializableConsumer; |
| import org.apache.heron.streamlet.SerializablePredicate; |
| import org.apache.heron.streamlet.SerializableTransformer; |
| import org.apache.heron.streamlet.Source; |
| import org.apache.heron.streamlet.Streamlet; |
| import org.apache.heron.streamlet.StreamletReducers; |
| import org.apache.heron.streamlet.WindowConfig; |
| import org.apache.heron.streamlet.impl.streamlets.ConsumerStreamlet; |
| import org.apache.heron.streamlet.impl.streamlets.CountByKeyAndWindowStreamlet; |
| import org.apache.heron.streamlet.impl.streamlets.CountByKeyStreamlet; |
| import org.apache.heron.streamlet.impl.streamlets.CustomStreamlet; |
| import org.apache.heron.streamlet.impl.streamlets.FilterStreamlet; |
| import org.apache.heron.streamlet.impl.streamlets.FlatMapStreamlet; |
| import org.apache.heron.streamlet.impl.streamlets.GeneralReduceByKeyStreamlet; |
| import org.apache.heron.streamlet.impl.streamlets.JoinStreamlet; |
| import org.apache.heron.streamlet.impl.streamlets.KVStreamletShadow; |
| import org.apache.heron.streamlet.impl.streamlets.KeyByStreamlet; |
| import org.apache.heron.streamlet.impl.streamlets.MapStreamlet; |
| import org.apache.heron.streamlet.impl.streamlets.ReduceByKeyAndWindowStreamlet; |
| import org.apache.heron.streamlet.impl.streamlets.ReduceByKeyStreamlet; |
| import org.apache.heron.streamlet.impl.streamlets.SourceStreamlet; |
| import org.apache.heron.streamlet.impl.streamlets.SpoutStreamlet; |
| import org.apache.heron.streamlet.impl.streamlets.SupplierStreamlet; |
| import org.apache.heron.streamlet.impl.streamlets.TransformStreamlet; |
| import org.apache.heron.streamlet.impl.streamlets.UnionStreamlet; |
| |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.fail; |
| |
| /** |
| * Unit tests for {@link StreamletImpl} |
| */ |
| public class StreamletImplTest { |
| |
| private Builder builder = Builder.newBuilder(); |
| |
| @Test |
| public void testBasicParams() { |
| Streamlet<Double> sample = builder.newSource(() -> Math.random()); |
| sample.setName("MyStreamlet"); |
| sample.setNumPartitions(20); |
| assertEquals("MyStreamlet", sample.getName()); |
| assertEquals(20, sample.getNumPartitions()); |
| sample.setName("AnotherName"); |
| assertEquals("AnotherName", sample.getName()); |
| sample.setNumPartitions(10); |
| assertEquals(10, sample.getNumPartitions()); |
| StreamletImpl<Double> bStreamlet = (StreamletImpl<Double>) sample; |
| assertFalse(bStreamlet.isBuilt()); |
| assertEquals(bStreamlet.getChildren().size(), 0); |
| } |
| |
| @Test |
| public void testSplitAndWithStream() { |
| Map<String, SerializablePredicate<Double>> splitter = new HashMap(); |
| splitter.put("all", i -> true); |
| splitter.put("positive", i -> i > 0); |
| splitter.put("negative", i -> i < 0); |
| |
| Streamlet<Double> baseStreamlet = builder.newSource(() -> Math.random()); |
| // The streamlet should have three output streams after split() |
| Streamlet<Double> multiStreams = baseStreamlet.split(splitter); |
| |
| // Default stream is used |
| Streamlet<Double> positiveStream = multiStreams.withStream("positive"); |
| Streamlet<Double> negativeStream = multiStreams.withStream("negative"); |
| |
| Streamlet<Double> allMap = multiStreams.withStream("all").map((num) -> num * 10); |
| Streamlet<Double> positiveMap = positiveStream.map((num) -> num * 10); |
| Streamlet<Double> negativeMap = negativeStream.map((num) -> num * 10); |
| |
| // Original streamlet should still have the default strean id eventhough the id |
| // is not available. Other shadow streamlets should have the correct stream ids. |
| assertEquals(multiStreams.getStreamId(), Utils.DEFAULT_STREAM_ID); |
| assertEquals(positiveStream.getStreamId(), "positive"); |
| assertEquals(negativeStream.getStreamId(), "negative"); |
| |
| StreamletImpl<Double> impl = (StreamletImpl<Double>) multiStreams; |
| assertEquals(impl.getChildren().size(), 3); |
| |
| // Children streamlets should have the right parent stream id |
| assertEquals(((MapStreamlet<Double, Double>) allMap).getParent().getStreamId(), |
| "all"); |
| assertEquals(((MapStreamlet<Double, Double>) positiveMap).getParent().getStreamId(), |
| "positive"); |
| assertEquals(((MapStreamlet<Double, Double>) negativeMap).getParent().getStreamId(), |
| "negative"); |
| } |
| |
| @Test(expected = RuntimeException.class) |
| public void testSplitAndWithWrongStream() { |
| Map<String, SerializablePredicate<Double>> splitter = new HashMap(); |
| splitter.put("all", i -> true); |
| splitter.put("positive", i -> i > 0); |
| splitter.put("negative", i -> i < 0); |
| |
| Streamlet<Double> baseStreamlet = builder.newSource(() -> Math.random()); |
| // The streamlet should have three output streams after split() |
| Streamlet<Double> multiStreams = baseStreamlet.split(splitter); |
| |
| // Select a good stream id and a bad stream id |
| Streamlet<Double> goodStream = multiStreams.withStream("positive"); |
| Streamlet<Double> badStream = multiStreams.withStream("wrong-id"); |
| } |
| |
| @Test(expected = RuntimeException.class) |
| public void testWithWrongStream() { |
| Streamlet<Double> baseStreamlet = builder.newSource(() -> Math.random()); |
| // Normal Streamlet objects, including sources, have only the default stream id. |
| // Selecting any other stream using withStream() should trigger a runtime |
| // exception |
| Streamlet<Double> badStream = baseStreamlet.withStream("wrong-id"); |
| } |
| |
| @Test |
| public void testSupplierStreamlet() { |
| Streamlet<Double> streamlet = builder.newSource(() -> Math.random()); |
| assertTrue(streamlet instanceof SupplierStreamlet); |
| } |
| |
| @Test |
| public void testSourceStreamlet() { |
| Streamlet<String> streamlet = builder.newSource(new TestSource()); |
| assertTrue(streamlet instanceof SourceStreamlet); |
| } |
| |
| @Test |
| public void testSpoutStreamlet() { |
| TestSpout spout = new TestSpout(); |
| Streamlet<Double> streamlet = builder.newSource(spout); |
| assertTrue(streamlet instanceof SpoutStreamlet); |
| } |
| |
| @Test |
| @SuppressWarnings("unchecked") |
| public void testMapStreamlet() { |
| Streamlet<Double> baseStreamlet = builder.newSource(() -> Math.random()); |
| Streamlet<Double> streamlet = baseStreamlet.setNumPartitions(20).map((num) -> num * 10); |
| assertTrue(streamlet instanceof MapStreamlet); |
| MapStreamlet<Double, Double> mStreamlet = (MapStreamlet<Double, Double>) streamlet; |
| assertEquals(20, mStreamlet.getNumPartitions()); |
| SupplierStreamlet<Double> supplierStreamlet = (SupplierStreamlet<Double>) baseStreamlet; |
| assertEquals(supplierStreamlet.getChildren().size(), 1); |
| assertEquals(supplierStreamlet.getChildren().get(0), streamlet); |
| } |
| |
| @Test |
| @SuppressWarnings("unchecked") |
| public void testFlatMapStreamlet() { |
| Streamlet<Double> baseStreamlet = builder.newSource(() -> Math.random()); |
| Streamlet<Double> streamlet = baseStreamlet.setNumPartitions(20) |
| .flatMap((num) -> Arrays.asList(num * 10)); |
| assertTrue(streamlet instanceof FlatMapStreamlet); |
| FlatMapStreamlet<Double, Double> mStreamlet = (FlatMapStreamlet<Double, Double>) streamlet; |
| assertEquals(20, mStreamlet.getNumPartitions()); |
| SupplierStreamlet<Double> supplierStreamlet = (SupplierStreamlet<Double>) baseStreamlet; |
| assertEquals(supplierStreamlet.getChildren().size(), 1); |
| assertEquals(supplierStreamlet.getChildren().get(0), streamlet); |
| } |
| |
| @Test |
| @SuppressWarnings("unchecked") |
| public void testFilterStreamlet() { |
| Streamlet<Double> baseStreamlet = builder.newSource(() -> Math.random()); |
| Streamlet<Double> streamlet = baseStreamlet.setNumPartitions(20).filter((num) -> num != 0); |
| assertTrue(streamlet instanceof FilterStreamlet); |
| FilterStreamlet<Double> mStreamlet = (FilterStreamlet<Double>) streamlet; |
| assertEquals(20, mStreamlet.getNumPartitions()); |
| SupplierStreamlet<Double> supplierStreamlet = (SupplierStreamlet<Double>) baseStreamlet; |
| assertEquals(supplierStreamlet.getChildren().size(), 1); |
| assertEquals(supplierStreamlet.getChildren().get(0), streamlet); |
| } |
| |
| @Test |
| @SuppressWarnings("unchecked") |
| public void testRepartitionStreamlet() { |
| Streamlet<Double> baseStreamlet = builder.newSource(() -> Math.random()); |
| Streamlet<Double> streamlet = baseStreamlet.setNumPartitions(20).repartition(40); |
| assertTrue(streamlet instanceof MapStreamlet); |
| MapStreamlet<Double, Double> mStreamlet = (MapStreamlet<Double, Double>) streamlet; |
| assertEquals(40, mStreamlet.getNumPartitions()); |
| SupplierStreamlet<Double> supplierStreamlet = (SupplierStreamlet<Double>) baseStreamlet; |
| assertEquals(20, supplierStreamlet.getNumPartitions()); |
| assertEquals(supplierStreamlet.getChildren().size(), 1); |
| assertEquals(supplierStreamlet.getChildren().get(0), streamlet); |
| } |
| |
| @Test |
| @SuppressWarnings("unchecked") |
| public void testCloneStreamlet() { |
| Streamlet<Double> baseStreamlet = builder.newSource(() -> Math.random()); |
| List<Streamlet<Double>> streamlets = baseStreamlet.setNumPartitions(20).clone(2); |
| assertEquals(streamlets.size(), 2); |
| assertTrue(streamlets.get(0) instanceof MapStreamlet); |
| assertTrue(streamlets.get(1) instanceof MapStreamlet); |
| SupplierStreamlet<Double> supplierStreamlet = (SupplierStreamlet<Double>) baseStreamlet; |
| assertEquals(20, supplierStreamlet.getNumPartitions()); |
| assertEquals(supplierStreamlet.getChildren().size(), 2); |
| assertEquals(supplierStreamlet.getChildren(), streamlets); |
| } |
| |
| @Test |
| @SuppressWarnings("unchecked") |
| public void testUnionStreamlet() { |
| Streamlet<Double> baseStreamlet1 = builder.newSource(() -> Math.random()); |
| Streamlet<Double> baseStreamlet2 = builder.newSource(() -> Math.random()); |
| Streamlet<Double> streamlet = baseStreamlet1.union(baseStreamlet2); |
| assertTrue(streamlet instanceof UnionStreamlet); |
| SupplierStreamlet<Double> supplierStreamlet1 = (SupplierStreamlet<Double>) baseStreamlet1; |
| SupplierStreamlet<Double> supplierStreamlet2 = (SupplierStreamlet<Double>) baseStreamlet2; |
| assertEquals(supplierStreamlet1.getChildren().size(), 1); |
| assertEquals(supplierStreamlet1.getChildren().get(0), streamlet); |
| assertEquals(supplierStreamlet2.getChildren().size(), 1); |
| assertEquals(supplierStreamlet2.getChildren().get(0), streamlet); |
| } |
| |
| @Test |
| @SuppressWarnings("unchecked") |
| public void testTransformStreamlet() { |
| Streamlet<Double> baseStreamlet = builder.newSource(() -> Math.random()); |
| Streamlet<Double> streamlet = |
| baseStreamlet.transform(new SerializableTransformer<Double, Double>() { |
| @Override |
| public void setup(Context context) { |
| |
| } |
| |
| @Override |
| public void transform(Double aDouble, Consumer<Double> consumer) { |
| consumer.accept(aDouble); |
| } |
| |
| @Override |
| public void cleanup() { |
| |
| } |
| }); |
| assertTrue(streamlet instanceof TransformStreamlet); |
| SupplierStreamlet<Double> supplierStreamlet = (SupplierStreamlet<Double>) baseStreamlet; |
| assertEquals(supplierStreamlet.getChildren().size(), 1); |
| assertEquals(supplierStreamlet.getChildren().get(0), streamlet); |
| } |
| |
| private class MyBoltOperator extends TestBolt implements IStreamletRichOperator<Double, Double> { |
| } |
| |
| @Test |
| @SuppressWarnings("unchecked") |
| public void testCustomStreamletFromBolt() { |
| Streamlet<Double> baseStreamlet = builder.newSource(() -> Math.random()); |
| Streamlet<Double> streamlet = baseStreamlet.setNumPartitions(20) |
| .applyOperator(new MyBoltOperator()); |
| assertTrue(streamlet instanceof CustomStreamlet); |
| CustomStreamlet<Double, Double> mStreamlet = (CustomStreamlet<Double, Double>) streamlet; |
| assertEquals(20, mStreamlet.getNumPartitions()); |
| SupplierStreamlet<Double> supplierStreamlet = (SupplierStreamlet<Double>) baseStreamlet; |
| assertEquals(supplierStreamlet.getChildren().size(), 1); |
| assertEquals(supplierStreamlet.getChildren().get(0), streamlet); |
| } |
| |
| @Test |
| @SuppressWarnings("unchecked") |
| public void testCustomStreamletWithGrouperFromBolt() throws Exception { |
| Streamlet<Double> baseStreamlet = builder.newSource(() -> Math.random()); |
| Streamlet<Double> streamlet = baseStreamlet.setNumPartitions(20) |
| .applyOperator(new MyBoltOperator(), |
| new ShuffleStreamGrouping()); |
| assertTrue(streamlet instanceof CustomStreamlet); |
| CustomStreamlet<Double, Double> mStreamlet = (CustomStreamlet<Double, Double>) streamlet; |
| assertEquals(20, mStreamlet.getNumPartitions()); |
| SupplierStreamlet<Double> supplierStreamlet = (SupplierStreamlet<Double>) baseStreamlet; |
| assertEquals(supplierStreamlet.getChildren().size(), 1); |
| assertEquals(supplierStreamlet.getChildren().get(0), streamlet); |
| } |
| |
| private class MyBasicBoltOperator extends TestBasicBolt |
| implements IStreamletBasicOperator<Double, Double> { |
| } |
| |
| @Test |
| @SuppressWarnings("unchecked") |
| public void testCustomStreamletFromBasicBolt() { |
| Streamlet<Double> baseStreamlet = builder.newSource(() -> Math.random()); |
| Streamlet<Double> streamlet = baseStreamlet.setNumPartitions(20) |
| .applyOperator(new MyBasicBoltOperator()); |
| assertTrue(streamlet instanceof CustomStreamlet); |
| CustomStreamlet<Double, Double> mStreamlet = |
| (CustomStreamlet<Double, Double>) streamlet; |
| assertEquals(20, mStreamlet.getNumPartitions()); |
| SupplierStreamlet<Double> supplierStreamlet = (SupplierStreamlet<Double>) baseStreamlet; |
| assertEquals(supplierStreamlet.getChildren().size(), 1); |
| assertEquals(supplierStreamlet.getChildren().get(0), streamlet); |
| } |
| |
| private class MyWindowBoltOperator extends TestWindowBolt |
| implements IStreamletWindowOperator<Double, Double> { |
| } |
| |
| @Test |
| @SuppressWarnings("unchecked") |
| public void testCustomStreamletFromWindowBolt() { |
| Streamlet<Double> baseStreamlet = builder.newSource(() -> Math.random()); |
| Streamlet<Double> streamlet = baseStreamlet.setNumPartitions(20) |
| .applyOperator(new MyWindowBoltOperator()); |
| assertTrue(streamlet instanceof CustomStreamlet); |
| CustomStreamlet<Double, Double> mStreamlet = |
| (CustomStreamlet<Double, Double>) streamlet; |
| assertEquals(20, mStreamlet.getNumPartitions()); |
| SupplierStreamlet<Double> supplierStreamlet = (SupplierStreamlet<Double>) baseStreamlet; |
| assertEquals(supplierStreamlet.getChildren().size(), 1); |
| assertEquals(supplierStreamlet.getChildren().get(0), streamlet); |
| } |
| |
| @Test |
| public void testKeyByStreamlet() { |
| Streamlet<Double> baseStreamlet = builder.newSource(() -> Math.random()); |
| KVStreamlet<Long, Double> streamlet = baseStreamlet.keyBy(x -> Math.round(x)); |
| |
| assertTrue(streamlet instanceof KVStreamletShadow); |
| KVStreamletShadow<Long, Double> mStreamlet = |
| (KVStreamletShadow<Long, Double>) streamlet; |
| assertTrue(mStreamlet.getReal() instanceof KeyByStreamlet); |
| assertEquals(1, mStreamlet.getNumPartitions()); |
| SupplierStreamlet<Double> supplierStreamlet = (SupplierStreamlet<Double>) baseStreamlet; |
| assertEquals(supplierStreamlet.getChildren().size(), 1); |
| assertEquals(supplierStreamlet.getChildren().get(0), mStreamlet.getReal()); |
| } |
| |
| @Test |
| @SuppressWarnings("unchecked") |
| public void testReduceByKeyStreamlet() { |
| Streamlet<Double> baseStreamlet = builder.newSource(() -> Math.random()); |
| KVStreamlet<String, Double> streamlet = baseStreamlet.setNumPartitions(20) |
| .<String, Double>reduceByKey(x -> (x > 0) ? "positive" : ((x < 0) ? "negative" : "zero"), |
| x -> x, |
| StreamletReducers::sum); |
| |
| assertTrue(streamlet instanceof KVStreamletShadow); |
| KVStreamletShadow<String, Double> mStreamlet = |
| (KVStreamletShadow<String, Double>) streamlet; |
| assertTrue(mStreamlet.getReal() instanceof ReduceByKeyStreamlet); |
| assertEquals(20, mStreamlet.getNumPartitions()); |
| SupplierStreamlet<Double> supplierStreamlet = (SupplierStreamlet<Double>) baseStreamlet; |
| assertEquals(supplierStreamlet.getChildren().size(), 1); |
| assertEquals(supplierStreamlet.getChildren().get(0), mStreamlet.getReal()); |
| } |
| |
| @Test |
| @SuppressWarnings("unchecked") |
| public void testGeneralReduceByKeyStreamlet() { |
| Streamlet<Double> baseStreamlet = builder.newSource(() -> Math.random()); |
| KVStreamlet<String, Double> streamlet = baseStreamlet.setNumPartitions(20) |
| .reduceByKey(x -> (x > 0) ? "positive" : ((x < 0) ? "negative" : "zero"), |
| 0.0, |
| StreamletReducers::sum); |
| |
| assertTrue(streamlet instanceof KVStreamletShadow); |
| KVStreamletShadow<String, Double> mStreamlet = |
| (KVStreamletShadow<String, Double>) streamlet; |
| assertTrue(mStreamlet.getReal() instanceof GeneralReduceByKeyStreamlet); |
| assertEquals(20, mStreamlet.getNumPartitions()); |
| SupplierStreamlet<Double> supplierStreamlet = (SupplierStreamlet<Double>) baseStreamlet; |
| assertEquals(supplierStreamlet.getChildren().size(), 1); |
| assertEquals(supplierStreamlet.getChildren().get(0), mStreamlet.getReal()); |
| } |
| |
| @Test |
| @SuppressWarnings("unchecked") |
| public void testCountByKeyStreamlet() { |
| Streamlet<Double> baseStreamlet = builder.newSource(() -> Math.random()); |
| KVStreamlet<String, Long> streamlet = baseStreamlet.setNumPartitions(20) |
| .countByKey(x -> (x > 0) ? "positive" : ((x < 0) ? "negative" : "zero")); |
| |
| assertTrue(streamlet instanceof KVStreamletShadow); |
| KVStreamletShadow<String, Long> mStreamlet = |
| (KVStreamletShadow<String, Long>) streamlet; |
| assertTrue(mStreamlet.getReal() instanceof CountByKeyStreamlet); |
| assertEquals(20, mStreamlet.getNumPartitions()); |
| SupplierStreamlet<Double> supplierStreamlet = (SupplierStreamlet<Double>) baseStreamlet; |
| assertEquals(supplierStreamlet.getChildren().size(), 1); |
| assertEquals(supplierStreamlet.getChildren().get(0), mStreamlet.getReal()); |
| } |
| |
| @Test |
| @SuppressWarnings("unchecked") |
| public void testCountByKeyAndWindowStreamlet() { |
| Streamlet<Double> baseStreamlet = builder.newSource(() -> Math.random()); |
| KVStreamlet<KeyedWindow<String>, Long> streamlet = baseStreamlet.setNumPartitions(20) |
| .countByKeyAndWindow(x -> (x > 0) ? "positive" : ((x < 0) ? "negative" : "zero"), |
| WindowConfig.TumblingCountWindow(10)); |
| |
| assertTrue(streamlet instanceof KVStreamletShadow); |
| KVStreamletShadow<KeyedWindow<String>, Long> mStreamlet = |
| (KVStreamletShadow<KeyedWindow<String>, Long>) streamlet; |
| assertTrue(mStreamlet.getReal() instanceof CountByKeyAndWindowStreamlet); |
| assertEquals(20, mStreamlet.getNumPartitions()); |
| SupplierStreamlet<Double> supplierStreamlet = (SupplierStreamlet<Double>) baseStreamlet; |
| assertEquals(supplierStreamlet.getChildren().size(), 1); |
| assertEquals(supplierStreamlet.getChildren().get(0), mStreamlet.getReal()); |
| } |
| |
| @Test |
| @SuppressWarnings("unchecked") |
| public void testSimpleBuild() throws Exception { |
| Streamlet<String> baseStreamlet = builder.newSource(() -> "sa re ga ma"); |
| baseStreamlet.flatMap(x -> Arrays.asList(x.split(" "))) |
| .reduceByKeyAndWindow(x -> x, x -> 1, WindowConfig.TumblingCountWindow(10), |
| (x, y) -> x + y); |
| SupplierStreamlet<String> supplierStreamlet = (SupplierStreamlet<String>) baseStreamlet; |
| assertFalse(supplierStreamlet.isBuilt()); |
| TopologyBuilder topologyBuilder = new TopologyBuilder(); |
| Set<String> stageNames = new HashSet<>(); |
| supplierStreamlet.build(topologyBuilder, stageNames); |
| assertTrue(supplierStreamlet.isFullyBuilt()); |
| assertEquals(supplierStreamlet.getChildren().size(), 1); |
| assertTrue(supplierStreamlet.getChildren().get(0) instanceof FlatMapStreamlet); |
| FlatMapStreamlet<String, String> fStreamlet = |
| (FlatMapStreamlet<String, String>) supplierStreamlet.getChildren().get(0); |
| assertEquals(fStreamlet.getChildren().size(), 1); |
| assertTrue(fStreamlet.getChildren().get(0) instanceof ReduceByKeyAndWindowStreamlet); |
| ReduceByKeyAndWindowStreamlet<String, Integer, Integer> rStreamlet = |
| (ReduceByKeyAndWindowStreamlet<String, Integer, Integer>) fStreamlet |
| .getChildren().get(0); |
| assertEquals(rStreamlet.getChildren().size(), 0); |
| } |
| |
| @Test |
| @SuppressWarnings("unchecked") |
| public void testComplexBuild() { |
| // First source |
| Streamlet<String> baseStreamlet1 = builder.newSource(() -> "sa re ga ma"); |
| Streamlet<String> leftStream = |
| baseStreamlet1.flatMap(x -> Arrays.asList(x.split(" "))); |
| |
| // Second source |
| Streamlet<String> baseStreamlet2 = builder.newSource(() -> "I Love You"); |
| Streamlet<String> rightStream = |
| baseStreamlet2.flatMap(x -> Arrays.asList(x.split(" "))); |
| |
| // join |
| leftStream.join(rightStream, x -> x, x -> x, |
| WindowConfig.TumblingCountWindow(10), (x, y) -> x + y); |
| |
| SupplierStreamlet<String> supplierStreamlet1 = (SupplierStreamlet<String>) baseStreamlet1; |
| SupplierStreamlet<String> supplierStreamlet2 = (SupplierStreamlet<String>) baseStreamlet2; |
| assertFalse(supplierStreamlet1.isBuilt()); |
| assertFalse(supplierStreamlet2.isBuilt()); |
| TopologyBuilder topologyBuilder = new TopologyBuilder(); |
| Set<String> stageNames = new HashSet<>(); |
| supplierStreamlet1.build(topologyBuilder, stageNames); |
| assertTrue(supplierStreamlet1.isBuilt()); |
| assertFalse(supplierStreamlet1.isFullyBuilt()); |
| |
| supplierStreamlet2.build(topologyBuilder, stageNames); |
| assertTrue(supplierStreamlet1.isFullyBuilt()); |
| assertTrue(supplierStreamlet2.isFullyBuilt()); |
| |
| // go over all stuff |
| assertEquals(supplierStreamlet1.getChildren().size(), 1); |
| assertTrue(supplierStreamlet1.getChildren().get(0) instanceof FlatMapStreamlet); |
| FlatMapStreamlet<String, String> fStreamlet = |
| (FlatMapStreamlet<String, String>) supplierStreamlet1.getChildren().get(0); |
| assertEquals(fStreamlet.getChildren().size(), 1); |
| assertTrue(fStreamlet.getChildren().get(0) instanceof JoinStreamlet); |
| JoinStreamlet<String, String, String, String> jStreamlet = |
| (JoinStreamlet<String, String, String, String>) fStreamlet.getChildren().get(0); |
| assertEquals(jStreamlet.getChildren().size(), 0); |
| |
| assertEquals(supplierStreamlet2.getChildren().size(), 1); |
| assertTrue(supplierStreamlet2.getChildren().get(0) instanceof FlatMapStreamlet); |
| fStreamlet = |
| (FlatMapStreamlet<String, String>) supplierStreamlet2.getChildren().get(0); |
| assertEquals(fStreamlet.getChildren().size(), 1); |
| assertTrue(fStreamlet.getChildren().get(0) instanceof JoinStreamlet); |
| jStreamlet = |
| (JoinStreamlet<String, String, String, String>) fStreamlet.getChildren().get(0); |
| assertEquals(jStreamlet.getChildren().size(), 0); |
| } |
| |
| @Test |
| @SuppressWarnings("unchecked") |
| public void testCalculatedDefaultStageNames() { |
| // create SupplierStreamlet |
| Streamlet<String> baseStreamlet = builder.newSource(() -> |
| "This is test content"); |
| SupplierStreamlet<String> supplierStreamlet = (SupplierStreamlet<String>) baseStreamlet; |
| assertEquals(supplierStreamlet.getChildren().size(), 0); |
| |
| // apply the consumer function |
| baseStreamlet.consume((SerializableConsumer<String>) s -> { }); |
| |
| // build SupplierStreamlet |
| assertFalse(supplierStreamlet.isBuilt()); |
| TopologyBuilder topologyBuilder = new TopologyBuilder(); |
| Set<String> stageNames = new HashSet<>(); |
| supplierStreamlet.build(topologyBuilder, stageNames); |
| |
| // verify SupplierStreamlet |
| assertTrue(supplierStreamlet.isFullyBuilt()); |
| assertEquals(1, supplierStreamlet.getChildren().size()); |
| assertTrue(supplierStreamlet.getChildren().get(0) instanceof ConsumerStreamlet); |
| assertEquals("consumer1", supplierStreamlet.getChildren().get(0).getName()); |
| |
| // verify stageNames |
| assertEquals(2, stageNames.size()); |
| List<String> expectedStageNames = Arrays.asList("consumer1", "supplier1"); |
| assertTrue(stageNames.containsAll(expectedStageNames)); |
| |
| // verify ConsumerStreamlet |
| ConsumerStreamlet<String> consumerStreamlet = |
| (ConsumerStreamlet<String>) supplierStreamlet.getChildren().get(0); |
| assertEquals(0, consumerStreamlet.getChildren().size()); |
| } |
| |
| @Test |
| public void testConfigBuilderDefaultConfig() { |
| Config defaultConfig = Config.defaultConfig(); |
| assertEquals(defaultConfig.getSerializer(), Config.Serializer.KRYO); |
| assertEquals(0, Double.compare(defaultConfig.getPerContainerCpu(), -1.0)); |
| assertEquals(defaultConfig.getPerContainerRam(), ByteAmount.fromBytes(-1).asBytes()); |
| assertEquals(defaultConfig.getDeliverySemantics(), Config.DeliverySemantics.ATMOST_ONCE); |
| |
| org.apache.heron.api.Config conf = defaultConfig.getHeronConfig(); |
| assertFalse(conf.containsKey(org.apache.heron.api.Config.TOPOLOGY_CONTAINER_CPU_REQUESTED)); |
| assertFalse(conf.containsKey(org.apache.heron.api.Config.TOPOLOGY_CONTAINER_MAX_CPU_HINT)); |
| assertFalse(conf.containsKey(org.apache.heron.api.Config.TOPOLOGY_CONTAINER_RAM_REQUESTED)); |
| assertFalse(conf.containsKey(org.apache.heron.api.Config.TOPOLOGY_CONTAINER_MAX_RAM_HINT)); |
| } |
| |
| @Test |
| public void testConfigBuilderNonDefaultConfig() { |
| Config nonDefaultConfig = Config.newBuilder() |
| .setDeliverySemantics(Config.DeliverySemantics.EFFECTIVELY_ONCE) |
| .setSerializer(Config.Serializer.JAVA) |
| .setPerContainerCpu(3.5) |
| .setPerContainerRamInGigabytes(10) |
| .build(); |
| assertEquals(nonDefaultConfig.getDeliverySemantics(), |
| Config.DeliverySemantics.EFFECTIVELY_ONCE); |
| assertEquals(nonDefaultConfig.getSerializer(), Config.Serializer.JAVA); |
| assertEquals(nonDefaultConfig.getPerContainerRamAsGigabytes(), 10); |
| assertEquals(nonDefaultConfig.getPerContainerRamAsMegabytes(), 1024 * 10); |
| assertEquals(0, Double.compare(nonDefaultConfig.getPerContainerCpu(), 3.5)); |
| |
| org.apache.heron.api.Config conf = nonDefaultConfig.getHeronConfig(); |
| assertEquals(conf.get(org.apache.heron.api.Config.TOPOLOGY_CONTAINER_CPU_REQUESTED), |
| "3.5"); |
| assertEquals(conf.get(org.apache.heron.api.Config.TOPOLOGY_CONTAINER_MAX_CPU_HINT), |
| "3.5"); |
| assertEquals(conf.get(org.apache.heron.api.Config.TOPOLOGY_CONTAINER_RAM_REQUESTED), |
| "10737418240"); |
| assertEquals(conf.get(org.apache.heron.api.Config.TOPOLOGY_CONTAINER_MAX_RAM_HINT), |
| "10737418240"); |
| } |
| |
| @Test |
| public void testDefaultStreamletNameIfNotSet() { |
| // create SupplierStreamlet |
| Streamlet<String> baseStreamlet = builder.newSource(() -> |
| "This is test content"); |
| SupplierStreamlet<String> supplierStreamlet = (SupplierStreamlet<String>) baseStreamlet; |
| Set<String> stageNames = new HashSet<>(); |
| |
| // set default name by streamlet name prefix |
| supplierStreamlet.setDefaultNameIfNone( |
| StreamletImpl.StreamletNamePrefix.SUPPLIER, stageNames); |
| |
| // verify stageNames |
| assertEquals(1, stageNames.size()); |
| assertTrue(stageNames.containsAll(Arrays.asList("supplier1"))); |
| } |
| |
| @Test |
| public void testStreamletNameIfAlreadySet() { |
| String supplierName = "MyStringSupplier"; |
| // create SupplierStreamlet |
| Streamlet<String> baseStreamlet = builder.newSource(() -> |
| "This is test content"); |
| SupplierStreamlet<String> supplierStreamlet = (SupplierStreamlet<String>) baseStreamlet; |
| supplierStreamlet.setName(supplierName); |
| Set<String> stageNames = new HashSet<>(); |
| |
| // set default name by streamlet name prefix |
| supplierStreamlet.setDefaultNameIfNone( |
| StreamletImpl.StreamletNamePrefix.SUPPLIER, stageNames); |
| |
| // verify stageNames |
| assertEquals(1, stageNames.size()); |
| assertTrue(stageNames.containsAll(Arrays.asList(supplierName))); |
| } |
| |
| @Test(expected = RuntimeException.class) |
| public void testStreamletNameIfDuplicateNameIsSet() { |
| // create SupplierStreamlet |
| Streamlet<String> baseStreamlet = builder.newSource(() -> |
| "This is test content"); |
| |
| SupplierStreamlet<String> supplierStreamlet = (SupplierStreamlet<String>) baseStreamlet; |
| |
| // set duplicate streamlet name and expect thrown exception |
| supplierStreamlet |
| .map((content) -> content.toUpperCase()).setName("MyMapStreamlet") |
| .map((content) -> content + "_test_suffix").setName("MyMapStreamlet"); |
| |
| // build SupplierStreamlet |
| assertFalse(supplierStreamlet.isBuilt()); |
| TopologyBuilder topologyBuilder = new TopologyBuilder(); |
| Set<String> stageNames = new HashSet<>(); |
| supplierStreamlet.build(topologyBuilder, stageNames); |
| } |
| |
| @Test |
| public void testSetNameWithInvalidValues() { |
| Streamlet<Double> streamlet = builder.newSource(() -> Math.random()); |
| Function<String, Streamlet<Double>> function = streamlet::setName; |
| testByFunction(function, null); |
| testByFunction(function, ""); |
| testByFunction(function, " "); |
| } |
| |
| @Test(expected = IllegalArgumentException.class) |
| public void testSetNumPartitionsWithInvalidValue() { |
| Streamlet<Double> streamlet = builder.newSource(() -> Math.random()); |
| streamlet.setNumPartitions(0); |
| } |
| |
| @Test(expected = IllegalArgumentException.class) |
| public void testWithStreamWithInvalidValue() { |
| Streamlet<Double> baseStreamlet = builder.newSource(() -> Math.random()); |
| baseStreamlet.withStream(""); |
| } |
| |
| @Test(expected = IllegalArgumentException.class) |
| @SuppressWarnings("unchecked") |
| public void testCloneStreamletWithInvalidNumberOfClone() { |
| Streamlet<Double> baseStreamlet = builder.newSource(() -> Math.random()); |
| baseStreamlet.setNumPartitions(20).clone(0); |
| } |
| |
| private void testByFunction(Function<String, Streamlet<Double>> function, String sName) { |
| try { |
| function.apply(sName); |
| fail("Should have thrown an IllegalArgumentException because streamlet name is invalid"); |
| } catch (IllegalArgumentException e) { |
| assertEquals("Streamlet name cannot be null/blank", e.getMessage()); |
| } |
| } |
| |
| private class TestSource implements Source<String> { |
| |
| private List<String> list; |
| @Override |
| public void setup(Context context) { |
| list = Arrays.asList("aa", "bb", "cc"); |
| } |
| |
| @Override |
| public Collection<String> get() { |
| return list; |
| } |
| |
| @Override |
| public void cleanup() { |
| list.clear(); |
| } |
| } |
| |
| } |