Add withStream() in Streamlet to support stream selection (#3109)
* Add withStream() in Streamlet to support stream selection
* Add split() function into Streamlet API
diff --git a/heron/api/src/java/org/apache/heron/streamlet/Streamlet.java b/heron/api/src/java/org/apache/heron/streamlet/Streamlet.java
index 821fa51..e1f0616 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/Streamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/Streamlet.java
@@ -20,6 +20,7 @@
package org.apache.heron.streamlet;
import java.util.List;
+import java.util.Map;
import org.apache.heron.api.grouping.StreamGrouping;
import org.apache.heron.classification.InterfaceStability;
@@ -73,6 +74,22 @@
int getNumPartitions();
/**
+ * Set the id of the stream to be used by the children nodes.
+ * Usage (assuming source is a Streamlet object with two output streams: stream1 and stream2):
+ * source.withStream("stream1").filter(...).log();
+ * source.withStream("stream2").filter(...).log();
+ * @param streamId The specified stream id
+ * @return Returns back the Streamlet with changed stream id
+ */
+ Streamlet<R> withStream(String streamId);
+
+ /**
+ * Gets the stream id of this Streamlet.
+ * @return the stream id of this Streamlet
+ */
+ String getStreamId();
+
+ /**
* Return a new Streamlet by applying mapFn to each element of this Streamlet
* @param mapFn The Map Function that should be applied to each element
*/
@@ -220,6 +237,13 @@
<T> Streamlet<T> applyOperator(IStreamletOperator<R, T> operator, StreamGrouping grouper);
/**
+ * Returns multiple streams by splitting incoming stream.
+ * @param splitFns The Split Functions that test if the tuple should be emitted into each stream
+ * Note that there could be 0 or multiple target stream ids
+ */
+ Streamlet<R> split(Map<String, SerializablePredicate<R>> splitFns);
+
+ /**
* Logs every element of the streamlet using String.valueOf function
* This is one of the sink functions in the sense that this operation returns void
*/
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java b/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java
index e76cd26..6b09de2 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java
@@ -19,14 +19,18 @@
package org.apache.heron.streamlet.impl;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.logging.Logger;
+import org.apache.commons.lang3.StringUtils;
import org.apache.heron.api.grouping.NoneStreamGrouping;
import org.apache.heron.api.grouping.StreamGrouping;
import org.apache.heron.api.topology.TopologyBuilder;
+import org.apache.heron.api.utils.Utils;
import org.apache.heron.streamlet.IStreamletOperator;
import org.apache.heron.streamlet.JoinType;
import org.apache.heron.streamlet.KeyValue;
@@ -51,6 +55,8 @@
import org.apache.heron.streamlet.impl.streamlets.ReduceByKeyAndWindowStreamlet;
import org.apache.heron.streamlet.impl.streamlets.RemapStreamlet;
import org.apache.heron.streamlet.impl.streamlets.SinkStreamlet;
+import org.apache.heron.streamlet.impl.streamlets.SplitStreamlet;
+import org.apache.heron.streamlet.impl.streamlets.StreamletShadow;
import org.apache.heron.streamlet.impl.streamlets.TransformStreamlet;
import org.apache.heron.streamlet.impl.streamlets.UnionStreamlet;
@@ -107,13 +113,14 @@
CUSTOM_WINDOW("customWindow"),
FILTER("filter"),
FLATMAP("flatmap"),
- REDUCE("reduceByKeyAndWindow"),
JOIN("join"),
LOGGER("logger"),
MAP("map"),
+ SOURCE("generator"),
+ REDUCE("reduceByKeyAndWindow"),
REMAP("remap"),
SINK("sink"),
- SOURCE("generator"),
+ SPLIT("split"),
SPOUT("spout"),
SUPPLIER("supplier"),
TRANSFORM("transform"),
@@ -203,6 +210,53 @@
}
/**
+ * Set the id of the stream to be used by the children nodes.
+ * Usage (assuming source is a Streamlet object with two output streams: stream1 and stream2):
+ * source.withStream("stream1").filter(...).log();
+ * source.withStream("stream2").filter(...).log();
+ * @param streamId The specified stream id
+ * @return Returns back the Streamlet with changed stream id
+ */
+ @SuppressWarnings("HiddenField")
+ @Override
+ public Streamlet<R> withStream(String streamId) {
+ checkNotBlank(streamId, "streamId can't be empty");
+
+ Set<String> availableIds = getAvailableStreamIds();
+ if (availableIds.contains(streamId)) {
+ StreamletShadow<R> shadow = new StreamletShadow<R>(this);
+ shadow.setStreamId(streamId);
+ return shadow;
+ } else {
+ throw new RuntimeException(
+ String.format("Stream id %s is not available in %s. Available ids are: %s.",
+ streamId, getName(), availableIds.toString()));
+ }
+ }
+
+
+ /**
+ * Get the available stream ids in the Streamlet. For most Streamlets,
+ * there is only one internal stream id, therefore the function
+ * returns a set of one single stream id.
+ * @return Returns a set of one single stream id.
+ */
+ protected Set<String> getAvailableStreamIds() {
+ HashSet<String> ids = new HashSet<String>();
+ ids.add(getStreamId());
+ return ids;
+ }
+
+ /**
+ * Gets the stream id of this Streamlet.
+ * @return the stream id of this Streamlet`
+ */
+ @Override
+ public String getStreamId() {
+ return Utils.DEFAULT_STREAM_ID;
+ }
+
+ /**
* Only used by the implementors
*/
protected StreamletImpl() {
@@ -545,4 +599,21 @@
addChild(customStreamlet);
return customStreamlet;
}
+
+ /**
+ * Returns multiple streams by splitting incoming stream.
+ * @param splitFns The Split Functions that test if the tuple should be emitted into each stream
+ * Note that there could be 0 or multiple target stream ids
+ */
+ @Override
+ public Streamlet<R> split(Map<String, SerializablePredicate<R>> splitFns) {
+ // Make sure map and stream ids are not empty
+ require(splitFns.size() > 0, "At least one entry is required");
+ require(splitFns.keySet().stream().allMatch(stream -> StringUtils.isNotBlank(stream)),
+ "Stream Id can not be blank");
+
+ SplitStreamlet<R> splitStreamlet = new SplitStreamlet<R>(this, splitFns);
+ addChild(splitStreamlet);
+ return splitStreamlet;
+ }
}
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletShadow.java b/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletShadow.java
new file mode 100644
index 0000000..56d0c64
--- /dev/null
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletShadow.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.streamlet.impl.streamlets;
+
+import java.util.List;
+import java.util.Set;
+
+import org.apache.heron.api.topology.TopologyBuilder;
+import org.apache.heron.streamlet.impl.StreamletImpl;
+
+/**
+ * StreamletShadow is a special kind of StreamletImpl object:
+ * - It is still an StreamletImpl therefore it supports all Streamlet functions like filter()
+ * and map(), and can be the parent object of other StreamletImpl objects. Therefore,
+ * from API point of view, it can be used in the same way as a normal StreamletImpl object.
+ * - However it is just an shadow object of a real StreamletImpl object and it doesn't
+ * represent a node in the topology DAG. Therefore it can not be a child of another StreamletImpl
+ * object. As the result, the shadow object is clonable, and it is fine to create multiple
+ * StreamletShadow objects pointing to the same StreamletImpl object and have different properties.
+ *
+ * A StreamletShadow object can be used to decorate the real Streamletimpl object. This is
+ * important for children StreamletImpl objects to consume output data from the same parent in
+ * different ways, such as selecting different stream.
+ *
+ * Usage:
+ * To create a shadow object that selecting "test" stream from an existing StreamletImpl
+ * object(stream):
+ *
+ * StreamletImpl shadow = new StreamletShadow(stream) {
+ * @Override
+ * public String getStreamId() {
+ * return "test";
+ * }
+ * }
+ *
+ */
+public class StreamletShadow<R> extends StreamletImpl<R> {
+ private StreamletImpl<R> real;
+ // Extra properties for a Streamlet object
+ protected String streamId;
+
+ public StreamletShadow(StreamletImpl<R> real) {
+ this.real = real;
+ this.streamId = real.getStreamId();
+ }
+
+ /**
+ * Sets the stream id of this Streamlet.
+ * @param streamId the stream id for this streamlet.
+ */
+ public void setStreamId(String streamId) {
+ this.streamId = streamId;
+ }
+
+ /**
+ * Gets the stream id of this Streamlet.
+ * @return the stream id of this Streamlet
+ */
+ @Override
+ public String getStreamId() {
+ return streamId;
+ }
+
+ /*
+ * Functions accessible by child objects need to be overriden (forwarding the call to
+ * the real object since shadow object shouldn't have them)
+ */
+ @Override
+ public String getName() {
+ return real.getName();
+ }
+
+ @Override
+ public int getNumPartitions() {
+ return real.getNumPartitions();
+ }
+
+ /*
+ * Functions related to topology building need to be overriden.
+ */
+ @Override
+ public <T> void addChild(StreamletImpl<T> child) {
+ real.addChild(child);
+ }
+
+ /**
+ * Gets all the children of this streamlet.
+ * Children of a streamlet are streamlets that are resulting from transformations of elements of
+ * this and potentially other streamlets.
+ * @return The kid streamlets
+ */
+ @Override
+ public List<StreamletImpl<?>> getChildren() {
+ return real.getChildren();
+ }
+
+ @Override
+ public void build(TopologyBuilder bldr, Set<String> stageNames) {
+ throw new UnsupportedOperationException("build() in StreamletShadow should NOT be invoked");
+ }
+
+ @Override
+ public boolean doBuild(TopologyBuilder bldr, Set<String> stageNames) {
+ throw new UnsupportedOperationException("build() in StreamletShadow should NOT be invoked");
+ }
+}
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/SplitOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/SplitOperator.java
new file mode 100644
index 0000000..c07893e
--- /dev/null
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/SplitOperator.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.streamlet.impl.operators;
+
+import java.util.Map;
+
+import org.apache.heron.api.topology.OutputFieldsDeclarer;
+import org.apache.heron.api.tuple.Fields;
+import org.apache.heron.api.tuple.Tuple;
+import org.apache.heron.api.tuple.Values;
+import org.apache.heron.streamlet.SerializablePredicate;
+
+/**
+ * SplitOperator is the class that implements the split functionality.
+ * It takes in the split function as the input and use it to process tuples and
+ * get the output stream id and emit to the specific streams.
+ * Note that one tuple can be emitted to multiple or zero streams.
+ */
+public class SplitOperator<R> extends StreamletOperator<R, R> {
+ private Map<String, SerializablePredicate<R>> splitFns;
+
+ public SplitOperator(Map<String, SerializablePredicate<R>> splitFns) {
+ this.splitFns = splitFns;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void execute(Tuple tuple) {
+ R obj = (R) tuple.getValue(0);
+ for (Map.Entry<String, SerializablePredicate<R>> entry: splitFns.entrySet()) {
+ if (entry.getValue().test(obj)) {
+ collector.emit(entry.getKey(), new Values(obj));
+ }
+ }
+ collector.ack(tuple);
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ //super.declareOutputFields(declarer);
+ for (String stream: splitFns.keySet()) {
+ declarer.declareStream(stream, new Fields(OUTPUT_FIELD_NAME));
+ }
+ }
+}
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/StreamletOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/StreamletOperator.java
index 0b08478..0114a01 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/StreamletOperator.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/StreamletOperator.java
@@ -36,7 +36,7 @@
extends BaseRichBolt
implements IStreamletRichOperator<R, T> {
private static final long serialVersionUID = 8524238140745238942L;
- private static final String OUTPUT_FIELD_NAME = "output";
+ protected static final String OUTPUT_FIELD_NAME = "output";
protected OutputCollector collector;
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/ConsumerStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/ConsumerStreamlet.java
index 98bb846..f71b71f 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/ConsumerStreamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/ConsumerStreamlet.java
@@ -52,7 +52,7 @@
public boolean doBuild(TopologyBuilder bldr, Set<String> stageNames) {
setDefaultNameIfNone(StreamletNamePrefix.CONSUMER, stageNames);
bldr.setBolt(getName(), new ConsumerSink<>(consumer),
- getNumPartitions()).shuffleGrouping(parent.getName());
+ getNumPartitions()).shuffleGrouping(parent.getName(), parent.getStreamId());
return true;
}
}
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomStreamlet.java
index d09b134..df3639d 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomStreamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomStreamlet.java
@@ -67,22 +67,22 @@
if (operator instanceof IStreamletBasicOperator) {
setDefaultNameIfNone(StreamletNamePrefix.CUSTOM, stageNames);
IStreamletBasicOperator<R, T> op = (IStreamletBasicOperator<R, T>) operator;
- declarer = bldr.setBolt(getName(), op, getNumPartitions());
+ bldr.setBolt(getName(), op, getNumPartitions())
+ .grouping(parent.getName(), parent.getStreamId(), grouper);
} else if (operator instanceof IStreamletRichOperator) {
setDefaultNameIfNone(StreamletNamePrefix.CUSTOM_BASIC, stageNames);
IStreamletRichOperator<R, T> op = (IStreamletRichOperator<R, T>) operator;
- declarer = bldr.setBolt(getName(), op, getNumPartitions());
+ bldr.setBolt(getName(), op, getNumPartitions())
+ .grouping(parent.getName(), parent.getStreamId(), grouper);
} else if (operator instanceof IStreamletWindowOperator) {
setDefaultNameIfNone(StreamletNamePrefix.CUSTOM_WINDOW, stageNames);
IStreamletWindowOperator<R, T> op = (IStreamletWindowOperator<R, T>) operator;
- declarer = bldr.setBolt(getName(), op, getNumPartitions());
+ bldr.setBolt(getName(), op, getNumPartitions())
+ .grouping(parent.getName(), parent.getStreamId(), grouper);
} else {
throw new RuntimeException("Unhandled operator class is found!");
}
- // Apply grouping
- declarer.grouping(parent.getName(), grouper);
-
return true;
}
}
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/FilterStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/FilterStreamlet.java
index b314238..4f25669 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/FilterStreamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/FilterStreamlet.java
@@ -45,7 +45,7 @@
public boolean doBuild(TopologyBuilder bldr, Set<String> stageNames) {
setDefaultNameIfNone(StreamletNamePrefix.FILTER, stageNames);
bldr.setBolt(getName(), new FilterOperator<R>(filterFn),
- getNumPartitions()).shuffleGrouping(parent.getName());
+ getNumPartitions()).shuffleGrouping(parent.getName(), parent.getStreamId());
return true;
}
}
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/FlatMapStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/FlatMapStreamlet.java
index 30a0500..95c057d 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/FlatMapStreamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/FlatMapStreamlet.java
@@ -38,7 +38,7 @@
public FlatMapStreamlet(StreamletImpl<R> parent,
SerializableFunction<? super R,
- ? extends Iterable<? extends T>> flatMapFn) {
+ ? extends Iterable<? extends T>> flatMapFn) {
this.parent = parent;
this.flatMapFn = flatMapFn;
setNumPartitions(parent.getNumPartitions());
@@ -48,7 +48,7 @@
public boolean doBuild(TopologyBuilder bldr, Set<String> stageNames) {
setDefaultNameIfNone(StreamletNamePrefix.FLATMAP, stageNames);
bldr.setBolt(getName(), new FlatMapOperator<R, T>(flatMapFn),
- getNumPartitions()).shuffleGrouping(parent.getName());
+ getNumPartitions()).shuffleGrouping(parent.getName(), parent.getStreamId());
return true;
}
}
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/GeneralReduceByKeyAndWindowStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/GeneralReduceByKeyAndWindowStreamlet.java
index 2bc6bf0..3f0abd8 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/GeneralReduceByKeyAndWindowStreamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/GeneralReduceByKeyAndWindowStreamlet.java
@@ -68,7 +68,7 @@
new GeneralReduceByKeyAndWindowOperator<K, V, VR>(keyExtractor, identity, reduceFn);
windowCfg.attachWindowConfig(bolt);
bldr.setBolt(getName(), bolt, getNumPartitions())
- .customGrouping(parent.getName(),
+ .customGrouping(parent.getName(), parent.getStreamId(),
new ReduceByKeyAndWindowCustomGrouping<K, V>(keyExtractor));
return true;
}
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/JoinStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/JoinStreamlet.java
index 0dcab51..89c6de9 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/JoinStreamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/JoinStreamlet.java
@@ -92,8 +92,10 @@
right.getName(), leftKeyExtractor, rightKeyExtractor, joinFn);
windowCfg.attachWindowConfig(bolt);
bldr.setBolt(getName(), bolt, getNumPartitions())
- .customGrouping(left.getName(), new JoinCustomGrouping<K, R>(leftKeyExtractor))
- .customGrouping(right.getName(), new JoinCustomGrouping<K, S>(rightKeyExtractor));
+ .customGrouping(left.getName(), left.getStreamId(),
+ new JoinCustomGrouping<K, R>(leftKeyExtractor))
+ .customGrouping(right.getName(), right.getStreamId(),
+ new JoinCustomGrouping<K, S>(rightKeyExtractor));
return true;
}
}
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/LogStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/LogStreamlet.java
index f29215b..9f68ff9 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/LogStreamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/LogStreamlet.java
@@ -48,8 +48,7 @@
public boolean doBuild(TopologyBuilder bldr, Set<String> stageNames) {
setDefaultNameIfNone(StreamletNamePrefix.LOGGER, stageNames);
bldr.setBolt(getName(), new LogSink<R>(),
- getNumPartitions()).shuffleGrouping(parent.getName());
-
+ getNumPartitions()).shuffleGrouping(parent.getName(), parent.getStreamId());
return true;
}
}
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/MapStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/MapStreamlet.java
index 96c34b9..900c0e9 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/MapStreamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/MapStreamlet.java
@@ -41,11 +41,15 @@
setNumPartitions(parent.getNumPartitions());
}
+ public StreamletImpl<R> getParent() {
+ return parent;
+ }
+
@Override
public boolean doBuild(TopologyBuilder bldr, Set<String> stageNames) {
setDefaultNameIfNone(StreamletNamePrefix.MAP, stageNames);
bldr.setBolt(getName(), new MapOperator<R, T>(mapFn),
- getNumPartitions()).shuffleGrouping(parent.getName());
+ getNumPartitions()).shuffleGrouping(parent.getName(), parent.getStreamId());
return true;
}
}
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/ReduceByKeyAndWindowStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/ReduceByKeyAndWindowStreamlet.java
index 33a5b75..7e7a7e1 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/ReduceByKeyAndWindowStreamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/ReduceByKeyAndWindowStreamlet.java
@@ -68,7 +68,7 @@
valueExtractor, reduceFn);
windowCfg.attachWindowConfig(bolt);
bldr.setBolt(getName(), bolt, getNumPartitions())
- .customGrouping(parent.getName(),
+ .customGrouping(parent.getName(), parent.getStreamId(),
new ReduceByKeyAndWindowCustomGrouping<K, R>(keyExtractor));
return true;
}
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/RemapStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/RemapStreamlet.java
index fed815f..b18980a 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/RemapStreamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/RemapStreamlet.java
@@ -50,9 +50,9 @@
@Override
public boolean doBuild(TopologyBuilder bldr, Set<String> stageNames) {
setDefaultNameIfNone(StreamletNamePrefix.REMAP, stageNames);
- bldr.setBolt(getName(), new MapOperator<R, R>((a) -> a),
- getNumPartitions())
- .customGrouping(parent.getName(), new RemapCustomGrouping<R>(remapFn));
+ bldr.setBolt(getName(), new MapOperator<R, R>((a) -> a), getNumPartitions())
+ .customGrouping(parent.getName(), parent.getStreamId(),
+ new RemapCustomGrouping<R>(remapFn));
return true;
}
}
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/SinkStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/SinkStreamlet.java
index 9b24cf8..42e5d03 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/SinkStreamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/SinkStreamlet.java
@@ -46,7 +46,7 @@
public boolean doBuild(TopologyBuilder bldr, Set<String> stageNames) {
setDefaultNameIfNone(StreamletNamePrefix.SINK, stageNames);
bldr.setBolt(getName(), new ComplexSink<>(sink),
- getNumPartitions()).shuffleGrouping(parent.getName());
+ getNumPartitions()).shuffleGrouping(parent.getName(), parent.getStreamId());
return true;
}
}
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/SplitStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/SplitStreamlet.java
new file mode 100644
index 0000000..8128353
--- /dev/null
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/SplitStreamlet.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.heron.streamlet.impl.streamlets;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.heron.api.topology.TopologyBuilder;
+import org.apache.heron.streamlet.SerializablePredicate;
+import org.apache.heron.streamlet.impl.StreamletImpl;
+import org.apache.heron.streamlet.impl.operators.SplitOperator;
+
+/**
+ * SplitStreamlet represents a Streamlet that splits an incoming
+ * stream into multiple streams using a split function. Each tuple
+ * can be emitted into no or multiple streams.
+ */
+public class SplitStreamlet<R> extends StreamletImpl<R> {
+ private StreamletImpl<R> parent;
+ private Map<String, SerializablePredicate<R>> splitFns;
+
+ public SplitStreamlet(StreamletImpl<R> parent,
+ Map<String, SerializablePredicate<R>> splitFns) {
+
+ this.parent = parent;
+ this.splitFns = splitFns;
+ setNumPartitions(parent.getNumPartitions());
+ }
+
+ /**
+ * Get the available stream ids in the Streamlet.
+ * @return Returns the set of available stream ids
+ */
+ @Override
+ protected Set<String> getAvailableStreamIds() {
+ return splitFns.keySet();
+ }
+
+ @Override
+ public boolean doBuild(TopologyBuilder bldr, Set<String> stageNames) {
+ setDefaultNameIfNone(StreamletNamePrefix.SPLIT, stageNames);
+ bldr.setBolt(getName(), new SplitOperator<R>(splitFns),
+ getNumPartitions()).shuffleGrouping(parent.getName(), parent.getStreamId());
+ return true;
+ }
+}
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/TransformStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/TransformStreamlet.java
index fffd4e0..1f206c7 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/TransformStreamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/TransformStreamlet.java
@@ -48,7 +48,7 @@
public boolean doBuild(TopologyBuilder bldr, Set<String> stageNames) {
setDefaultNameIfNone(StreamletNamePrefix.TRANSFORM, stageNames);
bldr.setBolt(getName(), new TransformOperator<R, T>(serializableTransformer),
- getNumPartitions()).shuffleGrouping(parent.getName());
+ getNumPartitions()).shuffleGrouping(parent.getName(), parent.getStreamId());
return true;
}
}
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/UnionStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/UnionStreamlet.java
index 8c6e9a6..a1ecbcf 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/UnionStreamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/UnionStreamlet.java
@@ -49,7 +49,9 @@
}
setDefaultNameIfNone(StreamletNamePrefix.UNION, stageNames);
bldr.setBolt(getName(), new UnionOperator<I>(),
- getNumPartitions()).shuffleGrouping(left.getName()).shuffleGrouping(right.getName());
+ getNumPartitions())
+ .shuffleGrouping(left.getName(), left.getStreamId())
+ .shuffleGrouping(right.getName(), right.getStreamId());
return true;
}
}
diff --git a/heron/api/src/scala/org/apache/heron/streamlet/scala/Streamlet.scala b/heron/api/src/scala/org/apache/heron/streamlet/scala/Streamlet.scala
index 8200316..9945852 100644
--- a/heron/api/src/scala/org/apache/heron/streamlet/scala/Streamlet.scala
+++ b/heron/api/src/scala/org/apache/heron/streamlet/scala/Streamlet.scala
@@ -79,6 +79,22 @@
def getNumPartitions: Int
/**
+ * Set the id of the stream to be used by the children nodes.
+ * Usage (assuming source is a Streamlet object with two output streams: stream1 and stream2):
+ * source.withStream("stream1").filter(...).log();
+ * source.withStream("stream2").filter(...).log();
+ * @param streamId The specified stream id
+ * @return Returns back the Streamlet with changed stream id
+ */
+ def withStream(streamId: String): Streamlet[R]
+
+ /**
+ * Gets the stream id of this Streamlet.
+ * @return the stream id of this Streamlet
+ */
+ def getStreamId: String
+
+ /**
* Return a new Streamlet by applying mapFn to each element of this Streamlet
*
* @param mapFn The Map Function that should be applied to each element
@@ -224,7 +240,7 @@
def transform[T](
serializableTransformer: SerializableTransformer[R, _ <: T]): Streamlet[T]
-/**
+ /**
* Returns a new Streamlet by applying the operator on each element of this streamlet.
* @param operator The operator to be applied
* @param <T> The return type of the transform
@@ -242,6 +258,13 @@
def applyOperator[T](operator: IStreamletOperator[R, T], grouper: StreamGrouping): Streamlet[T]
/**
+ * Returns multiple streams by splitting incoming stream.
+ * @param splitFns The Split Functions that test if the tuple should be emitted into each stream
+ * Note that there could be 0 or multiple target stream ids
+ */
+ def split(splitFns: Map[String, R => Boolean]): Streamlet[R]
+
+ /**
* Logs every element of the streamlet using String.valueOf function
* This is one of the sink functions in the sense that this operation returns void
*/
diff --git a/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/StreamletImpl.scala b/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/StreamletImpl.scala
index 50c655e..fc308b5 100644
--- a/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/StreamletImpl.scala
+++ b/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/StreamletImpl.scala
@@ -18,6 +18,9 @@
*/
package org.apache.heron.streamlet.scala.impl
+import java.util.{Map => JMap}
+import java.util.{HashMap => JHashMap}
+
import scala.collection.JavaConverters
import org.apache.heron.api.grouping.StreamGrouping
@@ -26,6 +29,7 @@
JoinType,
KeyValue,
KeyedWindow,
+ SerializablePredicate,
Streamlet => JavaStreamlet,
WindowConfig
}
@@ -91,6 +95,23 @@
override def getNumPartitions(): Int = javaStreamlet.getNumPartitions
/**
+ * Set the id of the stream to be used by the children nodes.
+ * Usage (assuming source is a Streamlet object with two output streams: stream1 and stream2):
+ * source.withStream("stream1").filter(...).log();
+ * source.withStream("stream2").filter(...).log();
+ * @param streamId The specified stream id
+ * @return Returns back the Streamlet with changed stream id
+ */
+ override def withStream(streamId: String): Streamlet[R] =
+ fromJavaStreamlet[R](javaStreamlet.withStream(streamId))
+
+ /**
+ * Gets the stream id of this Streamlet.
+ * @return the stream id of this Streamlet
+ */
+ override def getStreamId(): String = javaStreamlet.getStreamId
+
+ /**
* Return a new Streamlet by applying mapFn to each element of this Streamlet
*
* @param mapFn The Map Function that should be applied to each element
@@ -344,6 +365,20 @@
fromJavaStreamlet(newJavaStreamlet)
}
+ /*
+ * Returns multiple streams by splitting incoming stream.
+ * @param splitFns The Split Functions that test if the tuple should be emitted into each stream
+ * Note that there could be 0 or multiple target stream ids
+ */
+ override def split(splitFns: Map[String, R => Boolean]): Streamlet[R] = {
+ val javaSerializablePredicates: JMap[String, SerializablePredicate[R]] = new JHashMap()
+ splitFns.foreach { case (key, func) =>
+ javaSerializablePredicates.put(key, toSerializablePredicate[R](func))
+ }
+ val newJavaStreamlet = javaStreamlet.split(javaSerializablePredicates)
+ fromJavaStreamlet[R](newJavaStreamlet)
+ }
+
/**
* Logs every element of the streamlet using String.valueOf function
* This is one of the sink functions in the sense that this operation returns void
diff --git a/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java b/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java
index 33f3a9c..5d9053a 100644
--- a/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java
+++ b/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java
@@ -20,8 +20,10 @@
import java.util.Arrays;
import java.util.Collection;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;
@@ -30,6 +32,7 @@
import org.apache.heron.api.grouping.ShuffleStreamGrouping;
import org.apache.heron.api.topology.TopologyBuilder;
+import org.apache.heron.api.utils.Utils;
import org.apache.heron.common.basics.ByteAmount;
import org.apache.heron.resource.TestBasicBolt;
import org.apache.heron.resource.TestBolt;
@@ -42,6 +45,7 @@
import org.apache.heron.streamlet.IStreamletRichOperator;
import org.apache.heron.streamlet.IStreamletWindowOperator;
import org.apache.heron.streamlet.SerializableConsumer;
+import org.apache.heron.streamlet.SerializablePredicate;
import org.apache.heron.streamlet.SerializableTransformer;
import org.apache.heron.streamlet.Source;
import org.apache.heron.streamlet.Streamlet;
@@ -88,6 +92,68 @@
}
@Test
+ public void testSplitAndWithStream() {
+ Map<String, SerializablePredicate<Double>> splitter = new HashMap();
+ splitter.put("all", i -> true);
+ splitter.put("positive", i -> i > 0);
+ splitter.put("negative", i -> i < 0);
+
+ Streamlet<Double> baseStreamlet = builder.newSource(() -> Math.random());
+ // The streamlet should have three output streams after split()
+ Streamlet<Double> multiStreams = baseStreamlet.split(splitter);
+
+ // Default stream is used
+ Streamlet<Double> positiveStream = multiStreams.withStream("positive");
+ Streamlet<Double> negativeStream = multiStreams.withStream("negative");
+
+ Streamlet<Double> allMap = multiStreams.withStream("all").map((num) -> num * 10);
+ Streamlet<Double> positiveMap = positiveStream.map((num) -> num * 10);
+ Streamlet<Double> negativeMap = negativeStream.map((num) -> num * 10);
+
+ // Original streamlet should still have the default strean id eventhough the id
+ // is not available. Other shadow streamlets should have the correct stream ids.
+ assertEquals(multiStreams.getStreamId(), Utils.DEFAULT_STREAM_ID);
+ assertEquals(positiveStream.getStreamId(), "positive");
+ assertEquals(negativeStream.getStreamId(), "negative");
+
+ StreamletImpl<Double> impl = (StreamletImpl<Double>) multiStreams;
+ assertEquals(impl.getChildren().size(), 3);
+
+ // Children streamlets should have the right parent stream id
+ assertEquals(((MapStreamlet<Double, Double>) allMap).getParent().getStreamId(),
+ "all");
+ assertEquals(((MapStreamlet<Double, Double>) positiveMap).getParent().getStreamId(),
+ "positive");
+ assertEquals(((MapStreamlet<Double, Double>) negativeMap).getParent().getStreamId(),
+ "negative");
+ }
+
+ @Test(expected = RuntimeException.class)
+ public void testSplitAndWithWrongStream() {
+ Map<String, SerializablePredicate<Double>> splitter = new HashMap();
+ splitter.put("all", i -> true);
+ splitter.put("positive", i -> i > 0);
+ splitter.put("negative", i -> i < 0);
+
+ Streamlet<Double> baseStreamlet = builder.newSource(() -> Math.random());
+ // The streamlet should have three output streams after split()
+ Streamlet<Double> multiStreams = baseStreamlet.split(splitter);
+
+ // Select a good stream id and a bad stream id
+ Streamlet<Double> goodStream = multiStreams.withStream("positive");
+ Streamlet<Double> badStream = multiStreams.withStream("wrong-id");
+ }
+
+ @Test(expected = RuntimeException.class)
+ public void testWithWrongStream() {
+ Streamlet<Double> baseStreamlet = builder.newSource(() -> Math.random());
+ // Normal Streamlet objects, including sources, have only the default stream id.
+ // Selecting any other stream using withStream() should trigger a runtime
+ // exception
+ Streamlet<Double> badStream = baseStreamlet.withStream("wrong-id");
+ }
+
+ @Test
public void testSupplierStreamlet() {
Streamlet<Double> streamlet = builder.newSource(() -> Math.random());
assertTrue(streamlet instanceof SupplierStreamlet);
@@ -492,6 +558,12 @@
}
@Test(expected = IllegalArgumentException.class)
+ public void testWithStreamWithInvalidValue() {
+ Streamlet<Double> baseStreamlet = builder.newSource(() -> Math.random());
+ baseStreamlet.withStream("");
+ }
+
+ @Test(expected = IllegalArgumentException.class)
@SuppressWarnings("unchecked")
public void testCloneStreamletWithInvalidNumberOfClone() {
Streamlet<Double> baseStreamlet = builder.newSource(() -> Math.random());
diff --git a/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletShadowTest.java b/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletShadowTest.java
new file mode 100644
index 0000000..4cc70d2
--- /dev/null
+++ b/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletShadowTest.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.heron.streamlet.impl.streamlets;
+
+import java.util.ArrayList;
+
+import org.junit.Test;
+import org.mockito.Mock;
+
+import org.apache.heron.streamlet.impl.StreamletImpl;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Unit tests for {@link StreamletShadow}
+ */
+public class StreamletShadowTest {
+ @Mock
+ private StreamletImpl<Double> mockReal;
+
+ @Mock
+ private StreamletImpl<Double> mockChild;
+
+ @Test
+ public void testConstruction() {
+ doReturn("real_name").when(mockReal).getName();
+ doReturn(1).when(mockReal).getNumPartitions();
+ doNothing().when(mockReal).addChild(mockChild);
+ doReturn(new ArrayList<StreamletImpl<Double>>()).when(mockReal).getChildren();
+ doReturn("real_stream").when(mockReal).getStreamId();
+
+ StreamletShadow<Double> shadow = new StreamletShadow(mockReal);
+ assertEquals(shadow.getName(), "real_name");
+ assertEquals(shadow.getNumPartitions(), 1);
+ assertEquals(shadow.getStreamId(), "real_stream");
+
+ // set a different stream id
+ shadow.setStreamId("shadow_stream");
+ assertEquals(shadow.getStreamId(), "shadow_stream");
+
+ // addChild call should be forwarded to the real object
+ verify(mockReal, never()).addChild(mockChild);
+ shadow.addChild(mockChild);
+ verify(mockReal, times(1)).addChild(mockChild);
+ }
+
+}
diff --git a/heron/api/tests/scala/org/apache/heron/streamlet/scala/impl/StreamletImplTest.scala b/heron/api/tests/scala/org/apache/heron/streamlet/scala/impl/StreamletImplTest.scala
index b843732..5664d64 100644
--- a/heron/api/tests/scala/org/apache/heron/streamlet/scala/impl/StreamletImplTest.scala
+++ b/heron/api/tests/scala/org/apache/heron/streamlet/scala/impl/StreamletImplTest.scala
@@ -45,6 +45,7 @@
RemapStreamlet,
TransformStreamlet,
SinkStreamlet,
+ SplitStreamlet,
UnionStreamlet
}
@@ -245,6 +246,62 @@
verifySupplierStreamlet(supplierStreamlet2)
}
+ test("StreamletImpl should support split and withStream transformation") {
+ val supplierStreamlet = builder
+ .newSource(() => Math.random)
+ .setName("Supplier_Streamlet_1")
+ .setNumPartitions(20)
+
+ val splitted = supplierStreamlet
+ .split(Map(
+ "positive" -> { num: Double => num > 0 },
+ "negative" -> { num: Double => num < 0 }
+ ))
+ .setName("Split_Streamlet_1")
+ .setNumPartitions(5)
+
+ splitted.withStream("positive")
+ .map { num: Double =>
+ num * 10
+ }
+
+ splitted.withStream("negative")
+ .map { num: Double =>
+ num * -10
+ }
+
+ val supplierStreamletImpl =
+ supplierStreamlet.asInstanceOf[StreamletImpl[Double]]
+ assertEquals(1, supplierStreamletImpl.getChildren.size)
+ assertTrue(
+ supplierStreamletImpl
+ .getChildren(0)
+ .isInstanceOf[SplitStreamlet[_]])
+
+ val splitStreamlet = supplierStreamletImpl
+ .getChildren(0)
+ .asInstanceOf[SplitStreamlet[Double]]
+ assertEquals("Split_Streamlet_1", splitStreamlet.getName)
+ assertEquals(2, splitStreamlet.getChildren.size())
+ assertTrue(
+ splitStreamlet
+ .getChildren.get(0)
+ .isInstanceOf[MapStreamlet[_, _]])
+ assertTrue(
+ splitStreamlet
+ .getChildren.get(1)
+ .isInstanceOf[MapStreamlet[_, _]])
+
+ val mapStreamlet1 = splitStreamlet
+ .getChildren.get(0)
+ .asInstanceOf[MapStreamlet[Double, Double]]
+ assertEquals("positive", mapStreamlet1.getParent.getStreamId)
+ val mapStreamlet2 = splitStreamlet
+ .getChildren.get(1)
+ .asInstanceOf[MapStreamlet[Double, Double]]
+ assertEquals("negative", mapStreamlet2.getParent.getStreamId)
+ }
+
test("StreamletImpl should support consume function") {
val supplierStreamlet = builder
.newSource(() => Math.random)
diff --git a/integration_test/src/java/org/apache/heron/integration_test/topology/streamlet_with_split_and_with_stream/StreamletWithSplitAndWithStream.java b/integration_test/src/java/org/apache/heron/integration_test/topology/streamlet_with_split_and_with_stream/StreamletWithSplitAndWithStream.java
new file mode 100644
index 0000000..0e787ff
--- /dev/null
+++ b/integration_test/src/java/org/apache/heron/integration_test/topology/streamlet_with_split_and_with_stream/StreamletWithSplitAndWithStream.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.heron.integration_test.topology.streamlet_with_split_and_with_stream;
+
+import java.net.MalformedURLException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.heron.api.Config;
+import org.apache.heron.integration_test.common.AbstractTestTopology;
+import org.apache.heron.integration_test.core.TestTopologyBuilder;
+import org.apache.heron.streamlet.Builder;
+import org.apache.heron.streamlet.SerializablePredicate;
+import org.apache.heron.streamlet.Streamlet;
+import org.apache.heron.streamlet.impl.BuilderImpl;
+
+/**
+ * Streamlet Integration Test
+ */
+class StreamletWithSplitAndWithStream extends AbstractTestTopology {
+ private static AtomicInteger atomicInteger = new AtomicInteger(-3);
+
+ StreamletWithSplitAndWithStream(String[] args) throws MalformedURLException {
+ super(args);
+ }
+
+ @Override
+ protected TestTopologyBuilder buildTopology(TestTopologyBuilder testTopologyBuilder) {
+ Map<String, SerializablePredicate<Integer>> splitter = new HashMap();
+ splitter.put("all", i -> true);
+ splitter.put("positive", i -> i > 0);
+ splitter.put("negative", i -> i < 0);
+
+ Builder streamletBuilder = Builder.newBuilder();
+ Streamlet<Integer> multi = streamletBuilder
+ .newSource(() -> atomicInteger.getAndIncrement())
+ .setName("incremented-numbers-from--3")
+ .filter(i -> i <= 4)
+ .setName("numbers-lower-than-4")
+ .split(splitter)
+ .setName("split");
+
+ multi.withStream("all").map((Integer i) -> String.format("all_%d", i));
+ multi.withStream("positive").map((Integer i) -> String.format("pos_%d", i));
+ multi.withStream("negative").map((Integer i) -> String.format("neg_%d", i));
+
+ BuilderImpl streamletBuilderImpl = (BuilderImpl) streamletBuilder;
+ TestTopologyBuilder topology =
+ (TestTopologyBuilder) streamletBuilderImpl.build(testTopologyBuilder);
+
+ return topology;
+ }
+
+ public static void main(String[] args) throws Exception {
+ Config conf = new Config();
+ StreamletWithSplitAndWithStream topology = new StreamletWithSplitAndWithStream(args);
+ topology.submit(conf);
+ }
+}
diff --git a/integration_test/src/java/org/apache/heron/integration_test/topology/streamlet_with_split_and_with_stream/StreamletWithSplitAndWithStreamResults.json b/integration_test/src/java/org/apache/heron/integration_test/topology/streamlet_with_split_and_with_stream/StreamletWithSplitAndWithStreamResults.json
new file mode 100644
index 0000000..a401e62
--- /dev/null
+++ b/integration_test/src/java/org/apache/heron/integration_test/topology/streamlet_with_split_and_with_stream/StreamletWithSplitAndWithStreamResults.json
@@ -0,0 +1 @@
+["all_-1", "all_-2", "all_-3", "all_0", "all_1", "all_2", "all_3", "all_4", "neg_-1", "neg_-2", "neg_-3", "pos_1", "pos_2", "pos_3", "pos_4"]
\ No newline at end of file
diff --git a/integration_test/src/python/test_runner/resources/test.json b/integration_test/src/python/test_runner/resources/test.json
index 762b2f6..62cc92f 100644
--- a/integration_test/src/python/test_runner/resources/test.json
+++ b/integration_test/src/python/test_runner/resources/test.json
@@ -17,6 +17,11 @@
"topologyName" : "IntegrationTest_ScalaStreamletWithMapAndFlatMapAndFilterAndClone",
"classPath" : "scala_streamlet_with_map_and_flatmap_and_filter_and_clone.ScalaStreamletWithMapAndFlatMapAndFilterAndClone",
"expectedResultRelativePath" : "scala_streamlet_with_map_and_flatmap_and_filter_and_clone/ScalaStreamletWithMapAndFlatMapAndFilterAndCloneResults.json"
+ },
+ {
+ "topologyName": "IntegrationTest_ScalaStreamletWithSplitAndWithStream",
+ "classPath": "scala_streamlet_with_split_and_with_stream.ScalaStreamletWithSplitAndWithStream",
+ "expectedResultRelativePath": "scala_streamlet_with_split_and_with_stream/ScalaStreamletWithSplitAndWithStreamResults.json"
}
],
"javaTopologies": [
@@ -138,6 +143,11 @@
"topologyName": "IntegrationTest_StreamletWithMapAndFlatMapAndFilterAndClone",
"classPath": "streamlet_with_map_and_flatmap_and_filter_and_clone.StreamletWithMapAndFlatMapAndFilterAndClone",
"expectedResultRelativePath": "streamlet_with_map_and_flatmap_and_filter_and_clone/StreamletWithMapAndFlatMapAndFilterAndCloneResults.json"
+ },
+ {
+ "topologyName": "IntegrationTest_StreamletWithSplitAndWithStream",
+ "classPath": "streamlet_with_split_and_with_stream.StreamletWithSplitAndWithStream",
+ "expectedResultRelativePath": "streamlet_with_split_and_with_stream/StreamletWithSplitAndWithStreamResults.json"
}
],
"pythonTopologies": [
diff --git a/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_split_and_with_stream/ScalaStreamletWithSplitAndWithStream.scala b/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_split_and_with_stream/ScalaStreamletWithSplitAndWithStream.scala
new file mode 100644
index 0000000..307a037
--- /dev/null
+++ b/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_split_and_with_stream/ScalaStreamletWithSplitAndWithStream.scala
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.heron.integration_test.topology.scala_streamlet_with_split_and_with_stream
+
+import java.util.concurrent.atomic.AtomicInteger
+
+import org.apache.heron.api.Config
+import org.apache.heron.integration_test.core.TestTopologyBuilder
+import org.apache.heron.streamlet.Context
+
+import org.apache.heron.integration_test.common.{
+ AbstractTestTopology,
+ ScalaIntegrationTestBase
+}
+import org.apache.heron.streamlet.scala.{
+ Builder, SerializableTransformer
+}
+
+object ScalaStreamletWithSplitAndWithStream {
+ def main(args: Array[String]): Unit = {
+ val conf = new Config
+ val topology = new ScalaStreamletWithSplitAndWithStream(args)
+ topology.submit(conf)
+ }
+}
+
+/**
+ * Scala Streamlet Integration Test
+ */
+class ScalaStreamletWithSplitAndWithStream(args: Array[String])
+ extends AbstractTestTopology(args)
+ with ScalaIntegrationTestBase {
+
+ override protected def buildTopology(
+ testTopologyBuilder: TestTopologyBuilder): TestTopologyBuilder = {
+ val atomicInteger = new AtomicInteger(-3)
+
+ val streamletBuilder = Builder.newBuilder
+
+ val multi = streamletBuilder
+ .newSource(() => atomicInteger.getAndIncrement())
+ .setName("incremented-numbers-from--3")
+ .filter(i => i <= 4)
+ .setName("numbers-lower-than-4")
+ .split(Map(
+ "all" -> { num => true },
+ "positive" -> { num => num > 0 },
+ "negative" -> { num => num < 0 }
+ ))
+ .setName("split")
+
+ multi.withStream("all").map { i => s"all_$i" }
+ multi.withStream("positive").map { i => s"pos_$i" }
+ multi.withStream("negative").map{ i => s"neg_$i" }
+
+ build(testTopologyBuilder, streamletBuilder)
+ }
+}
diff --git a/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_split_and_with_stream/ScalaStreamletWithSplitAndWithStreamResults.json b/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_split_and_with_stream/ScalaStreamletWithSplitAndWithStreamResults.json
new file mode 100644
index 0000000..4df41be
--- /dev/null
+++ b/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_split_and_with_stream/ScalaStreamletWithSplitAndWithStreamResults.json
@@ -0,0 +1,17 @@
+[
+ "all_-1",
+ "all_-2",
+ "all_-3",
+ "all_0",
+ "all_1",
+ "all_2",
+ "all_3",
+ "all_4",
+ "neg_-1",
+ "neg_-2",
+ "neg_-3",
+ "pos_1",
+ "pos_2",
+ "pos_3",
+ "pos_4"
+]
\ No newline at end of file