Make log/sink/consume Streamlet component support setName and setNumPartitions (#3459)

diff --git a/examples/src/java/org/apache/heron/examples/streamlet/StreamletCloneTopology.java b/examples/src/java/org/apache/heron/examples/streamlet/StreamletCloneTopology.java
index 843da57..03488f8 100644
--- a/examples/src/java/org/apache/heron/examples/streamlet/StreamletCloneTopology.java
+++ b/examples/src/java/org/apache/heron/examples/streamlet/StreamletCloneTopology.java
@@ -144,13 +144,15 @@
      * Elements in the first cloned streamlet go to the database sink.
      */
     splitGameScoreStreamlet.get(0)
-        .toSink(new DatabaseSink());
+        .toSink(new DatabaseSink())
+        .setName("sink0");
 
     /**
      * Elements in the second cloned streamlet go to the logging sink.
      */
     splitGameScoreStreamlet.get(1)
-        .toSink(new FormattedLogSink());
+        .toSink(new FormattedLogSink())
+        .setName("sink1");
 
     Config config = Config.defaultConfig();
 
diff --git a/examples/src/scala/org/apache/heron/examples/streamlet/scala/ScalaClassicalMusicTopology.scala b/examples/src/scala/org/apache/heron/examples/streamlet/scala/ScalaClassicalMusicTopology.scala
index f155bcc..7bc20ca 100644
--- a/examples/src/scala/org/apache/heron/examples/streamlet/scala/ScalaClassicalMusicTopology.scala
+++ b/examples/src/scala/org/apache/heron/examples/streamlet/scala/ScalaClassicalMusicTopology.scala
@@ -92,6 +92,7 @@
       )
       .setName("joined-classical-musics-by-year")
       .log()
+      .setName("log")
 
     val config = Config.defaultConfig()
 
diff --git a/examples/src/scala/org/apache/heron/examples/streamlet/scala/ScalaIntegerProcessingTopology.scala b/examples/src/scala/org/apache/heron/examples/streamlet/scala/ScalaIntegerProcessingTopology.scala
index b0663bc..6f618df 100644
--- a/examples/src/scala/org/apache/heron/examples/streamlet/scala/ScalaIntegerProcessingTopology.scala
+++ b/examples/src/scala/org/apache/heron/examples/streamlet/scala/ScalaIntegerProcessingTopology.scala
@@ -58,6 +58,8 @@
       .union(zeroes)
       .setName("union-of-numbers")
       .log()
+      .setName("log")
+      .setNumPartitions(1)
 
     val config = Config.newBuilder
       .setNumContainers(NUM_CONTAINERS)
diff --git a/heron/api/src/java/org/apache/heron/streamlet/Streamlet.java b/heron/api/src/java/org/apache/heron/streamlet/Streamlet.java
index 1a108a4..1c38760 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/Streamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/Streamlet.java
@@ -45,7 +45,7 @@
  * Streamlet before doing the transformation.
  */
 @InterfaceStability.Evolving
-public interface Streamlet<R> {
+public interface Streamlet<R> extends StreamletBase<R> {
 
   /**
    * Sets the name of the BaseStreamlet.
@@ -299,7 +299,7 @@
    * Logs every element of the streamlet using String.valueOf function
    * This is one of the sink functions in the sense that this operation returns void
    */
-  void log();
+  StreamletBase<R> log();
 
   /**
    * Applies the consumer function to every element of the stream
@@ -307,7 +307,7 @@
    * @param consumer The user supplied consumer function that is invoked for each element
    * of this streamlet.
    */
-  void consume(SerializableConsumer<R> consumer);
+  StreamletBase<R> consume(SerializableConsumer<R> consumer);
 
   /**
    * Applies the sink's put function to every element of the stream
@@ -315,5 +315,5 @@
    * @param sink The Sink whose put method consumes each element
    * of this streamlet.
    */
-  void toSink(Sink<R> sink);
+  StreamletBase<R> toSink(Sink<R> sink);
 }
diff --git a/heron/api/src/java/org/apache/heron/streamlet/StreamletBase.java b/heron/api/src/java/org/apache/heron/streamlet/StreamletBase.java
new file mode 100644
index 0000000..f52e7f2
--- /dev/null
+++ b/heron/api/src/java/org/apache/heron/streamlet/StreamletBase.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.streamlet;
+
+/**
+ * A Streamlet is a (potentially unbounded) ordered collection of tuples.
+ * The StreamletBase class contains basic information of a Streamlet
+ * such as name and partition count without the connection functions
+ * such as map() and filter().
+ */
+public interface StreamletBase<R> {
+
+  /**
+   * Sets the name of the BaseStreamlet.
+   * @param sName The name given by the user for this BaseStreamlet
+   * @return Returns back the Streamlet with changed name
+   */
+  StreamletBase<R> setName(String sName);
+
+  /**
+   * Gets the name of the Streamlet.
+   * @return Returns the name of the Streamlet
+   */
+  String getName();
+
+  /**
+   * Sets the number of partitions of the streamlet
+   * @param numPartitions The user assigned number of partitions
+   * @return Returns back the Streamlet with changed number of partitions
+   */
+  StreamletBase<R> setNumPartitions(int numPartitions);
+
+  /**
+   * Gets the number of partitions of this Streamlet.
+   * @return the number of partitions of this Streamlet
+   */
+  int getNumPartitions();
+
+  // This is the main interface that every Streamlet implementation should implement
+  // The main tasks are generally to make sure that appropriate names/partitions are
+  // computed and add a spout/bolt to the TopologyBuilder
+  // void build(TopologyBuilder bldr, Set<String> stageNames);
+}
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/BuilderImpl.java b/heron/api/src/java/org/apache/heron/streamlet/impl/BuilderImpl.java
index 853bb97..2dca00e 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/BuilderImpl.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/BuilderImpl.java
@@ -91,7 +91,7 @@
       streamlet.build(builder, stageNames);
     }
     for (StreamletImpl<?> streamlet : sources) {
-      if (!streamlet.allBuilt()) {
+      if (!streamlet.isFullyBuilt()) {
         throw new RuntimeException("Topology cannot be fully built! Are all sources added?");
       }
     }
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletBaseImpl.java b/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletBaseImpl.java
new file mode 100644
index 0000000..6b3530c
--- /dev/null
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletBaseImpl.java
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.heron.streamlet.impl;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.logging.Logger;
+
+import org.apache.heron.api.topology.TopologyBuilder;
+import org.apache.heron.streamlet.StreamletBase;
+
+import static org.apache.heron.streamlet.impl.utils.StreamletUtils.checkNotBlank;
+import static org.apache.heron.streamlet.impl.utils.StreamletUtils.require;
+
+/**
+ * A Streamlet is a (potentially unbounded) ordered collection of tuples.
+ * Streamlets originate from pub/sub systems(such Pulsar/Kafka), or from
+ * static data(such as csv files, HDFS files), or for that matter any other
+ * source. They are also created by transforming existing Streamlets using
+ * operations such as map/flatMap, etc.
+ * Besides the tuples, a Streamlet has the following properties associated with it
+ * a) name. User assigned or system generated name to refer the streamlet
+ * b) nPartitions. Number of partitions that the streamlet is composed of. Thus the
+ *    ordering of the tuples in a Streamlet is wrt the tuples within a partition.
+ *    This allows the system to distribute  each partition to different nodes across the cluster.
+ * A bunch of transformations can be done on Streamlets(like map/flatMap, etc.). Each
+ * of these transformations operate on every tuple of the Streamlet and produce a new
+ * Streamlet. One can think of a transformation attaching itself to the stream and processing
+ * each tuple as they go by. Thus the parallelism of any operator is implicitly determined
+ * by the number of partitions of the stream that it is operating on. If a particular
+ * transformation wants to operate at a different parallelism, one can repartition the
+ * Streamlet before doing the transformation.
+ */
+public abstract class StreamletBaseImpl<R> implements StreamletBase<R> {
+  private static final Logger LOG = Logger.getLogger(StreamletBaseImpl.class.getName());
+  protected String name;
+  protected int nPartitions;
+  private List<StreamletBaseImpl<?>> children;
+  private boolean built;
+
+  /**
+   * Only used by the implementors
+   */
+  protected StreamletBaseImpl() {
+    this.name = null;
+    this.nPartitions = -1;
+    this.children = new LinkedList<>();
+    this.built = false;
+  }
+
+  protected enum StreamletNamePrefix {
+    CONSUMER("consumer"),
+    COUNT("count"),
+    CUSTOM("custom"),
+    CUSTOM_BASIC("customBasic"),
+    CUSTOM_WINDOW("customWindow"),
+    FILTER("filter"),
+    FLATMAP("flatmap"),
+    JOIN("join"),
+    KEYBY("keyBy"),
+    LOGGER("logger"),
+    MAP("map"),
+    SOURCE("generator"),
+    REDUCE("reduce"),
+    REMAP("remap"),
+    SINK("sink"),
+    SPLIT("split"),
+    SPOUT("spout"),
+    SUPPLIER("supplier"),
+    TRANSFORM("transform"),
+    UNION("union");
+
+    private final String prefix;
+
+    StreamletNamePrefix(final String prefix) {
+      this.prefix = prefix;
+    }
+
+    @Override
+    public String toString() {
+      return prefix;
+    }
+  }
+
+  /**
+   * Sets the name of the Streamlet.
+   * @param sName The name given by the user for this streamlet
+   * @return Returns back the Streamlet with changed name
+   */
+  @Override
+  public StreamletBase<R> setName(String sName) {
+    checkNotBlank(sName, "Streamlet name cannot be null/blank");
+
+    this.name = sName;
+    return this;
+  }
+
+  /**
+   * Gets the name of the Streamlet.
+   * @return Returns the name of the Streamlet
+   */
+  @Override
+  public String getName() {
+    return name;
+  }
+
+  private String defaultNameCalculator(StreamletNamePrefix prefix, Set<String> stageNames) {
+    int index = 1;
+    String calculatedName;
+    while (true) {
+      calculatedName = new StringBuilder(prefix.toString()).append(index).toString();
+      if (!stageNames.contains(calculatedName)) {
+        break;
+      }
+      index++;
+    }
+    LOG.info("Calculated stage Name as " + calculatedName);
+    return calculatedName;
+  }
+
+  /**
+   * Sets a default unique name to the Streamlet by type if it is not set.
+   * Otherwise, just checks its uniqueness.
+   * @param prefix The name prefix of this streamlet
+   * @param stageNames The collections of created streamlet/stage names
+   */
+  protected void setDefaultNameIfNone(StreamletNamePrefix prefix, Set<String> stageNames) {
+    if (getName() == null) {
+      setName(defaultNameCalculator(prefix, stageNames));
+    }
+    if (stageNames.contains(getName())) {
+      throw new RuntimeException(String.format(
+          "The stage name %s is used multiple times in the same topology", getName()));
+    }
+    stageNames.add(getName());
+  }
+
+  /**
+   * Sets the number of partitions of the streamlet
+   * @param numPartitions The user assigned number of partitions
+   * @return Returns back the Streamlet with changed number of partitions
+   */
+  @Override
+  public StreamletBase<R> setNumPartitions(int numPartitions) {
+    require(numPartitions > 0, "Streamlet's partitions number should be > 0");
+
+    this.nPartitions = numPartitions;
+    return this;
+  }
+
+  /**
+   * Gets the number of partitions of this Streamlet.
+   * @return the number of partitions of this Streamlet
+   */
+  @Override
+  public int getNumPartitions() {
+    return nPartitions;
+  }
+
+  public <T> void addChild(StreamletBaseImpl<T> child) {
+    children.add(child);
+  }
+
+  /**
+   * Gets all the children of this streamlet.
+   * Children of a streamlet are streamlets that are resulting from transformations of elements of
+   * this and potentially other streamlets.
+   * @return The kid streamlets
+   */
+  public List<StreamletBaseImpl<?>> getChildren() {
+    return children;
+  }
+
+  public void build(TopologyBuilder bldr, Set<String> stageNames) {
+    if (built) {
+      throw new RuntimeException("Logic Error While building " + getName());
+    }
+
+    if (doBuild(bldr, stageNames)) {
+      built = true;
+      for (StreamletBaseImpl<?> streamlet : getChildren()) {
+        streamlet.build(bldr, stageNames);
+      }
+    }
+  }
+
+  public boolean isBuilt() {
+    return built;
+  }
+
+  public boolean isFullyBuilt() {
+    if (!isBuilt()) {
+      return false;
+    }
+    for (StreamletBaseImpl<?> child : children) {
+      if (!child.isFullyBuilt()) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  // This is the main interface that every Streamlet implementation should implement
+  // The main tasks are generally to make sure that appropriate names/partitions are
+  // computed and add a spout/bolt to the TopologyBuilder
+  protected abstract boolean doBuild(TopologyBuilder bldr, Set<String> stageNames);
+}
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java b/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java
index 81a7d78..65d7534 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java
@@ -20,7 +20,6 @@
 
 import java.util.ArrayList;
 import java.util.HashSet;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -29,7 +28,6 @@
 import org.apache.commons.lang3.StringUtils;
 import org.apache.heron.api.grouping.NoneStreamGrouping;
 import org.apache.heron.api.grouping.StreamGrouping;
-import org.apache.heron.api.topology.TopologyBuilder;
 import org.apache.heron.api.utils.Utils;
 import org.apache.heron.streamlet.IStreamletOperator;
 import org.apache.heron.streamlet.JoinType;
@@ -43,6 +41,7 @@
 import org.apache.heron.streamlet.SerializableTransformer;
 import org.apache.heron.streamlet.Sink;
 import org.apache.heron.streamlet.Streamlet;
+import org.apache.heron.streamlet.StreamletBase;
 import org.apache.heron.streamlet.WindowConfig;
 import org.apache.heron.streamlet.impl.streamlets.ConsumerStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.CountByKeyAndWindowStreamlet;
@@ -89,72 +88,9 @@
  * transformation wants to operate at a different parallelism, one can repartition the
  * Streamlet before doing the transformation.
  */
-public abstract class StreamletImpl<R> implements Streamlet<R> {
+public abstract class StreamletImpl<R>
+    extends StreamletBaseImpl<R> implements Streamlet<R> {
   private static final Logger LOG = Logger.getLogger(StreamletImpl.class.getName());
-  protected String name;
-  protected int nPartitions;
-  private List<StreamletImpl<?>> children;
-  private boolean built;
-
-  public boolean isBuilt() {
-    return built;
-  }
-
-  public boolean allBuilt() {
-    if (!built) {
-      return false;
-    }
-    for (StreamletImpl<?> child : children) {
-      if (!child.allBuilt()) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  protected enum StreamletNamePrefix {
-    CONSUMER("consumer"),
-    COUNT("count"),
-    CUSTOM("custom"),
-    CUSTOM_BASIC("customBasic"),
-    CUSTOM_WINDOW("customWindow"),
-    FILTER("filter"),
-    FLATMAP("flatmap"),
-    JOIN("join"),
-    KEYBY("keyBy"),
-    LOGGER("logger"),
-    MAP("map"),
-    SOURCE("generator"),
-    REDUCE("reduce"),
-    REMAP("remap"),
-    SINK("sink"),
-    SPLIT("split"),
-    SPOUT("spout"),
-    SUPPLIER("supplier"),
-    TRANSFORM("transform"),
-    UNION("union");
-
-    private final String prefix;
-
-    StreamletNamePrefix(final String prefix) {
-      this.prefix = prefix;
-    }
-
-    @Override
-    public String toString() {
-      return prefix;
-    }
-  }
-
-  /**
-   * Gets all the children of this streamlet.
-   * Children of a streamlet are streamlets that are resulting from transformations of elements of
-   * this and potentially other streamlets.
-   * @return The kid streamlets
-   */
-  public List<StreamletImpl<?>> getChildren() {
-    return children;
-  }
 
   /**
    * Sets the name of the Streamlet.
@@ -163,61 +99,22 @@
    */
   @Override
   public Streamlet<R> setName(String sName) {
-    checkNotBlank(sName, "Streamlet name cannot be null/blank");
-
-    this.name = sName;
+    super.setName(sName);
     return this;
   }
 
   /**
-   * Gets the name of the Streamlet.
-   * @return Returns the name of the Streamlet
-   */
-  @Override
-  public String getName() {
-    return name;
-  }
-
-  /**
-   * Sets a default unique name to the Streamlet by type if it is not set.
-   * Otherwise, just checks its uniqueness.
-   * @param prefix The name prefix of this streamlet
-   * @param stageNames The collections of created streamlet/stage names
-   */
-  protected void setDefaultNameIfNone(StreamletNamePrefix prefix, Set<String> stageNames) {
-    if (getName() == null) {
-      setName(defaultNameCalculator(prefix, stageNames));
-    }
-    if (stageNames.contains(getName())) {
-      throw new RuntimeException(String.format(
-          "The stage name %s is used multiple times in the same topology", getName()));
-    }
-    stageNames.add(getName());
-  }
-
-  /**
    * Sets the number of partitions of the streamlet
    * @param numPartitions The user assigned number of partitions
    * @return Returns back the Streamlet with changed number of partitions
    */
   @Override
   public Streamlet<R> setNumPartitions(int numPartitions) {
-    require(numPartitions > 0, "Streamlet's partitions number should be > 0");
-
-    this.nPartitions = numPartitions;
+    super.setNumPartitions(numPartitions);
     return this;
   }
 
   /**
-   * Gets the number of partitions of this Streamlet.
-   * @return the number of partitions of this Streamlet
-   */
-  @Override
-  public int getNumPartitions() {
-    return nPartitions;
-  }
-
-  /**
    * Set the id of the stream to be used by the children nodes.
    * Usage (assuming source is a Streamlet object with two output streams: stream1 and stream2):
    *   source.withStream("stream1").filter(...).log();
@@ -271,45 +168,7 @@
    * Only used by the implementors
    */
   protected StreamletImpl() {
-    this.nPartitions = -1;
-    this.children = new LinkedList<>();
-    this.built = false;
-  }
-
-  public void build(TopologyBuilder bldr, Set<String> stageNames) {
-    if (built) {
-      throw new RuntimeException("Logic Error While building " + getName());
-    }
-
-    if (doBuild(bldr, stageNames)) {
-      built = true;
-      for (StreamletImpl<?> streamlet : children) {
-        streamlet.build(bldr, stageNames);
-      }
-    }
-  }
-
-  // This is the main interface that every Streamlet implementation should implement
-  // The main tasks are generally to make sure that appropriate names/partitions are
-  // computed and add a spout/bolt to the TopologyBuilder
-  protected abstract boolean doBuild(TopologyBuilder bldr, Set<String> stageNames);
-
-  public <T> void addChild(StreamletImpl<T> child) {
-    children.add(child);
-  }
-
-  private String defaultNameCalculator(StreamletNamePrefix prefix, Set<String> stageNames) {
-    int index = 1;
-    String calculatedName;
-    while (true) {
-      calculatedName = new StringBuilder(prefix.toString()).append(index).toString();
-      if (!stageNames.contains(calculatedName)) {
-        break;
-      }
-      index++;
-    }
-    LOG.info("Calculated stage Name as " + calculatedName);
-    return calculatedName;
+    super();
   }
 
   /**
@@ -571,9 +430,10 @@
    * that does not contain any tuple. Thus this function returns void.
    */
   @Override
-  public void log() {
+  public StreamletBase<R> log() {
     LogStreamlet<R> logger = new LogStreamlet<>(this);
     addChild(logger);
+    return logger;
   }
 
   /**
@@ -581,11 +441,12 @@
    * @param consumer The user supplied consumer function that is invoked for each element
    */
   @Override
-  public void consume(SerializableConsumer<R> consumer) {
+  public StreamletBase<R> consume(SerializableConsumer<R> consumer) {
     checkNotNull(consumer, "consumer cannot be null");
 
     ConsumerStreamlet<R> consumerStreamlet = new ConsumerStreamlet<>(this, consumer);
     addChild(consumerStreamlet);
+    return consumerStreamlet;
   }
 
   /**
@@ -593,11 +454,12 @@
    * @param sink The Sink that consumes
    */
   @Override
-  public void toSink(Sink<R> sink) {
+  public StreamletBase<R> toSink(Sink<R> sink) {
     checkNotNull(sink, "sink cannot be null");
 
     SinkStreamlet<R> sinkStreamlet = new SinkStreamlet<>(this, sink);
     addChild(sinkStreamlet);
+    return sinkStreamlet;
   }
 
   /**
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletShadow.java b/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletShadow.java
index 633c4c6..eb5d3be 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletShadow.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletShadow.java
@@ -23,6 +23,7 @@
 import java.util.Set;
 
 import org.apache.heron.api.topology.TopologyBuilder;
+import org.apache.heron.streamlet.impl.StreamletBaseImpl;
 import org.apache.heron.streamlet.impl.StreamletImpl;
 
 /**
@@ -100,8 +101,7 @@
   /*
    * Functions related to topology building need to be overriden.
    */
-  @Override
-  public <T> void addChild(StreamletImpl<T> child) {
+  public <T> void addChild(StreamletBaseImpl<T> child) {
     real.addChild(child);
   }
 
@@ -112,7 +112,7 @@
    * @return The kid streamlets
    */
   @Override
-  public List<StreamletImpl<?>> getChildren() {
+  public List<StreamletBaseImpl<?>> getChildren() {
     return real.getChildren();
   }
 
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/ConsumerStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/ConsumerStreamlet.java
index f71b71f..92c5928 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/ConsumerStreamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/ConsumerStreamlet.java
@@ -24,6 +24,7 @@
 
 import org.apache.heron.api.topology.TopologyBuilder;
 import org.apache.heron.streamlet.SerializableConsumer;
+import org.apache.heron.streamlet.impl.StreamletBaseImpl;
 import org.apache.heron.streamlet.impl.StreamletImpl;
 import org.apache.heron.streamlet.impl.sinks.ConsumerSink;
 
@@ -32,7 +33,7 @@
  * streamlet after consuming every element. Since elements of the parents are just consumed
  * by the user passed consumer function, nothing is emitted, thus this streamlet is empty.
  */
-public class ConsumerStreamlet<R> extends StreamletImpl<R> {
+public class ConsumerStreamlet<R> extends StreamletBaseImpl<R> {
   private StreamletImpl<R> parent;
   private SerializableConsumer<R> consumer;
 
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/LogStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/LogStreamlet.java
index 9f68ff9..db9cefa 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/LogStreamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/LogStreamlet.java
@@ -22,6 +22,7 @@
 import java.util.Set;
 
 import org.apache.heron.api.topology.TopologyBuilder;
+import org.apache.heron.streamlet.impl.StreamletBaseImpl;
 import org.apache.heron.streamlet.impl.StreamletImpl;
 import org.apache.heron.streamlet.impl.sinks.LogSink;
 
@@ -30,7 +31,7 @@
  * streamlet after logging each element. Since elements of the parents are just logged
  * nothing is emitted, thus this streamlet is empty.
  */
-public class LogStreamlet<R> extends StreamletImpl<R> {
+public class LogStreamlet<R> extends StreamletBaseImpl<R> {
   private StreamletImpl<R> parent;
 
   public LogStreamlet(StreamletImpl<R> parent) {
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/SinkStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/SinkStreamlet.java
index 42e5d03..9625b6d 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/SinkStreamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/SinkStreamlet.java
@@ -24,6 +24,7 @@
 
 import org.apache.heron.api.topology.TopologyBuilder;
 import org.apache.heron.streamlet.Sink;
+import org.apache.heron.streamlet.impl.StreamletBaseImpl;
 import org.apache.heron.streamlet.impl.StreamletImpl;
 import org.apache.heron.streamlet.impl.sinks.ComplexSink;
 
@@ -32,7 +33,7 @@
  * streamlet after consuming every element. Since elements of the parents are just consumed
  * by the user passed consumer function, nothing is emitted, thus this streamlet is empty.
  */
-public class SinkStreamlet<R> extends StreamletImpl<R> {
+public class SinkStreamlet<R> extends StreamletBaseImpl<R> {
   private StreamletImpl<R> parent;
   private Sink<R> sink;
 
diff --git a/heron/api/src/scala/org/apache/heron/streamlet/scala/Streamlet.scala b/heron/api/src/scala/org/apache/heron/streamlet/scala/Streamlet.scala
index 8151941..073b74a 100644
--- a/heron/api/src/scala/org/apache/heron/streamlet/scala/Streamlet.scala
+++ b/heron/api/src/scala/org/apache/heron/streamlet/scala/Streamlet.scala
@@ -46,7 +46,7 @@
   * transformation wants to operate at a different parallelism, one can repartition the
   * Streamlet before doing the transformation.
   */
-trait Streamlet[R] {
+trait Streamlet[R] extends StreamletBase[R] {
 
   /**
     * Sets the name of the Streamlet.
@@ -57,13 +57,6 @@
   def setName(sName: String): Streamlet[R]
 
   /**
-    * Gets the name of the Streamlet.
-    *
-    * @return Returns the name of the Streamlet
-    */
-  def getName: String
-
-  /**
     * Sets the number of partitions of the streamlet
     *
     * @param numPartitions The user assigned number of partitions
@@ -72,13 +65,6 @@
   def setNumPartitions(numPartitions: Int): Streamlet[R]
 
   /**
-    * Gets the number of partitions of this Streamlet.
-    *
-    * @return the number of partitions of this Streamlet
-    */
-  def getNumPartitions: Int
-
-  /**
    * Set the id of the stream to be used by the children nodes.
    * Usage (assuming source is a Streamlet object with two output streams: stream1 and stream2):
    *   source.withStream("stream1").filter(...).log();
@@ -320,7 +306,7 @@
     * Logs every element of the streamlet using String.valueOf function
     * This is one of the sink functions in the sense that this operation returns void
     */
-  def log(): Unit
+  def log(): StreamletBase[R]
 
   /**
     * Applies the consumer function to every element of the stream
@@ -329,7 +315,7 @@
     * @param consumer The user supplied consumer function that is invoked for each element
     *                 of this streamlet.
     */
-  def consume(consumer: R => Unit): Unit
+  def consume(consumer: R => Unit): StreamletBase[R]
 
   /**
     * Applies the sink's put function to every element of the stream
@@ -338,6 +324,6 @@
     * @param sink The Sink whose put method consumes each element
     *             of this streamlet.
     */
-  def toSink(sink: Sink[R]): Unit
+  def toSink(sink: Sink[R]): StreamletBase[R]
 
 }
diff --git a/heron/api/src/scala/org/apache/heron/streamlet/scala/StreamletBase.scala b/heron/api/src/scala/org/apache/heron/streamlet/scala/StreamletBase.scala
new file mode 100644
index 0000000..64468d1
--- /dev/null
+++ b/heron/api/src/scala/org/apache/heron/streamlet/scala/StreamletBase.scala
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.heron.streamlet.scala
+
+/**
+ * A Streamlet is a (potentially unbounded) ordered collection of tuples.
+ * The StreamletBase class contains basic information of a Streamlet
+ * such as name and partition count without the connection functions
+ * such as map() and filter().
+ */
+trait StreamletBase[R] {
+
+  /**
+    * Sets the name of the Streamlet.
+    *
+    * @param sName The name given by the user for this Streamlet
+    * @return Returns back the Streamlet with changed name
+    */
+  def setName(sName: String): StreamletBase[R]
+
+  /**
+    * Gets the name of the Streamlet.
+    *
+    * @return Returns the name of the Streamlet
+    */
+  def getName: String
+
+  /**
+    * Sets the number of partitions of the streamlet
+    *
+    * @param numPartitions The user assigned number of partitions
+    * @return Returns back the Streamlet with changed number of partitions
+    */
+  def setNumPartitions(numPartitions: Int): StreamletBase[R]
+
+  /**
+    * Gets the number of partitions of this Streamlet.
+    *
+    * @return the number of partitions of this Streamlet
+    */
+  def getNumPartitions: Int
+}
diff --git a/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/StreamletBaseImpl.scala b/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/StreamletBaseImpl.scala
new file mode 100644
index 0000000..d5f336e
--- /dev/null
+++ b/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/StreamletBaseImpl.scala
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.heron.streamlet.scala.impl
+
+import org.apache.heron.streamlet.{StreamletBase => JavaStreamletBase}
+import org.apache.heron.streamlet.scala.StreamletBase
+
+object StreamletBaseImpl {
+  def fromJavaStreamletBase[R](javaStreamletBase: JavaStreamletBase[R]): StreamletBase[R] =
+    new StreamletBaseImpl[R](javaStreamletBase)
+
+  def toJavaStreamletBase[R](streamlet: StreamletBase[R]): JavaStreamletBase[R] =
+    streamlet.asInstanceOf[StreamletBaseImpl[R]].javaStreamletBase
+}
+
+/**
+ * This class provides Scala Streamlet Implementation by wrapping Java Streamlet API.
+ * Passed User defined Scala Functions are transformed to related FunctionalInterface versions and
+ * related Java Streamlet is transformed to Scala version again.
+ */
+class StreamletBaseImpl[R](val javaStreamletBase: JavaStreamletBase[R]) extends StreamletBase[R] {
+    import StreamletBaseImpl._
+
+    /**
+    * Sets the name of the Streamlet.
+    *
+    * @param sName The name given by the user for this Streamlet
+    * @return Returns back the Streamlet with changed name
+    */
+  override def setName(sName: String): StreamletBase[R] =
+    fromJavaStreamletBase[R](javaStreamletBase.setName(sName))
+
+  /**
+    * Gets the name of the Streamlet.
+    *
+    * @return Returns the name of the Streamlet
+    */
+  override def getName(): String = javaStreamletBase.getName
+
+  /**
+    * Sets the number of partitions of the streamlet
+    *
+    * @param numPartitions The user assigned number of partitions
+    * @return Returns back the Streamlet with changed number of partitions
+    */
+  override def setNumPartitions(numPartitions: Int): StreamletBase[R] =
+    fromJavaStreamletBase[R](javaStreamletBase.setNumPartitions(numPartitions))
+
+  /**
+    * Gets the number of partitions of this Streamlet.
+    *
+    * @return the number of partitions of this Streamlet
+    */
+  override def getNumPartitions(): Int = javaStreamletBase.getNumPartitions
+}
diff --git a/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/StreamletImpl.scala b/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/StreamletImpl.scala
index fe6b7df..bee1298 100644
--- a/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/StreamletImpl.scala
+++ b/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/StreamletImpl.scala
@@ -32,9 +32,11 @@
   SerializablePredicate,
   KVStreamlet => JavaKVStreamlet,
   Streamlet => JavaStreamlet,
+  StreamletBase => JavaStreamletBase,
   WindowConfig
 }
 import org.apache.heron.streamlet.impl.{
+  StreamletBaseImpl => JavaStreamletBaseImpl,
   StreamletImpl => JavaStreamletImpl
 }
 import org.apache.heron.streamlet.impl.streamlets.SupplierStreamlet
@@ -43,7 +45,8 @@
   SerializableTransformer,
   Sink,
   KVStreamlet,
-  Streamlet
+  Streamlet,
+  StreamletBase
 }
 import org.apache.heron.streamlet.scala.converter.ScalaToJavaConverter._
 
@@ -62,7 +65,7 @@
  * related Java Streamlet is transformed to Scala version again.
  */
 class StreamletImpl[R](val javaStreamlet: JavaStreamlet[R])
-    extends Streamlet[R] {
+    extends StreamletBaseImpl[R](javaStreamlet) with Streamlet[R] {
 
   import StreamletImpl._
 
@@ -76,13 +79,6 @@
     fromJavaStreamlet[R](javaStreamlet.setName(sName))
 
   /**
-    * Gets the name of the Streamlet.
-    *
-    * @return Returns the name of the Streamlet
-    */
-  override def getName(): String = javaStreamlet.getName
-
-  /**
     * Sets the number of partitions of the streamlet
     *
     * @param numPartitions The user assigned number of partitions
@@ -92,13 +88,6 @@
     fromJavaStreamlet[R](javaStreamlet.setNumPartitions(numPartitions))
 
   /**
-    * Gets the number of partitions of this Streamlet.
-    *
-    * @return the number of partitions of this Streamlet
-    */
-  override def getNumPartitions(): Int = javaStreamlet.getNumPartitions
-
-  /**
    * Set the id of the stream to be used by the children nodes.
    * Usage (assuming source is a Streamlet object with two output streams: stream1 and stream2):
    *   source.withStream("stream1").filter(...).log();
@@ -480,7 +469,10 @@
     * Logs every element of the streamlet using String.valueOf function
     * This is one of the sink functions in the sense that this operation returns void
     */
-  override def log(): Unit = javaStreamlet.log()
+  override def log(): StreamletBase[R] = {
+    val newJavaStreamletBase = javaStreamlet.log()
+    StreamletBaseImpl.fromJavaStreamletBase(newJavaStreamletBase)
+  }
 
   /**
     * Applies the consumer function to every element of the stream
@@ -489,9 +481,10 @@
     * @param consumer The user supplied consumer function that is invoked for each element
     *                 of this streamlet.
     */
-  override def consume(consumer: R => Unit): Unit = {
+  override def consume(consumer: R => Unit): StreamletBase[R] = {
     val serializableConsumer = toSerializableConsumer[R](consumer)
-    javaStreamlet.consume(serializableConsumer)
+    val newJavaStreamletBase = javaStreamlet.consume(serializableConsumer)
+    StreamletBaseImpl.fromJavaStreamletBase(newJavaStreamletBase)
   }
 
   /**
@@ -501,9 +494,10 @@
     * @param sink The Sink whose put method consumes each element
     *             of this streamlet.
     */
-  override def toSink(sink: Sink[R]): Unit = {
+  override def toSink(sink: Sink[R]): StreamletBase[R] = {
     val javaSink = toJavaSink[R](sink)
-    javaStreamlet.toSink(javaSink)
+    val newJavaStreamletBase = javaStreamlet.toSink(javaSink)
+    StreamletBaseImpl.fromJavaStreamletBase(newJavaStreamletBase)
   }
 
   /**
@@ -513,11 +507,11 @@
     *
     * @return The kid streamlets
     */
-  private[impl] def getChildren: List[JavaStreamletImpl[_]] = {
+  private[impl] def getChildren: List[JavaStreamletBaseImpl[_]] = {
     import _root_.scala.collection.JavaConversions._
     val children =
       javaStreamlet
-        .asInstanceOf[JavaStreamletImpl[_]]
+        .asInstanceOf[JavaStreamletBaseImpl[_]]
         .getChildren
     children.toList
   }
diff --git a/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java b/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java
index ea40f01..4392417 100644
--- a/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java
+++ b/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java
@@ -461,7 +461,7 @@
     TopologyBuilder topologyBuilder = new TopologyBuilder();
     Set<String> stageNames = new HashSet<>();
     supplierStreamlet.build(topologyBuilder, stageNames);
-    assertTrue(supplierStreamlet.allBuilt());
+    assertTrue(supplierStreamlet.isFullyBuilt());
     assertEquals(supplierStreamlet.getChildren().size(), 1);
     assertTrue(supplierStreamlet.getChildren().get(0) instanceof FlatMapStreamlet);
     FlatMapStreamlet<String, String> fStreamlet =
@@ -499,11 +499,11 @@
     Set<String> stageNames = new HashSet<>();
     supplierStreamlet1.build(topologyBuilder, stageNames);
     assertTrue(supplierStreamlet1.isBuilt());
-    assertFalse(supplierStreamlet1.allBuilt());
+    assertFalse(supplierStreamlet1.isFullyBuilt());
 
     supplierStreamlet2.build(topologyBuilder, stageNames);
-    assertTrue(supplierStreamlet1.allBuilt());
-    assertTrue(supplierStreamlet2.allBuilt());
+    assertTrue(supplierStreamlet1.isFullyBuilt());
+    assertTrue(supplierStreamlet2.isFullyBuilt());
 
     // go over all stuff
     assertEquals(supplierStreamlet1.getChildren().size(), 1);
@@ -546,7 +546,7 @@
     supplierStreamlet.build(topologyBuilder, stageNames);
 
     // verify SupplierStreamlet
-    assertTrue(supplierStreamlet.allBuilt());
+    assertTrue(supplierStreamlet.isFullyBuilt());
     assertEquals(1, supplierStreamlet.getChildren().size());
     assertTrue(supplierStreamlet.getChildren().get(0) instanceof ConsumerStreamlet);
     assertEquals("consumer1", supplierStreamlet.getChildren().get(0).getName());