Add withStream() in Streamlet to support stream selection (#3109)

* Add withStream() in Streamlet to support stream selection

* Add split() function into Streamlet API
diff --git a/heron/api/src/java/org/apache/heron/streamlet/Streamlet.java b/heron/api/src/java/org/apache/heron/streamlet/Streamlet.java
index 821fa51..e1f0616 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/Streamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/Streamlet.java
@@ -20,6 +20,7 @@
 package org.apache.heron.streamlet;
 
 import java.util.List;
+import java.util.Map;
 
 import org.apache.heron.api.grouping.StreamGrouping;
 import org.apache.heron.classification.InterfaceStability;
@@ -73,6 +74,22 @@
   int getNumPartitions();
 
   /**
+   * Set the id of the stream to be used by the children nodes.
+   * Usage (assuming source is a Streamlet object with two output streams: stream1 and stream2):
+   *   source.withStream("stream1").filter(...).log();
+   *   source.withStream("stream2").filter(...).log();
+   * @param streamId The specified stream id
+   * @return Returns back the Streamlet with changed stream id
+   */
+  Streamlet<R> withStream(String streamId);
+
+  /**
+   * Gets the stream id of this Streamlet.
+   * @return the stream id of this Streamlet
+   */
+  String getStreamId();
+
+  /**
    * Return a new Streamlet by applying mapFn to each element of this Streamlet
    * @param mapFn The Map Function that should be applied to each element
   */
@@ -220,6 +237,13 @@
   <T> Streamlet<T> applyOperator(IStreamletOperator<R, T> operator, StreamGrouping grouper);
 
   /**
+   * Returns multiple streams by splitting incoming stream.
+   * @param splitFns The Split Functions that test if the tuple should be emitted into each stream
+   * Note that there could be 0 or multiple target stream ids
+   */
+  Streamlet<R> split(Map<String, SerializablePredicate<R>> splitFns);
+
+  /**
    * Logs every element of the streamlet using String.valueOf function
    * This is one of the sink functions in the sense that this operation returns void
    */
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java b/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java
index e76cd26..6b09de2 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java
@@ -19,14 +19,18 @@
 package org.apache.heron.streamlet.impl;
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.logging.Logger;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.heron.api.grouping.NoneStreamGrouping;
 import org.apache.heron.api.grouping.StreamGrouping;
 import org.apache.heron.api.topology.TopologyBuilder;
+import org.apache.heron.api.utils.Utils;
 import org.apache.heron.streamlet.IStreamletOperator;
 import org.apache.heron.streamlet.JoinType;
 import org.apache.heron.streamlet.KeyValue;
@@ -51,6 +55,8 @@
 import org.apache.heron.streamlet.impl.streamlets.ReduceByKeyAndWindowStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.RemapStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.SinkStreamlet;
+import org.apache.heron.streamlet.impl.streamlets.SplitStreamlet;
+import org.apache.heron.streamlet.impl.streamlets.StreamletShadow;
 import org.apache.heron.streamlet.impl.streamlets.TransformStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.UnionStreamlet;
 
@@ -107,13 +113,14 @@
     CUSTOM_WINDOW("customWindow"),
     FILTER("filter"),
     FLATMAP("flatmap"),
-    REDUCE("reduceByKeyAndWindow"),
     JOIN("join"),
     LOGGER("logger"),
     MAP("map"),
+    SOURCE("generator"),
+    REDUCE("reduceByKeyAndWindow"),
     REMAP("remap"),
     SINK("sink"),
-    SOURCE("generator"),
+    SPLIT("split"),
     SPOUT("spout"),
     SUPPLIER("supplier"),
     TRANSFORM("transform"),
@@ -203,6 +210,53 @@
   }
 
   /**
+   * Set the id of the stream to be used by the children nodes.
+   * Usage (assuming source is a Streamlet object with two output streams: stream1 and stream2):
+   *   source.withStream("stream1").filter(...).log();
+   *   source.withStream("stream2").filter(...).log();
+   * @param streamId The specified stream id
+   * @return Returns back the Streamlet with changed stream id
+   */
+  @SuppressWarnings("HiddenField")
+  @Override
+  public Streamlet<R> withStream(String streamId) {
+    checkNotBlank(streamId, "streamId can't be empty");
+
+    Set<String> availableIds = getAvailableStreamIds();
+    if (availableIds.contains(streamId)) {
+      StreamletShadow<R> shadow = new StreamletShadow<R>(this);
+      shadow.setStreamId(streamId);
+      return shadow;
+    } else {
+      throw new RuntimeException(
+          String.format("Stream id %s is not available in %s. Available ids are: %s.",
+                        streamId, getName(), availableIds.toString()));
+    }
+  }
+
+
+  /**
+   * Get the available stream ids in the Streamlet. For most Streamlets,
+   * there is only one internal stream id, therefore the function
+   * returns a set of one single stream id.
+   * @return Returns a set of one single stream id.
+   */
+  protected Set<String> getAvailableStreamIds() {
+    HashSet<String> ids = new HashSet<String>();
+    ids.add(getStreamId());
+    return ids;
+  }
+
+  /**
+   * Gets the stream id of this Streamlet.
+   * @return the stream id of this Streamlet`
+   */
+  @Override
+  public String getStreamId() {
+    return Utils.DEFAULT_STREAM_ID;
+  }
+
+  /**
    * Only used by the implementors
    */
   protected StreamletImpl() {
@@ -545,4 +599,21 @@
     addChild(customStreamlet);
     return customStreamlet;
   }
+
+  /**
+   * Returns multiple streams by splitting incoming stream.
+   * @param splitFns The Split Functions that test if the tuple should be emitted into each stream
+   * Note that there could be 0 or multiple target stream ids
+   */
+  @Override
+  public Streamlet<R> split(Map<String, SerializablePredicate<R>> splitFns) {
+    // Make sure map and stream ids are not empty
+    require(splitFns.size() > 0, "At least one entry is required");
+    require(splitFns.keySet().stream().allMatch(stream -> StringUtils.isNotBlank(stream)),
+            "Stream Id can not be blank");
+
+    SplitStreamlet<R> splitStreamlet = new SplitStreamlet<R>(this, splitFns);
+    addChild(splitStreamlet);
+    return splitStreamlet;
+  }
 }
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletShadow.java b/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletShadow.java
new file mode 100644
index 0000000..56d0c64
--- /dev/null
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletShadow.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.streamlet.impl.streamlets;
+
+import java.util.List;
+import java.util.Set;
+
+import org.apache.heron.api.topology.TopologyBuilder;
+import org.apache.heron.streamlet.impl.StreamletImpl;
+
+/**
+ * StreamletShadow is a special kind of StreamletImpl object:
+ * - It is still an StreamletImpl therefore it supports all Streamlet functions like filter()
+ * and map(), and can be the parent object of other StreamletImpl objects. Therefore,
+ * from API point of view, it can be used in the same way as a normal StreamletImpl object.
+ * - However it is just an shadow object of a real StreamletImpl object and it doesn't
+ * represent a node in the topology DAG. Therefore it can not be a child of another StreamletImpl
+ * object. As the result, the shadow object is clonable, and it is fine to create multiple
+ * StreamletShadow objects pointing to the same StreamletImpl object and have different properties.
+ *
+ * A StreamletShadow object can be used to decorate the real Streamletimpl object. This is
+ * important for children StreamletImpl objects to consume output data from the same parent in
+ * different ways, such as selecting different stream.
+ *
+ * Usage:
+ * To create a shadow object that selecting "test" stream from an existing StreamletImpl
+ * object(stream):
+ *
+ * StreamletImpl shadow = new StreamletShadow(stream) {
+ *   @Override
+ *   public String getStreamId() {
+ *     return "test";
+ *   }
+ * }
+ *
+ */
+public class StreamletShadow<R> extends StreamletImpl<R> {
+  private StreamletImpl<R> real;
+  // Extra properties for a Streamlet object
+  protected String streamId;
+
+  public StreamletShadow(StreamletImpl<R> real) {
+    this.real = real;
+    this.streamId = real.getStreamId();
+  }
+
+  /**
+   * Sets the stream id of this Streamlet.
+   * @param streamId the stream id for this streamlet.
+   */
+  public void setStreamId(String streamId) {
+    this.streamId = streamId;
+  }
+
+  /**
+   * Gets the stream id of this Streamlet.
+   * @return the stream id of this Streamlet
+   */
+  @Override
+  public String getStreamId() {
+    return streamId;
+  }
+
+  /*
+   * Functions accessible by child objects need to be overriden (forwarding the call to
+   * the real object since shadow object shouldn't have them)
+   */
+  @Override
+  public String getName() {
+    return real.getName();
+  }
+
+  @Override
+  public int getNumPartitions() {
+    return real.getNumPartitions();
+  }
+
+  /*
+   * Functions related to topology building need to be overriden.
+   */
+  @Override
+  public <T> void addChild(StreamletImpl<T> child) {
+    real.addChild(child);
+  }
+
+  /**
+   * Gets all the children of this streamlet.
+   * Children of a streamlet are streamlets that are resulting from transformations of elements of
+   * this and potentially other streamlets.
+   * @return The kid streamlets
+   */
+  @Override
+  public List<StreamletImpl<?>> getChildren() {
+    return real.getChildren();
+  }
+
+  @Override
+  public void build(TopologyBuilder bldr, Set<String> stageNames) {
+    throw new UnsupportedOperationException("build() in StreamletShadow should NOT be invoked");
+  }
+
+  @Override
+  public boolean doBuild(TopologyBuilder bldr, Set<String> stageNames) {
+    throw new UnsupportedOperationException("build() in StreamletShadow should NOT be invoked");
+  }
+}
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/SplitOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/SplitOperator.java
new file mode 100644
index 0000000..c07893e
--- /dev/null
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/SplitOperator.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.streamlet.impl.operators;
+
+import java.util.Map;
+
+import org.apache.heron.api.topology.OutputFieldsDeclarer;
+import org.apache.heron.api.tuple.Fields;
+import org.apache.heron.api.tuple.Tuple;
+import org.apache.heron.api.tuple.Values;
+import org.apache.heron.streamlet.SerializablePredicate;
+
+/**
+ * SplitOperator is the class that implements the split functionality.
+ * It takes in the split function as the input and use it to process tuples and
+ * get the output stream id and emit to the specific streams.
+ * Note that one tuple can be emitted to multiple or zero streams.
+ */
+public class SplitOperator<R> extends StreamletOperator<R, R> {
+  private Map<String, SerializablePredicate<R>> splitFns;
+
+  public SplitOperator(Map<String, SerializablePredicate<R>> splitFns) {
+    this.splitFns = splitFns;
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public void execute(Tuple tuple) {
+    R obj = (R) tuple.getValue(0);
+    for (Map.Entry<String, SerializablePredicate<R>> entry: splitFns.entrySet()) {
+      if (entry.getValue().test(obj)) {
+        collector.emit(entry.getKey(), new Values(obj));
+      }
+    }
+    collector.ack(tuple);
+  }
+
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    //super.declareOutputFields(declarer);
+    for (String stream: splitFns.keySet()) {
+      declarer.declareStream(stream, new Fields(OUTPUT_FIELD_NAME));
+    }
+  }
+}
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/StreamletOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/StreamletOperator.java
index 0b08478..0114a01 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/StreamletOperator.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/StreamletOperator.java
@@ -36,7 +36,7 @@
     extends BaseRichBolt
     implements IStreamletRichOperator<R, T> {
   private static final long serialVersionUID = 8524238140745238942L;
-  private static final String OUTPUT_FIELD_NAME = "output";
+  protected static final String OUTPUT_FIELD_NAME = "output";
 
   protected OutputCollector collector;
 
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/ConsumerStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/ConsumerStreamlet.java
index 98bb846..f71b71f 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/ConsumerStreamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/ConsumerStreamlet.java
@@ -52,7 +52,7 @@
   public boolean doBuild(TopologyBuilder bldr, Set<String> stageNames) {
     setDefaultNameIfNone(StreamletNamePrefix.CONSUMER, stageNames);
     bldr.setBolt(getName(), new ConsumerSink<>(consumer),
-        getNumPartitions()).shuffleGrouping(parent.getName());
+        getNumPartitions()).shuffleGrouping(parent.getName(), parent.getStreamId());
     return true;
   }
 }
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomStreamlet.java
index d09b134..df3639d 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomStreamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomStreamlet.java
@@ -67,22 +67,22 @@
     if (operator instanceof IStreamletBasicOperator) {
       setDefaultNameIfNone(StreamletNamePrefix.CUSTOM, stageNames);
       IStreamletBasicOperator<R, T> op = (IStreamletBasicOperator<R, T>) operator;
-      declarer = bldr.setBolt(getName(), op,  getNumPartitions());
+      bldr.setBolt(getName(), op,  getNumPartitions())
+          .grouping(parent.getName(), parent.getStreamId(), grouper);
     } else if (operator instanceof IStreamletRichOperator) {
       setDefaultNameIfNone(StreamletNamePrefix.CUSTOM_BASIC, stageNames);
       IStreamletRichOperator<R, T> op = (IStreamletRichOperator<R, T>) operator;
-      declarer = bldr.setBolt(getName(), op,  getNumPartitions());
+      bldr.setBolt(getName(), op,  getNumPartitions())
+          .grouping(parent.getName(), parent.getStreamId(), grouper);
     } else if (operator instanceof IStreamletWindowOperator) {
       setDefaultNameIfNone(StreamletNamePrefix.CUSTOM_WINDOW, stageNames);
       IStreamletWindowOperator<R, T> op = (IStreamletWindowOperator<R, T>) operator;
-      declarer = bldr.setBolt(getName(), op,  getNumPartitions());
+      bldr.setBolt(getName(), op,  getNumPartitions())
+          .grouping(parent.getName(), parent.getStreamId(), grouper);
     } else {
       throw new RuntimeException("Unhandled operator class is found!");
     }
 
-    // Apply grouping
-    declarer.grouping(parent.getName(), grouper);
-
     return true;
   }
 }
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/FilterStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/FilterStreamlet.java
index b314238..4f25669 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/FilterStreamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/FilterStreamlet.java
@@ -45,7 +45,7 @@
   public boolean doBuild(TopologyBuilder bldr, Set<String> stageNames) {
     setDefaultNameIfNone(StreamletNamePrefix.FILTER, stageNames);
     bldr.setBolt(getName(), new FilterOperator<R>(filterFn),
-        getNumPartitions()).shuffleGrouping(parent.getName());
+        getNumPartitions()).shuffleGrouping(parent.getName(), parent.getStreamId());
     return true;
   }
 }
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/FlatMapStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/FlatMapStreamlet.java
index 30a0500..95c057d 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/FlatMapStreamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/FlatMapStreamlet.java
@@ -38,7 +38,7 @@
 
   public FlatMapStreamlet(StreamletImpl<R> parent,
                           SerializableFunction<? super R,
-                              ? extends Iterable<? extends T>> flatMapFn) {
+                                               ? extends Iterable<? extends T>> flatMapFn) {
     this.parent = parent;
     this.flatMapFn = flatMapFn;
     setNumPartitions(parent.getNumPartitions());
@@ -48,7 +48,7 @@
   public boolean doBuild(TopologyBuilder bldr, Set<String> stageNames) {
     setDefaultNameIfNone(StreamletNamePrefix.FLATMAP, stageNames);
     bldr.setBolt(getName(), new FlatMapOperator<R, T>(flatMapFn),
-        getNumPartitions()).shuffleGrouping(parent.getName());
+        getNumPartitions()).shuffleGrouping(parent.getName(), parent.getStreamId());
     return true;
   }
 }
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/GeneralReduceByKeyAndWindowStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/GeneralReduceByKeyAndWindowStreamlet.java
index 2bc6bf0..3f0abd8 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/GeneralReduceByKeyAndWindowStreamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/GeneralReduceByKeyAndWindowStreamlet.java
@@ -68,7 +68,7 @@
         new GeneralReduceByKeyAndWindowOperator<K, V, VR>(keyExtractor, identity, reduceFn);
     windowCfg.attachWindowConfig(bolt);
     bldr.setBolt(getName(), bolt, getNumPartitions())
-        .customGrouping(parent.getName(),
+        .customGrouping(parent.getName(), parent.getStreamId(),
             new ReduceByKeyAndWindowCustomGrouping<K, V>(keyExtractor));
     return true;
   }
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/JoinStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/JoinStreamlet.java
index 0dcab51..89c6de9 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/JoinStreamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/JoinStreamlet.java
@@ -92,8 +92,10 @@
         right.getName(), leftKeyExtractor, rightKeyExtractor, joinFn);
     windowCfg.attachWindowConfig(bolt);
     bldr.setBolt(getName(), bolt, getNumPartitions())
-        .customGrouping(left.getName(), new JoinCustomGrouping<K, R>(leftKeyExtractor))
-        .customGrouping(right.getName(), new JoinCustomGrouping<K, S>(rightKeyExtractor));
+        .customGrouping(left.getName(), left.getStreamId(),
+            new JoinCustomGrouping<K, R>(leftKeyExtractor))
+        .customGrouping(right.getName(), right.getStreamId(),
+            new JoinCustomGrouping<K, S>(rightKeyExtractor));
     return true;
   }
 }
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/LogStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/LogStreamlet.java
index f29215b..9f68ff9 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/LogStreamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/LogStreamlet.java
@@ -48,8 +48,7 @@
   public boolean doBuild(TopologyBuilder bldr, Set<String> stageNames) {
     setDefaultNameIfNone(StreamletNamePrefix.LOGGER, stageNames);
     bldr.setBolt(getName(), new LogSink<R>(),
-        getNumPartitions()).shuffleGrouping(parent.getName());
-
+        getNumPartitions()).shuffleGrouping(parent.getName(), parent.getStreamId());
     return true;
   }
 }
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/MapStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/MapStreamlet.java
index 96c34b9..900c0e9 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/MapStreamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/MapStreamlet.java
@@ -41,11 +41,15 @@
     setNumPartitions(parent.getNumPartitions());
   }
 
+  public StreamletImpl<R> getParent() {
+    return parent;
+  }
+
   @Override
   public boolean doBuild(TopologyBuilder bldr, Set<String> stageNames) {
     setDefaultNameIfNone(StreamletNamePrefix.MAP, stageNames);
     bldr.setBolt(getName(), new MapOperator<R, T>(mapFn),
-        getNumPartitions()).shuffleGrouping(parent.getName());
+        getNumPartitions()).shuffleGrouping(parent.getName(), parent.getStreamId());
     return true;
   }
 }
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/ReduceByKeyAndWindowStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/ReduceByKeyAndWindowStreamlet.java
index 33a5b75..7e7a7e1 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/ReduceByKeyAndWindowStreamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/ReduceByKeyAndWindowStreamlet.java
@@ -68,7 +68,7 @@
         valueExtractor, reduceFn);
     windowCfg.attachWindowConfig(bolt);
     bldr.setBolt(getName(), bolt, getNumPartitions())
-        .customGrouping(parent.getName(),
+        .customGrouping(parent.getName(), parent.getStreamId(),
             new ReduceByKeyAndWindowCustomGrouping<K, R>(keyExtractor));
     return true;
   }
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/RemapStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/RemapStreamlet.java
index fed815f..b18980a 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/RemapStreamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/RemapStreamlet.java
@@ -50,9 +50,9 @@
   @Override
   public boolean doBuild(TopologyBuilder bldr, Set<String> stageNames) {
     setDefaultNameIfNone(StreamletNamePrefix.REMAP, stageNames);
-    bldr.setBolt(getName(), new MapOperator<R, R>((a) -> a),
-        getNumPartitions())
-        .customGrouping(parent.getName(), new RemapCustomGrouping<R>(remapFn));
+    bldr.setBolt(getName(), new MapOperator<R, R>((a) -> a), getNumPartitions())
+        .customGrouping(parent.getName(), parent.getStreamId(),
+            new RemapCustomGrouping<R>(remapFn));
     return true;
   }
 }
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/SinkStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/SinkStreamlet.java
index 9b24cf8..42e5d03 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/SinkStreamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/SinkStreamlet.java
@@ -46,7 +46,7 @@
   public boolean doBuild(TopologyBuilder bldr, Set<String> stageNames) {
     setDefaultNameIfNone(StreamletNamePrefix.SINK, stageNames);
     bldr.setBolt(getName(), new ComplexSink<>(sink),
-        getNumPartitions()).shuffleGrouping(parent.getName());
+        getNumPartitions()).shuffleGrouping(parent.getName(), parent.getStreamId());
     return true;
   }
 }
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/SplitStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/SplitStreamlet.java
new file mode 100644
index 0000000..8128353
--- /dev/null
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/SplitStreamlet.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.heron.streamlet.impl.streamlets;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.heron.api.topology.TopologyBuilder;
+import org.apache.heron.streamlet.SerializablePredicate;
+import org.apache.heron.streamlet.impl.StreamletImpl;
+import org.apache.heron.streamlet.impl.operators.SplitOperator;
+
+/**
+ * SplitStreamlet represents a Streamlet that splits an incoming
+ * stream into multiple streams using a split function. Each tuple
+ * can be emitted into no or multiple streams.
+ */
+public class SplitStreamlet<R> extends StreamletImpl<R> {
+  private StreamletImpl<R> parent;
+  private Map<String, SerializablePredicate<R>> splitFns;
+
+  public SplitStreamlet(StreamletImpl<R> parent,
+                        Map<String, SerializablePredicate<R>> splitFns) {
+
+    this.parent = parent;
+    this.splitFns = splitFns;
+    setNumPartitions(parent.getNumPartitions());
+  }
+
+  /**
+   * Get the available stream ids in the Streamlet.
+   * @return Returns the set of available stream ids
+   */
+  @Override
+  protected Set<String> getAvailableStreamIds() {
+    return splitFns.keySet();
+  }
+
+  @Override
+  public boolean doBuild(TopologyBuilder bldr, Set<String> stageNames) {
+    setDefaultNameIfNone(StreamletNamePrefix.SPLIT, stageNames);
+    bldr.setBolt(getName(), new SplitOperator<R>(splitFns),
+        getNumPartitions()).shuffleGrouping(parent.getName(), parent.getStreamId());
+    return true;
+  }
+}
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/TransformStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/TransformStreamlet.java
index fffd4e0..1f206c7 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/TransformStreamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/TransformStreamlet.java
@@ -48,7 +48,7 @@
   public boolean doBuild(TopologyBuilder bldr, Set<String> stageNames) {
     setDefaultNameIfNone(StreamletNamePrefix.TRANSFORM, stageNames);
     bldr.setBolt(getName(), new TransformOperator<R, T>(serializableTransformer),
-        getNumPartitions()).shuffleGrouping(parent.getName());
+        getNumPartitions()).shuffleGrouping(parent.getName(), parent.getStreamId());
     return true;
   }
 }
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/UnionStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/UnionStreamlet.java
index 8c6e9a6..a1ecbcf 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/UnionStreamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/UnionStreamlet.java
@@ -49,7 +49,9 @@
     }
     setDefaultNameIfNone(StreamletNamePrefix.UNION, stageNames);
     bldr.setBolt(getName(), new UnionOperator<I>(),
-        getNumPartitions()).shuffleGrouping(left.getName()).shuffleGrouping(right.getName());
+        getNumPartitions())
+          .shuffleGrouping(left.getName(), left.getStreamId())
+          .shuffleGrouping(right.getName(), right.getStreamId());
     return true;
   }
 }
diff --git a/heron/api/src/scala/org/apache/heron/streamlet/scala/Streamlet.scala b/heron/api/src/scala/org/apache/heron/streamlet/scala/Streamlet.scala
index 8200316..9945852 100644
--- a/heron/api/src/scala/org/apache/heron/streamlet/scala/Streamlet.scala
+++ b/heron/api/src/scala/org/apache/heron/streamlet/scala/Streamlet.scala
@@ -79,6 +79,22 @@
   def getNumPartitions: Int
 
   /**
+   * Set the id of the stream to be used by the children nodes.
+   * Usage (assuming source is a Streamlet object with two output streams: stream1 and stream2):
+   *   source.withStream("stream1").filter(...).log();
+   *   source.withStream("stream2").filter(...).log();
+   * @param streamId The specified stream id
+   * @return Returns back the Streamlet with changed stream id
+   */
+  def withStream(streamId: String): Streamlet[R]
+
+  /**
+   * Gets the stream id of this Streamlet.
+   * @return the stream id of this Streamlet
+   */
+  def getStreamId: String
+
+  /**
     * Return a new Streamlet by applying mapFn to each element of this Streamlet
     *
     * @param mapFn The Map Function that should be applied to each element
@@ -224,7 +240,7 @@
   def transform[T](
       serializableTransformer: SerializableTransformer[R, _ <: T]): Streamlet[T]
 
-/**
+  /**
    * Returns a new Streamlet by applying the operator on each element of this streamlet.
    * @param operator The operator to be applied
    * @param <T> The return type of the transform
@@ -242,6 +258,13 @@
   def applyOperator[T](operator: IStreamletOperator[R, T], grouper: StreamGrouping): Streamlet[T]
 
   /**
+   * Returns multiple streams by splitting incoming stream.
+   * @param splitFns The Split Functions that test if the tuple should be emitted into each stream
+   * Note that there could be 0 or multiple target stream ids
+   */
+  def split(splitFns: Map[String, R => Boolean]): Streamlet[R]
+
+  /**
     * Logs every element of the streamlet using String.valueOf function
     * This is one of the sink functions in the sense that this operation returns void
     */
diff --git a/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/StreamletImpl.scala b/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/StreamletImpl.scala
index 50c655e..fc308b5 100644
--- a/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/StreamletImpl.scala
+++ b/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/StreamletImpl.scala
@@ -18,6 +18,9 @@
  */
 package org.apache.heron.streamlet.scala.impl
 
+import java.util.{Map => JMap}
+import java.util.{HashMap => JHashMap}
+
 import scala.collection.JavaConverters
 
 import org.apache.heron.api.grouping.StreamGrouping
@@ -26,6 +29,7 @@
   JoinType,
   KeyValue,
   KeyedWindow,
+  SerializablePredicate,
   Streamlet => JavaStreamlet,
   WindowConfig
 }
@@ -91,6 +95,23 @@
   override def getNumPartitions(): Int = javaStreamlet.getNumPartitions
 
   /**
+   * Set the id of the stream to be used by the children nodes.
+   * Usage (assuming source is a Streamlet object with two output streams: stream1 and stream2):
+   *   source.withStream("stream1").filter(...).log();
+   *   source.withStream("stream2").filter(...).log();
+   * @param streamId The specified stream id
+   * @return Returns back the Streamlet with changed stream id
+   */
+  override def withStream(streamId: String): Streamlet[R] =
+    fromJavaStreamlet[R](javaStreamlet.withStream(streamId))
+
+  /**
+   * Gets the stream id of this Streamlet.
+   * @return the stream id of this Streamlet
+   */
+  override def getStreamId(): String = javaStreamlet.getStreamId
+
+  /**
     * Return a new Streamlet by applying mapFn to each element of this Streamlet
     *
     * @param mapFn The Map Function that should be applied to each element
@@ -344,6 +365,20 @@
     fromJavaStreamlet(newJavaStreamlet)
   }
 
+  /*
+   * Returns multiple streams by splitting incoming stream.
+   * @param splitFns The Split Functions that test if the tuple should be emitted into each stream
+   * Note that there could be 0 or multiple target stream ids
+   */
+  override def split(splitFns: Map[String, R => Boolean]): Streamlet[R] = {
+    val javaSerializablePredicates: JMap[String, SerializablePredicate[R]] = new JHashMap()
+    splitFns.foreach { case (key, func) =>
+      javaSerializablePredicates.put(key, toSerializablePredicate[R](func))
+    }
+    val newJavaStreamlet = javaStreamlet.split(javaSerializablePredicates)
+    fromJavaStreamlet[R](newJavaStreamlet)
+  }
+
   /**
     * Logs every element of the streamlet using String.valueOf function
     * This is one of the sink functions in the sense that this operation returns void
diff --git a/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java b/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java
index 33f3a9c..5d9053a 100644
--- a/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java
+++ b/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java
@@ -20,8 +20,10 @@
 
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.function.Consumer;
 import java.util.function.Function;
@@ -30,6 +32,7 @@
 
 import org.apache.heron.api.grouping.ShuffleStreamGrouping;
 import org.apache.heron.api.topology.TopologyBuilder;
+import org.apache.heron.api.utils.Utils;
 import org.apache.heron.common.basics.ByteAmount;
 import org.apache.heron.resource.TestBasicBolt;
 import org.apache.heron.resource.TestBolt;
@@ -42,6 +45,7 @@
 import org.apache.heron.streamlet.IStreamletRichOperator;
 import org.apache.heron.streamlet.IStreamletWindowOperator;
 import org.apache.heron.streamlet.SerializableConsumer;
+import org.apache.heron.streamlet.SerializablePredicate;
 import org.apache.heron.streamlet.SerializableTransformer;
 import org.apache.heron.streamlet.Source;
 import org.apache.heron.streamlet.Streamlet;
@@ -88,6 +92,68 @@
   }
 
   @Test
+  public void testSplitAndWithStream() {
+    Map<String, SerializablePredicate<Double>> splitter = new HashMap();
+    splitter.put("all", i -> true);
+    splitter.put("positive", i -> i > 0);
+    splitter.put("negative", i -> i < 0);
+
+    Streamlet<Double> baseStreamlet = builder.newSource(() -> Math.random());
+    // The streamlet should have three output streams after split()
+    Streamlet<Double> multiStreams = baseStreamlet.split(splitter);
+
+    // Default stream is used
+    Streamlet<Double> positiveStream = multiStreams.withStream("positive");
+    Streamlet<Double> negativeStream = multiStreams.withStream("negative");
+
+    Streamlet<Double> allMap = multiStreams.withStream("all").map((num) -> num * 10);
+    Streamlet<Double> positiveMap = positiveStream.map((num) -> num * 10);
+    Streamlet<Double> negativeMap = negativeStream.map((num) -> num * 10);
+
+    // Original streamlet should still have the default strean id eventhough the id
+    // is not available. Other shadow streamlets should have the correct stream ids.
+    assertEquals(multiStreams.getStreamId(), Utils.DEFAULT_STREAM_ID);
+    assertEquals(positiveStream.getStreamId(), "positive");
+    assertEquals(negativeStream.getStreamId(), "negative");
+
+    StreamletImpl<Double> impl = (StreamletImpl<Double>) multiStreams;
+    assertEquals(impl.getChildren().size(), 3);
+
+    // Children streamlets should have the right parent stream id
+    assertEquals(((MapStreamlet<Double, Double>) allMap).getParent().getStreamId(),
+        "all");
+    assertEquals(((MapStreamlet<Double, Double>) positiveMap).getParent().getStreamId(),
+        "positive");
+    assertEquals(((MapStreamlet<Double, Double>) negativeMap).getParent().getStreamId(),
+        "negative");
+  }
+
+  @Test(expected = RuntimeException.class)
+  public void testSplitAndWithWrongStream() {
+    Map<String, SerializablePredicate<Double>> splitter = new HashMap();
+    splitter.put("all", i -> true);
+    splitter.put("positive", i -> i > 0);
+    splitter.put("negative", i -> i < 0);
+
+    Streamlet<Double> baseStreamlet = builder.newSource(() -> Math.random());
+    // The streamlet should have three output streams after split()
+    Streamlet<Double> multiStreams = baseStreamlet.split(splitter);
+
+    // Select a good stream id and a bad stream id
+    Streamlet<Double> goodStream = multiStreams.withStream("positive");
+    Streamlet<Double> badStream = multiStreams.withStream("wrong-id");
+  }
+
+  @Test(expected = RuntimeException.class)
+  public void testWithWrongStream() {
+    Streamlet<Double> baseStreamlet = builder.newSource(() -> Math.random());
+    // Normal Streamlet objects, including sources, have only the default stream id.
+    // Selecting any other stream using withStream() should trigger a runtime
+    // exception
+    Streamlet<Double> badStream = baseStreamlet.withStream("wrong-id");
+  }
+
+  @Test
   public void testSupplierStreamlet() {
     Streamlet<Double> streamlet = builder.newSource(() -> Math.random());
     assertTrue(streamlet instanceof SupplierStreamlet);
@@ -492,6 +558,12 @@
   }
 
   @Test(expected = IllegalArgumentException.class)
+  public void testWithStreamWithInvalidValue() {
+    Streamlet<Double> baseStreamlet = builder.newSource(() -> Math.random());
+    baseStreamlet.withStream("");
+  }
+
+  @Test(expected = IllegalArgumentException.class)
   @SuppressWarnings("unchecked")
   public void testCloneStreamletWithInvalidNumberOfClone() {
     Streamlet<Double> baseStreamlet = builder.newSource(() -> Math.random());
diff --git a/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletShadowTest.java b/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletShadowTest.java
new file mode 100644
index 0000000..4cc70d2
--- /dev/null
+++ b/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletShadowTest.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.heron.streamlet.impl.streamlets;
+
+import java.util.ArrayList;
+
+import org.junit.Test;
+import org.mockito.Mock;
+
+import org.apache.heron.streamlet.impl.StreamletImpl;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Unit tests for {@link StreamletShadow}
+ */
+public class StreamletShadowTest {
+  @Mock
+  private StreamletImpl<Double> mockReal;
+
+  @Mock
+  private StreamletImpl<Double> mockChild;
+
+  @Test
+  public void testConstruction() {
+    doReturn("real_name").when(mockReal).getName();
+    doReturn(1).when(mockReal).getNumPartitions();
+    doNothing().when(mockReal).addChild(mockChild);
+    doReturn(new ArrayList<StreamletImpl<Double>>()).when(mockReal).getChildren();
+    doReturn("real_stream").when(mockReal).getStreamId();
+
+    StreamletShadow<Double> shadow = new StreamletShadow(mockReal);
+    assertEquals(shadow.getName(), "real_name");
+    assertEquals(shadow.getNumPartitions(), 1);
+    assertEquals(shadow.getStreamId(), "real_stream");
+
+    // set a different stream id
+    shadow.setStreamId("shadow_stream");
+    assertEquals(shadow.getStreamId(), "shadow_stream");
+
+    // addChild call should be forwarded to the real object
+    verify(mockReal, never()).addChild(mockChild);
+    shadow.addChild(mockChild);
+    verify(mockReal, times(1)).addChild(mockChild);
+  }
+
+}
diff --git a/heron/api/tests/scala/org/apache/heron/streamlet/scala/impl/StreamletImplTest.scala b/heron/api/tests/scala/org/apache/heron/streamlet/scala/impl/StreamletImplTest.scala
index b843732..5664d64 100644
--- a/heron/api/tests/scala/org/apache/heron/streamlet/scala/impl/StreamletImplTest.scala
+++ b/heron/api/tests/scala/org/apache/heron/streamlet/scala/impl/StreamletImplTest.scala
@@ -45,6 +45,7 @@
   RemapStreamlet,
   TransformStreamlet,
   SinkStreamlet,
+  SplitStreamlet,
   UnionStreamlet
 }
 
@@ -245,6 +246,62 @@
     verifySupplierStreamlet(supplierStreamlet2)
   }
 
+  test("StreamletImpl should support split and withStream transformation") {
+    val supplierStreamlet = builder
+      .newSource(() => Math.random)
+      .setName("Supplier_Streamlet_1")
+      .setNumPartitions(20)
+
+    val splitted = supplierStreamlet
+      .split(Map(
+        "positive" -> { num: Double => num > 0 },
+        "negative" -> { num: Double => num < 0 }
+      ))
+      .setName("Split_Streamlet_1")
+      .setNumPartitions(5)
+
+    splitted.withStream("positive")
+      .map { num: Double =>
+        num * 10
+      }
+
+    splitted.withStream("negative")
+      .map { num: Double =>
+        num * -10
+      }
+
+    val supplierStreamletImpl =
+      supplierStreamlet.asInstanceOf[StreamletImpl[Double]]
+    assertEquals(1, supplierStreamletImpl.getChildren.size)
+    assertTrue(
+      supplierStreamletImpl
+        .getChildren(0)
+        .isInstanceOf[SplitStreamlet[_]])
+
+    val splitStreamlet = supplierStreamletImpl
+      .getChildren(0)
+      .asInstanceOf[SplitStreamlet[Double]]
+    assertEquals("Split_Streamlet_1", splitStreamlet.getName)
+    assertEquals(2, splitStreamlet.getChildren.size())
+    assertTrue(
+      splitStreamlet
+        .getChildren.get(0)
+        .isInstanceOf[MapStreamlet[_, _]])
+    assertTrue(
+      splitStreamlet
+        .getChildren.get(1)
+        .isInstanceOf[MapStreamlet[_, _]])
+    
+    val mapStreamlet1 = splitStreamlet
+      .getChildren.get(0)
+      .asInstanceOf[MapStreamlet[Double, Double]]
+    assertEquals("positive", mapStreamlet1.getParent.getStreamId)
+    val mapStreamlet2 = splitStreamlet
+      .getChildren.get(1)
+      .asInstanceOf[MapStreamlet[Double, Double]]
+    assertEquals("negative", mapStreamlet2.getParent.getStreamId)
+  }
+
   test("StreamletImpl should support consume function") {
     val supplierStreamlet = builder
       .newSource(() => Math.random)
diff --git a/integration_test/src/java/org/apache/heron/integration_test/topology/streamlet_with_split_and_with_stream/StreamletWithSplitAndWithStream.java b/integration_test/src/java/org/apache/heron/integration_test/topology/streamlet_with_split_and_with_stream/StreamletWithSplitAndWithStream.java
new file mode 100644
index 0000000..0e787ff
--- /dev/null
+++ b/integration_test/src/java/org/apache/heron/integration_test/topology/streamlet_with_split_and_with_stream/StreamletWithSplitAndWithStream.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.heron.integration_test.topology.streamlet_with_split_and_with_stream;
+
+import java.net.MalformedURLException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.heron.api.Config;
+import org.apache.heron.integration_test.common.AbstractTestTopology;
+import org.apache.heron.integration_test.core.TestTopologyBuilder;
+import org.apache.heron.streamlet.Builder;
+import org.apache.heron.streamlet.SerializablePredicate;
+import org.apache.heron.streamlet.Streamlet;
+import org.apache.heron.streamlet.impl.BuilderImpl;
+
+/**
+  * Streamlet Integration Test
+  */
+class StreamletWithSplitAndWithStream extends AbstractTestTopology {
+  private static AtomicInteger atomicInteger = new AtomicInteger(-3);
+
+  StreamletWithSplitAndWithStream(String[] args) throws MalformedURLException {
+    super(args);
+  }
+
+  @Override
+  protected TestTopologyBuilder buildTopology(TestTopologyBuilder testTopologyBuilder) {
+    Map<String, SerializablePredicate<Integer>> splitter = new HashMap();
+    splitter.put("all", i -> true);
+    splitter.put("positive", i -> i > 0);
+    splitter.put("negative", i -> i < 0);
+
+    Builder streamletBuilder = Builder.newBuilder();
+    Streamlet<Integer> multi = streamletBuilder
+        .newSource(() -> atomicInteger.getAndIncrement())
+        .setName("incremented-numbers-from--3")
+        .filter(i -> i <= 4)
+        .setName("numbers-lower-than-4")
+        .split(splitter)
+        .setName("split");
+
+    multi.withStream("all").map((Integer i) -> String.format("all_%d", i));
+    multi.withStream("positive").map((Integer i) -> String.format("pos_%d", i));
+    multi.withStream("negative").map((Integer i) -> String.format("neg_%d", i));
+
+    BuilderImpl streamletBuilderImpl = (BuilderImpl) streamletBuilder;
+    TestTopologyBuilder topology =
+        (TestTopologyBuilder) streamletBuilderImpl.build(testTopologyBuilder);
+
+    return topology;
+  }
+
+  public static void main(String[] args) throws Exception {
+    Config conf = new Config();
+    StreamletWithSplitAndWithStream topology = new StreamletWithSplitAndWithStream(args);
+    topology.submit(conf);
+  }
+}
diff --git a/integration_test/src/java/org/apache/heron/integration_test/topology/streamlet_with_split_and_with_stream/StreamletWithSplitAndWithStreamResults.json b/integration_test/src/java/org/apache/heron/integration_test/topology/streamlet_with_split_and_with_stream/StreamletWithSplitAndWithStreamResults.json
new file mode 100644
index 0000000..a401e62
--- /dev/null
+++ b/integration_test/src/java/org/apache/heron/integration_test/topology/streamlet_with_split_and_with_stream/StreamletWithSplitAndWithStreamResults.json
@@ -0,0 +1 @@
+["all_-1", "all_-2", "all_-3", "all_0", "all_1", "all_2", "all_3", "all_4", "neg_-1", "neg_-2", "neg_-3", "pos_1", "pos_2", "pos_3", "pos_4"]
\ No newline at end of file
diff --git a/integration_test/src/python/test_runner/resources/test.json b/integration_test/src/python/test_runner/resources/test.json
index 762b2f6..62cc92f 100644
--- a/integration_test/src/python/test_runner/resources/test.json
+++ b/integration_test/src/python/test_runner/resources/test.json
@@ -17,6 +17,11 @@
       "topologyName" : "IntegrationTest_ScalaStreamletWithMapAndFlatMapAndFilterAndClone",
       "classPath"    : "scala_streamlet_with_map_and_flatmap_and_filter_and_clone.ScalaStreamletWithMapAndFlatMapAndFilterAndClone",
       "expectedResultRelativePath" : "scala_streamlet_with_map_and_flatmap_and_filter_and_clone/ScalaStreamletWithMapAndFlatMapAndFilterAndCloneResults.json"
+    },
+    {
+      "topologyName": "IntegrationTest_ScalaStreamletWithSplitAndWithStream",
+      "classPath": "scala_streamlet_with_split_and_with_stream.ScalaStreamletWithSplitAndWithStream",
+      "expectedResultRelativePath": "scala_streamlet_with_split_and_with_stream/ScalaStreamletWithSplitAndWithStreamResults.json"
     }
   ],
   "javaTopologies": [
@@ -138,6 +143,11 @@
       "topologyName": "IntegrationTest_StreamletWithMapAndFlatMapAndFilterAndClone",
       "classPath": "streamlet_with_map_and_flatmap_and_filter_and_clone.StreamletWithMapAndFlatMapAndFilterAndClone",
       "expectedResultRelativePath": "streamlet_with_map_and_flatmap_and_filter_and_clone/StreamletWithMapAndFlatMapAndFilterAndCloneResults.json"
+    },
+    {
+      "topologyName": "IntegrationTest_StreamletWithSplitAndWithStream",
+      "classPath": "streamlet_with_split_and_with_stream.StreamletWithSplitAndWithStream",
+      "expectedResultRelativePath": "streamlet_with_split_and_with_stream/StreamletWithSplitAndWithStreamResults.json"
     }
   ],
   "pythonTopologies": [
diff --git a/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_split_and_with_stream/ScalaStreamletWithSplitAndWithStream.scala b/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_split_and_with_stream/ScalaStreamletWithSplitAndWithStream.scala
new file mode 100644
index 0000000..307a037
--- /dev/null
+++ b/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_split_and_with_stream/ScalaStreamletWithSplitAndWithStream.scala
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.heron.integration_test.topology.scala_streamlet_with_split_and_with_stream
+
+import java.util.concurrent.atomic.AtomicInteger
+
+import org.apache.heron.api.Config
+import org.apache.heron.integration_test.core.TestTopologyBuilder
+import org.apache.heron.streamlet.Context
+
+import org.apache.heron.integration_test.common.{
+  AbstractTestTopology,
+  ScalaIntegrationTestBase
+}
+import org.apache.heron.streamlet.scala.{
+  Builder, SerializableTransformer
+}
+
+object ScalaStreamletWithSplitAndWithStream {
+  def main(args: Array[String]): Unit = {
+    val conf = new Config
+    val topology = new ScalaStreamletWithSplitAndWithStream(args)
+    topology.submit(conf)
+  }
+}
+
+/**
+  * Scala Streamlet Integration Test
+  */
+class ScalaStreamletWithSplitAndWithStream(args: Array[String])
+    extends AbstractTestTopology(args)
+    with ScalaIntegrationTestBase {
+
+  override protected def buildTopology(
+      testTopologyBuilder: TestTopologyBuilder): TestTopologyBuilder = {
+    val atomicInteger = new AtomicInteger(-3)
+
+    val streamletBuilder = Builder.newBuilder
+
+    val multi = streamletBuilder
+        .newSource(() => atomicInteger.getAndIncrement())
+        .setName("incremented-numbers-from--3")
+        .filter(i => i <= 4)
+        .setName("numbers-lower-than-4")
+        .split(Map(
+            "all" -> { num => true },
+            "positive" -> { num => num > 0 },
+            "negative" -> { num => num < 0 }
+        ))
+        .setName("split")
+
+    multi.withStream("all").map { i => s"all_$i" }
+    multi.withStream("positive").map { i => s"pos_$i" }
+    multi.withStream("negative").map{ i => s"neg_$i" }
+
+    build(testTopologyBuilder, streamletBuilder)
+  }
+}
diff --git a/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_split_and_with_stream/ScalaStreamletWithSplitAndWithStreamResults.json b/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_split_and_with_stream/ScalaStreamletWithSplitAndWithStreamResults.json
new file mode 100644
index 0000000..4df41be
--- /dev/null
+++ b/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_split_and_with_stream/ScalaStreamletWithSplitAndWithStreamResults.json
@@ -0,0 +1,17 @@
+[
+  "all_-1",
+  "all_-2",
+  "all_-3",
+  "all_0",
+  "all_1",
+  "all_2",
+  "all_3",
+  "all_4",
+  "neg_-1",
+  "neg_-2",
+  "neg_-3",
+  "pos_1",
+  "pos_2",
+  "pos_3",
+  "pos_4"
+]
\ No newline at end of file