[Java Streamlet API] Move Source Logics to Builder (#3115)

diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/BuilderImpl.java b/heron/api/src/java/org/apache/heron/streamlet/impl/BuilderImpl.java
index 029b49e..853bb97 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/BuilderImpl.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/BuilderImpl.java
@@ -16,7 +16,6 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 package org.apache.heron.streamlet.impl;
 
 import java.util.HashSet;
@@ -30,6 +29,9 @@
 import org.apache.heron.streamlet.SerializableSupplier;
 import org.apache.heron.streamlet.Source;
 import org.apache.heron.streamlet.Streamlet;
+import org.apache.heron.streamlet.impl.streamlets.SourceStreamlet;
+import org.apache.heron.streamlet.impl.streamlets.SpoutStreamlet;
+import org.apache.heron.streamlet.impl.streamlets.SupplierStreamlet;
 
 import static org.apache.heron.streamlet.impl.utils.StreamletUtils.checkNotNull;
 
@@ -47,21 +49,27 @@
 
   @Override
   public <R> Streamlet<R> newSource(SerializableSupplier<R> supplier) {
-    StreamletImpl<R> retval = StreamletImpl.createSupplierStreamlet(supplier);
+    checkNotNull(supplier, "supplier cannot not be null");
+
+    StreamletImpl<R> retval = new SupplierStreamlet<>(supplier);
     sources.add(retval);
     return retval;
   }
 
   @Override
   public <R> Streamlet<R> newSource(Source<R> generator) {
-    StreamletImpl<R> retval = StreamletImpl.createGeneratorStreamlet(generator);
+    checkNotNull(generator, "generator cannot not be null");
+
+    StreamletImpl<R> retval = new SourceStreamlet<>(generator);
     sources.add(retval);
     return retval;
   }
 
   @Override
   public <R> Streamlet<R> newSource(IRichSpout spout) {
-    StreamletImpl<R> retval = StreamletImpl.createSpoutStreamlet(spout);
+    checkNotNull(spout, "spout cannot not be null");
+
+    StreamletImpl<R> retval = new SpoutStreamlet<>(spout);
     sources.add(retval);
     return retval;
   }
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java b/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java
index f382bcd..7abf9e2 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java
@@ -16,7 +16,6 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 package org.apache.heron.streamlet.impl;
 
 import java.util.ArrayList;
@@ -25,7 +24,6 @@
 import java.util.Set;
 import java.util.logging.Logger;
 
-import org.apache.heron.api.spout.IRichSpout;
 import org.apache.heron.api.topology.TopologyBuilder;
 import org.apache.heron.streamlet.IStreamletOperator;
 import org.apache.heron.streamlet.JoinType;
@@ -36,10 +34,8 @@
 import org.apache.heron.streamlet.SerializableConsumer;
 import org.apache.heron.streamlet.SerializableFunction;
 import org.apache.heron.streamlet.SerializablePredicate;
-import org.apache.heron.streamlet.SerializableSupplier;
 import org.apache.heron.streamlet.SerializableTransformer;
 import org.apache.heron.streamlet.Sink;
-import org.apache.heron.streamlet.Source;
 import org.apache.heron.streamlet.Streamlet;
 import org.apache.heron.streamlet.WindowConfig;
 import org.apache.heron.streamlet.impl.streamlets.ConsumerStreamlet;
@@ -53,9 +49,6 @@
 import org.apache.heron.streamlet.impl.streamlets.ReduceByKeyAndWindowStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.RemapStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.SinkStreamlet;
-import org.apache.heron.streamlet.impl.streamlets.SourceStreamlet;
-import org.apache.heron.streamlet.impl.streamlets.SpoutStreamlet;
-import org.apache.heron.streamlet.impl.streamlets.SupplierStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.TransformStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.UnionStreamlet;
 
@@ -253,36 +246,6 @@
   }
 
   /**
-   * Create a Streamlet based on the supplier function
-   * @param supplier The Supplier function to generate the elements
-   */
-  static <T> StreamletImpl<T> createSupplierStreamlet(SerializableSupplier<T> supplier) {
-    checkNotNull(supplier, "supplier cannot not be null");
-
-    return new SupplierStreamlet<T>(supplier);
-  }
-
-  /**
-   * Create a Streamlet based on the generator function
-   * @param generator The Generator function to generate the elements
-   */
-  static <T> StreamletImpl<T> createGeneratorStreamlet(Source<T> generator) {
-    checkNotNull(generator, "generator cannot not be null");
-
-    return new SourceStreamlet<T>(generator);
-  }
-
-  /**
-   * Create a Streamlet based on a Spout object
-   * @param spout The Spout function to generate the elements
-   */
-  static <T> StreamletImpl<T> createSpoutStreamlet(IRichSpout spout) {
-    checkNotNull(spout, "spout cannot not be null");
-
-    return new SpoutStreamlet<T>(spout);
-  }
-
-  /**
    * Return a new Streamlet by applying mapFn to each element of this Streamlet
    * @param mapFn The Map Function that should be applied to each element
   */
diff --git a/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java b/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java
index a9b15bd..2237bce 100644
--- a/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java
+++ b/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java
@@ -16,7 +16,6 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 package org.apache.heron.streamlet.impl;
 
 import java.util.Arrays;
@@ -34,6 +33,7 @@
 import org.apache.heron.resource.TestBolt;
 import org.apache.heron.resource.TestSpout;
 import org.apache.heron.resource.TestWindowBolt;
+import org.apache.heron.streamlet.Builder;
 import org.apache.heron.streamlet.Config;
 import org.apache.heron.streamlet.Context;
 import org.apache.heron.streamlet.IStreamletBasicOperator;
@@ -65,9 +65,11 @@
  */
 public class StreamletImplTest {
 
+  private Builder builder = Builder.newBuilder();
+
   @Test
-  public void testBasicParams() throws Exception {
-    Streamlet<Double> sample = StreamletImpl.createSupplierStreamlet(() -> Math.random());
+  public void testBasicParams() {
+    Streamlet<Double> sample = builder.newSource(() -> Math.random());
     sample.setName("MyStreamlet");
     sample.setNumPartitions(20);
     assertEquals("MyStreamlet", sample.getName());
@@ -82,22 +84,22 @@
   }
 
   @Test
-  public void testSupplierStreamlet() throws Exception {
-    Streamlet<Double> streamlet = StreamletImpl.createSupplierStreamlet(() -> Math.random());
+  public void testSupplierStreamlet() {
+    Streamlet<Double> streamlet = builder.newSource(() -> Math.random());
     assertTrue(streamlet instanceof SupplierStreamlet);
   }
 
   @Test
-  public void testSpoutStreamlet() throws Exception {
+  public void testSpoutStreamlet() {
     TestSpout spout = new TestSpout();
-    Streamlet<Double> streamlet = StreamletImpl.createSpoutStreamlet(spout);
+    Streamlet<Double> streamlet = builder.newSource(spout);
     assertTrue(streamlet instanceof SpoutStreamlet);
   }
 
   @Test
   @SuppressWarnings("unchecked")
-  public void testMapStreamlet() throws Exception {
-    Streamlet<Double> baseStreamlet = StreamletImpl.createSupplierStreamlet(() -> Math.random());
+  public void testMapStreamlet() {
+    Streamlet<Double> baseStreamlet = builder.newSource(() -> Math.random());
     Streamlet<Double> streamlet = baseStreamlet.setNumPartitions(20).map((num) -> num * 10);
     assertTrue(streamlet instanceof MapStreamlet);
     MapStreamlet<Double, Double> mStreamlet = (MapStreamlet<Double, Double>) streamlet;
@@ -109,8 +111,8 @@
 
   @Test
   @SuppressWarnings("unchecked")
-  public void testFlatMapStreamlet() throws Exception {
-    Streamlet<Double> baseStreamlet = StreamletImpl.createSupplierStreamlet(() -> Math.random());
+  public void testFlatMapStreamlet() {
+    Streamlet<Double> baseStreamlet = builder.newSource(() -> Math.random());
     Streamlet<Double> streamlet = baseStreamlet.setNumPartitions(20)
                                                .flatMap((num) -> Arrays.asList(num * 10));
     assertTrue(streamlet instanceof FlatMapStreamlet);
@@ -123,8 +125,8 @@
 
   @Test
   @SuppressWarnings("unchecked")
-  public void testFilterStreamlet() throws Exception {
-    Streamlet<Double> baseStreamlet = StreamletImpl.createSupplierStreamlet(() -> Math.random());
+  public void testFilterStreamlet() {
+    Streamlet<Double> baseStreamlet = builder.newSource(() -> Math.random());
     Streamlet<Double> streamlet = baseStreamlet.setNumPartitions(20).filter((num) -> num != 0);
     assertTrue(streamlet instanceof FilterStreamlet);
     FilterStreamlet<Double> mStreamlet = (FilterStreamlet<Double>) streamlet;
@@ -136,8 +138,8 @@
 
   @Test
   @SuppressWarnings("unchecked")
-  public void testRepartitionStreamlet() throws Exception {
-    Streamlet<Double> baseStreamlet = StreamletImpl.createSupplierStreamlet(() -> Math.random());
+  public void testRepartitionStreamlet() {
+    Streamlet<Double> baseStreamlet = builder.newSource(() -> Math.random());
     Streamlet<Double> streamlet = baseStreamlet.setNumPartitions(20).repartition(40);
     assertTrue(streamlet instanceof MapStreamlet);
     MapStreamlet<Double, Double> mStreamlet = (MapStreamlet<Double, Double>) streamlet;
@@ -151,7 +153,7 @@
   @Test
   @SuppressWarnings("unchecked")
   public void testCloneStreamlet() {
-    Streamlet<Double> baseStreamlet = StreamletImpl.createSupplierStreamlet(() -> Math.random());
+    Streamlet<Double> baseStreamlet = builder.newSource(() -> Math.random());
     List<Streamlet<Double>> streamlets = baseStreamlet.setNumPartitions(20).clone(2);
     assertEquals(streamlets.size(), 2);
     assertTrue(streamlets.get(0) instanceof MapStreamlet);
@@ -164,9 +166,9 @@
 
   @Test
   @SuppressWarnings("unchecked")
-  public void testUnionStreamlet() throws Exception {
-    Streamlet<Double> baseStreamlet1 = StreamletImpl.createSupplierStreamlet(() -> Math.random());
-    Streamlet<Double> baseStreamlet2 = StreamletImpl.createSupplierStreamlet(() -> Math.random());
+  public void testUnionStreamlet() {
+    Streamlet<Double> baseStreamlet1 = builder.newSource(() -> Math.random());
+    Streamlet<Double> baseStreamlet2 = builder.newSource(() -> Math.random());
     Streamlet<Double> streamlet = baseStreamlet1.union(baseStreamlet2);
     assertTrue(streamlet instanceof UnionStreamlet);
     SupplierStreamlet<Double> supplierStreamlet1 = (SupplierStreamlet<Double>) baseStreamlet1;
@@ -179,8 +181,8 @@
 
   @Test
   @SuppressWarnings("unchecked")
-  public void testTransformStreamlet() throws Exception {
-    Streamlet<Double> baseStreamlet = StreamletImpl.createSupplierStreamlet(() -> Math.random());
+  public void testTransformStreamlet() {
+    Streamlet<Double> baseStreamlet = builder.newSource(() -> Math.random());
     Streamlet<Double> streamlet =
         baseStreamlet.transform(new SerializableTransformer<Double, Double>() {
           @Override
@@ -209,8 +211,8 @@
 
   @Test
   @SuppressWarnings("unchecked")
-  public void testCustomStreamletFromBolt() throws Exception {
-    Streamlet<Double> baseStreamlet = StreamletImpl.createSupplierStreamlet(() -> Math.random());
+  public void testCustomStreamletFromBolt() {
+    Streamlet<Double> baseStreamlet = builder.newSource(() -> Math.random());
     Streamlet<Double> streamlet = baseStreamlet.setNumPartitions(20)
                                                .applyOperator(new MyBoltOperator());
     assertTrue(streamlet instanceof CustomStreamlet);
@@ -227,8 +229,8 @@
 
   @Test
   @SuppressWarnings("unchecked")
-  public void testCustomStreamletFromBasicBolt() throws Exception {
-    Streamlet<Double> baseStreamlet = StreamletImpl.createSupplierStreamlet(() -> Math.random());
+  public void testCustomStreamletFromBasicBolt() {
+    Streamlet<Double> baseStreamlet = builder.newSource(() -> Math.random());
     Streamlet<Double> streamlet = baseStreamlet.setNumPartitions(20)
                                                .applyOperator(new MyBasicBoltOperator());
     assertTrue(streamlet instanceof CustomStreamlet);
@@ -246,8 +248,8 @@
 
   @Test
   @SuppressWarnings("unchecked")
-  public void testCustomStreamletFromWindowBolt() throws Exception {
-    Streamlet<Double> baseStreamlet = StreamletImpl.createSupplierStreamlet(() -> Math.random());
+  public void testCustomStreamletFromWindowBolt() {
+    Streamlet<Double> baseStreamlet = builder.newSource(() -> Math.random());
     Streamlet<Double> streamlet = baseStreamlet.setNumPartitions(20)
                                                .applyOperator(new MyWindowBoltOperator());
     assertTrue(streamlet instanceof CustomStreamlet);
@@ -261,16 +263,16 @@
 
   @Test
   @SuppressWarnings("unchecked")
-  public void testSimpleBuild() throws Exception {
-    Streamlet<String> baseStreamlet = StreamletImpl.createSupplierStreamlet(() -> "sa re ga ma");
+  public void testSimpleBuild() {
+    Streamlet<String> baseStreamlet = builder.newSource(() -> "sa re ga ma");
     baseStreamlet.flatMap(x -> Arrays.asList(x.split(" ")))
                  .reduceByKeyAndWindow(x -> x, x -> 1, WindowConfig.TumblingCountWindow(10),
                      (x, y) -> x + y);
     SupplierStreamlet<String> supplierStreamlet = (SupplierStreamlet<String>) baseStreamlet;
     assertFalse(supplierStreamlet.isBuilt());
-    TopologyBuilder builder = new TopologyBuilder();
+    TopologyBuilder topologyBuilder = new TopologyBuilder();
     Set<String> stageNames = new HashSet<>();
-    supplierStreamlet.build(builder, stageNames);
+    supplierStreamlet.build(topologyBuilder, stageNames);
     assertTrue(supplierStreamlet.allBuilt());
     assertEquals(supplierStreamlet.getChildren().size(), 1);
     assertTrue(supplierStreamlet.getChildren().get(0) instanceof FlatMapStreamlet);
@@ -286,14 +288,14 @@
 
   @Test
   @SuppressWarnings("unchecked")
-  public void testComplexBuild() throws Exception {
+  public void testComplexBuild() {
     // First source
-    Streamlet<String> baseStreamlet1 = StreamletImpl.createSupplierStreamlet(() -> "sa re ga ma");
+    Streamlet<String> baseStreamlet1 = builder.newSource(() -> "sa re ga ma");
     Streamlet<String> leftStream =
         baseStreamlet1.flatMap(x -> Arrays.asList(x.split(" ")));
 
     // Second source
-    Streamlet<String> baseStreamlet2 = StreamletImpl.createSupplierStreamlet(() -> "I Love You");
+    Streamlet<String> baseStreamlet2 = builder.newSource(() -> "I Love You");
     Streamlet<String> rightStream =
         baseStreamlet2.flatMap(x -> Arrays.asList(x.split(" ")));
 
@@ -305,13 +307,13 @@
     SupplierStreamlet<String> supplierStreamlet2 = (SupplierStreamlet<String>) baseStreamlet2;
     assertFalse(supplierStreamlet1.isBuilt());
     assertFalse(supplierStreamlet2.isBuilt());
-    TopologyBuilder builder = new TopologyBuilder();
+    TopologyBuilder topologyBuilder = new TopologyBuilder();
     Set<String> stageNames = new HashSet<>();
-    supplierStreamlet1.build(builder, stageNames);
+    supplierStreamlet1.build(topologyBuilder, stageNames);
     assertTrue(supplierStreamlet1.isBuilt());
     assertFalse(supplierStreamlet1.allBuilt());
 
-    supplierStreamlet2.build(builder, stageNames);
+    supplierStreamlet2.build(topologyBuilder, stageNames);
     assertTrue(supplierStreamlet1.allBuilt());
     assertTrue(supplierStreamlet2.allBuilt());
 
@@ -341,7 +343,7 @@
   @SuppressWarnings("unchecked")
   public void testCalculatedDefaultStageNames() {
     // create SupplierStreamlet
-    Streamlet<String> baseStreamlet = StreamletImpl.createSupplierStreamlet(() ->
+    Streamlet<String> baseStreamlet = builder.newSource(() ->
         "This is test content");
     SupplierStreamlet<String> supplierStreamlet = (SupplierStreamlet<String>) baseStreamlet;
     assertEquals(supplierStreamlet.getChildren().size(), 0);
@@ -351,9 +353,9 @@
 
     // build SupplierStreamlet
     assertFalse(supplierStreamlet.isBuilt());
-    TopologyBuilder builder = new TopologyBuilder();
+    TopologyBuilder topologyBuilder = new TopologyBuilder();
     Set<String> stageNames = new HashSet<>();
-    supplierStreamlet.build(builder, stageNames);
+    supplierStreamlet.build(topologyBuilder, stageNames);
 
     // verify SupplierStreamlet
     assertTrue(supplierStreamlet.allBuilt());
@@ -396,7 +398,7 @@
   @Test
   public void testDefaultStreamletNameIfNotSet() {
     // create SupplierStreamlet
-    Streamlet<String> baseStreamlet = StreamletImpl.createSupplierStreamlet(() ->
+    Streamlet<String> baseStreamlet = builder.newSource(() ->
         "This is test content");
     SupplierStreamlet<String> supplierStreamlet = (SupplierStreamlet<String>) baseStreamlet;
     Set<String> stageNames = new HashSet<>();
@@ -414,7 +416,7 @@
   public void testStreamletNameIfAlreadySet() {
     String supplierName = "MyStringSupplier";
     // create SupplierStreamlet
-    Streamlet<String> baseStreamlet = StreamletImpl.createSupplierStreamlet(() ->
+    Streamlet<String> baseStreamlet = builder.newSource(() ->
         "This is test content");
     SupplierStreamlet<String> supplierStreamlet = (SupplierStreamlet<String>) baseStreamlet;
     supplierStreamlet.setName(supplierName);
@@ -432,7 +434,7 @@
   @Test(expected = RuntimeException.class)
   public void testStreamletNameIfDuplicateNameIsSet() {
     // create SupplierStreamlet
-    Streamlet<String> baseStreamlet = StreamletImpl.createSupplierStreamlet(() ->
+    Streamlet<String> baseStreamlet = builder.newSource(() ->
         "This is test content");
 
     SupplierStreamlet<String> supplierStreamlet = (SupplierStreamlet<String>) baseStreamlet;
@@ -444,14 +446,14 @@
 
     // build SupplierStreamlet
     assertFalse(supplierStreamlet.isBuilt());
-    TopologyBuilder builder = new TopologyBuilder();
+    TopologyBuilder topologyBuilder = new TopologyBuilder();
     Set<String> stageNames = new HashSet<>();
-    supplierStreamlet.build(builder, stageNames);
+    supplierStreamlet.build(topologyBuilder, stageNames);
   }
 
   @Test
   public void testSetNameWithInvalidValues() {
-    Streamlet<Double> streamlet = StreamletImpl.createSupplierStreamlet(() -> Math.random());
+    Streamlet<Double> streamlet = builder.newSource(() -> Math.random());
     Function<String, Streamlet<Double>> function = streamlet::setName;
     testByFunction(function, null);
     testByFunction(function, "");
@@ -460,14 +462,14 @@
 
   @Test(expected = IllegalArgumentException.class)
   public void testSetNumPartitionsWithInvalidValue() {
-    Streamlet<Double> streamlet = StreamletImpl.createSupplierStreamlet(() -> Math.random());
+    Streamlet<Double> streamlet = builder.newSource(() -> Math.random());
     streamlet.setNumPartitions(0);
   }
 
   @Test(expected = IllegalArgumentException.class)
   @SuppressWarnings("unchecked")
   public void testCloneStreamletWithInvalidNumberOfClone() {
-    Streamlet<Double> baseStreamlet = StreamletImpl.createSupplierStreamlet(() -> Math.random());
+    Streamlet<Double> baseStreamlet = builder.newSource(() -> Math.random());
     baseStreamlet.setNumPartitions(20).clone(0);
   }