[Java Streamlet API] Move Source Logics to Builder (#3115)
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/BuilderImpl.java b/heron/api/src/java/org/apache/heron/streamlet/impl/BuilderImpl.java
index 029b49e..853bb97 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/BuilderImpl.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/BuilderImpl.java
@@ -16,7 +16,6 @@
* specific language governing permissions and limitations
* under the License.
*/
-
package org.apache.heron.streamlet.impl;
import java.util.HashSet;
@@ -30,6 +29,9 @@
import org.apache.heron.streamlet.SerializableSupplier;
import org.apache.heron.streamlet.Source;
import org.apache.heron.streamlet.Streamlet;
+import org.apache.heron.streamlet.impl.streamlets.SourceStreamlet;
+import org.apache.heron.streamlet.impl.streamlets.SpoutStreamlet;
+import org.apache.heron.streamlet.impl.streamlets.SupplierStreamlet;
import static org.apache.heron.streamlet.impl.utils.StreamletUtils.checkNotNull;
@@ -47,21 +49,27 @@
@Override
public <R> Streamlet<R> newSource(SerializableSupplier<R> supplier) {
- StreamletImpl<R> retval = StreamletImpl.createSupplierStreamlet(supplier);
+ checkNotNull(supplier, "supplier cannot not be null");
+
+ StreamletImpl<R> retval = new SupplierStreamlet<>(supplier);
sources.add(retval);
return retval;
}
@Override
public <R> Streamlet<R> newSource(Source<R> generator) {
- StreamletImpl<R> retval = StreamletImpl.createGeneratorStreamlet(generator);
+ checkNotNull(generator, "generator cannot not be null");
+
+ StreamletImpl<R> retval = new SourceStreamlet<>(generator);
sources.add(retval);
return retval;
}
@Override
public <R> Streamlet<R> newSource(IRichSpout spout) {
- StreamletImpl<R> retval = StreamletImpl.createSpoutStreamlet(spout);
+ checkNotNull(spout, "spout cannot not be null");
+
+ StreamletImpl<R> retval = new SpoutStreamlet<>(spout);
sources.add(retval);
return retval;
}
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java b/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java
index f382bcd..7abf9e2 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java
@@ -16,7 +16,6 @@
* specific language governing permissions and limitations
* under the License.
*/
-
package org.apache.heron.streamlet.impl;
import java.util.ArrayList;
@@ -25,7 +24,6 @@
import java.util.Set;
import java.util.logging.Logger;
-import org.apache.heron.api.spout.IRichSpout;
import org.apache.heron.api.topology.TopologyBuilder;
import org.apache.heron.streamlet.IStreamletOperator;
import org.apache.heron.streamlet.JoinType;
@@ -36,10 +34,8 @@
import org.apache.heron.streamlet.SerializableConsumer;
import org.apache.heron.streamlet.SerializableFunction;
import org.apache.heron.streamlet.SerializablePredicate;
-import org.apache.heron.streamlet.SerializableSupplier;
import org.apache.heron.streamlet.SerializableTransformer;
import org.apache.heron.streamlet.Sink;
-import org.apache.heron.streamlet.Source;
import org.apache.heron.streamlet.Streamlet;
import org.apache.heron.streamlet.WindowConfig;
import org.apache.heron.streamlet.impl.streamlets.ConsumerStreamlet;
@@ -53,9 +49,6 @@
import org.apache.heron.streamlet.impl.streamlets.ReduceByKeyAndWindowStreamlet;
import org.apache.heron.streamlet.impl.streamlets.RemapStreamlet;
import org.apache.heron.streamlet.impl.streamlets.SinkStreamlet;
-import org.apache.heron.streamlet.impl.streamlets.SourceStreamlet;
-import org.apache.heron.streamlet.impl.streamlets.SpoutStreamlet;
-import org.apache.heron.streamlet.impl.streamlets.SupplierStreamlet;
import org.apache.heron.streamlet.impl.streamlets.TransformStreamlet;
import org.apache.heron.streamlet.impl.streamlets.UnionStreamlet;
@@ -253,36 +246,6 @@
}
/**
- * Create a Streamlet based on the supplier function
- * @param supplier The Supplier function to generate the elements
- */
- static <T> StreamletImpl<T> createSupplierStreamlet(SerializableSupplier<T> supplier) {
- checkNotNull(supplier, "supplier cannot not be null");
-
- return new SupplierStreamlet<T>(supplier);
- }
-
- /**
- * Create a Streamlet based on the generator function
- * @param generator The Generator function to generate the elements
- */
- static <T> StreamletImpl<T> createGeneratorStreamlet(Source<T> generator) {
- checkNotNull(generator, "generator cannot not be null");
-
- return new SourceStreamlet<T>(generator);
- }
-
- /**
- * Create a Streamlet based on a Spout object
- * @param spout The Spout function to generate the elements
- */
- static <T> StreamletImpl<T> createSpoutStreamlet(IRichSpout spout) {
- checkNotNull(spout, "spout cannot not be null");
-
- return new SpoutStreamlet<T>(spout);
- }
-
- /**
* Return a new Streamlet by applying mapFn to each element of this Streamlet
* @param mapFn The Map Function that should be applied to each element
*/
diff --git a/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java b/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java
index a9b15bd..2237bce 100644
--- a/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java
+++ b/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java
@@ -16,7 +16,6 @@
* specific language governing permissions and limitations
* under the License.
*/
-
package org.apache.heron.streamlet.impl;
import java.util.Arrays;
@@ -34,6 +33,7 @@
import org.apache.heron.resource.TestBolt;
import org.apache.heron.resource.TestSpout;
import org.apache.heron.resource.TestWindowBolt;
+import org.apache.heron.streamlet.Builder;
import org.apache.heron.streamlet.Config;
import org.apache.heron.streamlet.Context;
import org.apache.heron.streamlet.IStreamletBasicOperator;
@@ -65,9 +65,11 @@
*/
public class StreamletImplTest {
+ private Builder builder = Builder.newBuilder();
+
@Test
- public void testBasicParams() throws Exception {
- Streamlet<Double> sample = StreamletImpl.createSupplierStreamlet(() -> Math.random());
+ public void testBasicParams() {
+ Streamlet<Double> sample = builder.newSource(() -> Math.random());
sample.setName("MyStreamlet");
sample.setNumPartitions(20);
assertEquals("MyStreamlet", sample.getName());
@@ -82,22 +84,22 @@
}
@Test
- public void testSupplierStreamlet() throws Exception {
- Streamlet<Double> streamlet = StreamletImpl.createSupplierStreamlet(() -> Math.random());
+ public void testSupplierStreamlet() {
+ Streamlet<Double> streamlet = builder.newSource(() -> Math.random());
assertTrue(streamlet instanceof SupplierStreamlet);
}
@Test
- public void testSpoutStreamlet() throws Exception {
+ public void testSpoutStreamlet() {
TestSpout spout = new TestSpout();
- Streamlet<Double> streamlet = StreamletImpl.createSpoutStreamlet(spout);
+ Streamlet<Double> streamlet = builder.newSource(spout);
assertTrue(streamlet instanceof SpoutStreamlet);
}
@Test
@SuppressWarnings("unchecked")
- public void testMapStreamlet() throws Exception {
- Streamlet<Double> baseStreamlet = StreamletImpl.createSupplierStreamlet(() -> Math.random());
+ public void testMapStreamlet() {
+ Streamlet<Double> baseStreamlet = builder.newSource(() -> Math.random());
Streamlet<Double> streamlet = baseStreamlet.setNumPartitions(20).map((num) -> num * 10);
assertTrue(streamlet instanceof MapStreamlet);
MapStreamlet<Double, Double> mStreamlet = (MapStreamlet<Double, Double>) streamlet;
@@ -109,8 +111,8 @@
@Test
@SuppressWarnings("unchecked")
- public void testFlatMapStreamlet() throws Exception {
- Streamlet<Double> baseStreamlet = StreamletImpl.createSupplierStreamlet(() -> Math.random());
+ public void testFlatMapStreamlet() {
+ Streamlet<Double> baseStreamlet = builder.newSource(() -> Math.random());
Streamlet<Double> streamlet = baseStreamlet.setNumPartitions(20)
.flatMap((num) -> Arrays.asList(num * 10));
assertTrue(streamlet instanceof FlatMapStreamlet);
@@ -123,8 +125,8 @@
@Test
@SuppressWarnings("unchecked")
- public void testFilterStreamlet() throws Exception {
- Streamlet<Double> baseStreamlet = StreamletImpl.createSupplierStreamlet(() -> Math.random());
+ public void testFilterStreamlet() {
+ Streamlet<Double> baseStreamlet = builder.newSource(() -> Math.random());
Streamlet<Double> streamlet = baseStreamlet.setNumPartitions(20).filter((num) -> num != 0);
assertTrue(streamlet instanceof FilterStreamlet);
FilterStreamlet<Double> mStreamlet = (FilterStreamlet<Double>) streamlet;
@@ -136,8 +138,8 @@
@Test
@SuppressWarnings("unchecked")
- public void testRepartitionStreamlet() throws Exception {
- Streamlet<Double> baseStreamlet = StreamletImpl.createSupplierStreamlet(() -> Math.random());
+ public void testRepartitionStreamlet() {
+ Streamlet<Double> baseStreamlet = builder.newSource(() -> Math.random());
Streamlet<Double> streamlet = baseStreamlet.setNumPartitions(20).repartition(40);
assertTrue(streamlet instanceof MapStreamlet);
MapStreamlet<Double, Double> mStreamlet = (MapStreamlet<Double, Double>) streamlet;
@@ -151,7 +153,7 @@
@Test
@SuppressWarnings("unchecked")
public void testCloneStreamlet() {
- Streamlet<Double> baseStreamlet = StreamletImpl.createSupplierStreamlet(() -> Math.random());
+ Streamlet<Double> baseStreamlet = builder.newSource(() -> Math.random());
List<Streamlet<Double>> streamlets = baseStreamlet.setNumPartitions(20).clone(2);
assertEquals(streamlets.size(), 2);
assertTrue(streamlets.get(0) instanceof MapStreamlet);
@@ -164,9 +166,9 @@
@Test
@SuppressWarnings("unchecked")
- public void testUnionStreamlet() throws Exception {
- Streamlet<Double> baseStreamlet1 = StreamletImpl.createSupplierStreamlet(() -> Math.random());
- Streamlet<Double> baseStreamlet2 = StreamletImpl.createSupplierStreamlet(() -> Math.random());
+ public void testUnionStreamlet() {
+ Streamlet<Double> baseStreamlet1 = builder.newSource(() -> Math.random());
+ Streamlet<Double> baseStreamlet2 = builder.newSource(() -> Math.random());
Streamlet<Double> streamlet = baseStreamlet1.union(baseStreamlet2);
assertTrue(streamlet instanceof UnionStreamlet);
SupplierStreamlet<Double> supplierStreamlet1 = (SupplierStreamlet<Double>) baseStreamlet1;
@@ -179,8 +181,8 @@
@Test
@SuppressWarnings("unchecked")
- public void testTransformStreamlet() throws Exception {
- Streamlet<Double> baseStreamlet = StreamletImpl.createSupplierStreamlet(() -> Math.random());
+ public void testTransformStreamlet() {
+ Streamlet<Double> baseStreamlet = builder.newSource(() -> Math.random());
Streamlet<Double> streamlet =
baseStreamlet.transform(new SerializableTransformer<Double, Double>() {
@Override
@@ -209,8 +211,8 @@
@Test
@SuppressWarnings("unchecked")
- public void testCustomStreamletFromBolt() throws Exception {
- Streamlet<Double> baseStreamlet = StreamletImpl.createSupplierStreamlet(() -> Math.random());
+ public void testCustomStreamletFromBolt() {
+ Streamlet<Double> baseStreamlet = builder.newSource(() -> Math.random());
Streamlet<Double> streamlet = baseStreamlet.setNumPartitions(20)
.applyOperator(new MyBoltOperator());
assertTrue(streamlet instanceof CustomStreamlet);
@@ -227,8 +229,8 @@
@Test
@SuppressWarnings("unchecked")
- public void testCustomStreamletFromBasicBolt() throws Exception {
- Streamlet<Double> baseStreamlet = StreamletImpl.createSupplierStreamlet(() -> Math.random());
+ public void testCustomStreamletFromBasicBolt() {
+ Streamlet<Double> baseStreamlet = builder.newSource(() -> Math.random());
Streamlet<Double> streamlet = baseStreamlet.setNumPartitions(20)
.applyOperator(new MyBasicBoltOperator());
assertTrue(streamlet instanceof CustomStreamlet);
@@ -246,8 +248,8 @@
@Test
@SuppressWarnings("unchecked")
- public void testCustomStreamletFromWindowBolt() throws Exception {
- Streamlet<Double> baseStreamlet = StreamletImpl.createSupplierStreamlet(() -> Math.random());
+ public void testCustomStreamletFromWindowBolt() {
+ Streamlet<Double> baseStreamlet = builder.newSource(() -> Math.random());
Streamlet<Double> streamlet = baseStreamlet.setNumPartitions(20)
.applyOperator(new MyWindowBoltOperator());
assertTrue(streamlet instanceof CustomStreamlet);
@@ -261,16 +263,16 @@
@Test
@SuppressWarnings("unchecked")
- public void testSimpleBuild() throws Exception {
- Streamlet<String> baseStreamlet = StreamletImpl.createSupplierStreamlet(() -> "sa re ga ma");
+ public void testSimpleBuild() {
+ Streamlet<String> baseStreamlet = builder.newSource(() -> "sa re ga ma");
baseStreamlet.flatMap(x -> Arrays.asList(x.split(" ")))
.reduceByKeyAndWindow(x -> x, x -> 1, WindowConfig.TumblingCountWindow(10),
(x, y) -> x + y);
SupplierStreamlet<String> supplierStreamlet = (SupplierStreamlet<String>) baseStreamlet;
assertFalse(supplierStreamlet.isBuilt());
- TopologyBuilder builder = new TopologyBuilder();
+ TopologyBuilder topologyBuilder = new TopologyBuilder();
Set<String> stageNames = new HashSet<>();
- supplierStreamlet.build(builder, stageNames);
+ supplierStreamlet.build(topologyBuilder, stageNames);
assertTrue(supplierStreamlet.allBuilt());
assertEquals(supplierStreamlet.getChildren().size(), 1);
assertTrue(supplierStreamlet.getChildren().get(0) instanceof FlatMapStreamlet);
@@ -286,14 +288,14 @@
@Test
@SuppressWarnings("unchecked")
- public void testComplexBuild() throws Exception {
+ public void testComplexBuild() {
// First source
- Streamlet<String> baseStreamlet1 = StreamletImpl.createSupplierStreamlet(() -> "sa re ga ma");
+ Streamlet<String> baseStreamlet1 = builder.newSource(() -> "sa re ga ma");
Streamlet<String> leftStream =
baseStreamlet1.flatMap(x -> Arrays.asList(x.split(" ")));
// Second source
- Streamlet<String> baseStreamlet2 = StreamletImpl.createSupplierStreamlet(() -> "I Love You");
+ Streamlet<String> baseStreamlet2 = builder.newSource(() -> "I Love You");
Streamlet<String> rightStream =
baseStreamlet2.flatMap(x -> Arrays.asList(x.split(" ")));
@@ -305,13 +307,13 @@
SupplierStreamlet<String> supplierStreamlet2 = (SupplierStreamlet<String>) baseStreamlet2;
assertFalse(supplierStreamlet1.isBuilt());
assertFalse(supplierStreamlet2.isBuilt());
- TopologyBuilder builder = new TopologyBuilder();
+ TopologyBuilder topologyBuilder = new TopologyBuilder();
Set<String> stageNames = new HashSet<>();
- supplierStreamlet1.build(builder, stageNames);
+ supplierStreamlet1.build(topologyBuilder, stageNames);
assertTrue(supplierStreamlet1.isBuilt());
assertFalse(supplierStreamlet1.allBuilt());
- supplierStreamlet2.build(builder, stageNames);
+ supplierStreamlet2.build(topologyBuilder, stageNames);
assertTrue(supplierStreamlet1.allBuilt());
assertTrue(supplierStreamlet2.allBuilt());
@@ -341,7 +343,7 @@
@SuppressWarnings("unchecked")
public void testCalculatedDefaultStageNames() {
// create SupplierStreamlet
- Streamlet<String> baseStreamlet = StreamletImpl.createSupplierStreamlet(() ->
+ Streamlet<String> baseStreamlet = builder.newSource(() ->
"This is test content");
SupplierStreamlet<String> supplierStreamlet = (SupplierStreamlet<String>) baseStreamlet;
assertEquals(supplierStreamlet.getChildren().size(), 0);
@@ -351,9 +353,9 @@
// build SupplierStreamlet
assertFalse(supplierStreamlet.isBuilt());
- TopologyBuilder builder = new TopologyBuilder();
+ TopologyBuilder topologyBuilder = new TopologyBuilder();
Set<String> stageNames = new HashSet<>();
- supplierStreamlet.build(builder, stageNames);
+ supplierStreamlet.build(topologyBuilder, stageNames);
// verify SupplierStreamlet
assertTrue(supplierStreamlet.allBuilt());
@@ -396,7 +398,7 @@
@Test
public void testDefaultStreamletNameIfNotSet() {
// create SupplierStreamlet
- Streamlet<String> baseStreamlet = StreamletImpl.createSupplierStreamlet(() ->
+ Streamlet<String> baseStreamlet = builder.newSource(() ->
"This is test content");
SupplierStreamlet<String> supplierStreamlet = (SupplierStreamlet<String>) baseStreamlet;
Set<String> stageNames = new HashSet<>();
@@ -414,7 +416,7 @@
public void testStreamletNameIfAlreadySet() {
String supplierName = "MyStringSupplier";
// create SupplierStreamlet
- Streamlet<String> baseStreamlet = StreamletImpl.createSupplierStreamlet(() ->
+ Streamlet<String> baseStreamlet = builder.newSource(() ->
"This is test content");
SupplierStreamlet<String> supplierStreamlet = (SupplierStreamlet<String>) baseStreamlet;
supplierStreamlet.setName(supplierName);
@@ -432,7 +434,7 @@
@Test(expected = RuntimeException.class)
public void testStreamletNameIfDuplicateNameIsSet() {
// create SupplierStreamlet
- Streamlet<String> baseStreamlet = StreamletImpl.createSupplierStreamlet(() ->
+ Streamlet<String> baseStreamlet = builder.newSource(() ->
"This is test content");
SupplierStreamlet<String> supplierStreamlet = (SupplierStreamlet<String>) baseStreamlet;
@@ -444,14 +446,14 @@
// build SupplierStreamlet
assertFalse(supplierStreamlet.isBuilt());
- TopologyBuilder builder = new TopologyBuilder();
+ TopologyBuilder topologyBuilder = new TopologyBuilder();
Set<String> stageNames = new HashSet<>();
- supplierStreamlet.build(builder, stageNames);
+ supplierStreamlet.build(topologyBuilder, stageNames);
}
@Test
public void testSetNameWithInvalidValues() {
- Streamlet<Double> streamlet = StreamletImpl.createSupplierStreamlet(() -> Math.random());
+ Streamlet<Double> streamlet = builder.newSource(() -> Math.random());
Function<String, Streamlet<Double>> function = streamlet::setName;
testByFunction(function, null);
testByFunction(function, "");
@@ -460,14 +462,14 @@
@Test(expected = IllegalArgumentException.class)
public void testSetNumPartitionsWithInvalidValue() {
- Streamlet<Double> streamlet = StreamletImpl.createSupplierStreamlet(() -> Math.random());
+ Streamlet<Double> streamlet = builder.newSource(() -> Math.random());
streamlet.setNumPartitions(0);
}
@Test(expected = IllegalArgumentException.class)
@SuppressWarnings("unchecked")
public void testCloneStreamletWithInvalidNumberOfClone() {
- Streamlet<Double> baseStreamlet = StreamletImpl.createSupplierStreamlet(() -> Math.random());
+ Streamlet<Double> baseStreamlet = builder.newSource(() -> Math.random());
baseStreamlet.setNumPartitions(20).clone(0);
}