Make log/sink/consume Streamlet component support setName and setNumPartitions (#3459)
diff --git a/examples/src/java/org/apache/heron/examples/streamlet/StreamletCloneTopology.java b/examples/src/java/org/apache/heron/examples/streamlet/StreamletCloneTopology.java
index 843da57..03488f8 100644
--- a/examples/src/java/org/apache/heron/examples/streamlet/StreamletCloneTopology.java
+++ b/examples/src/java/org/apache/heron/examples/streamlet/StreamletCloneTopology.java
@@ -144,13 +144,15 @@
* Elements in the first cloned streamlet go to the database sink.
*/
splitGameScoreStreamlet.get(0)
- .toSink(new DatabaseSink());
+ .toSink(new DatabaseSink())
+ .setName("sink0");
/**
* Elements in the second cloned streamlet go to the logging sink.
*/
splitGameScoreStreamlet.get(1)
- .toSink(new FormattedLogSink());
+ .toSink(new FormattedLogSink())
+ .setName("sink1");
Config config = Config.defaultConfig();
diff --git a/examples/src/scala/org/apache/heron/examples/streamlet/scala/ScalaClassicalMusicTopology.scala b/examples/src/scala/org/apache/heron/examples/streamlet/scala/ScalaClassicalMusicTopology.scala
index f155bcc..7bc20ca 100644
--- a/examples/src/scala/org/apache/heron/examples/streamlet/scala/ScalaClassicalMusicTopology.scala
+++ b/examples/src/scala/org/apache/heron/examples/streamlet/scala/ScalaClassicalMusicTopology.scala
@@ -92,6 +92,7 @@
)
.setName("joined-classical-musics-by-year")
.log()
+ .setName("log")
val config = Config.defaultConfig()
diff --git a/examples/src/scala/org/apache/heron/examples/streamlet/scala/ScalaIntegerProcessingTopology.scala b/examples/src/scala/org/apache/heron/examples/streamlet/scala/ScalaIntegerProcessingTopology.scala
index b0663bc..6f618df 100644
--- a/examples/src/scala/org/apache/heron/examples/streamlet/scala/ScalaIntegerProcessingTopology.scala
+++ b/examples/src/scala/org/apache/heron/examples/streamlet/scala/ScalaIntegerProcessingTopology.scala
@@ -58,6 +58,8 @@
.union(zeroes)
.setName("union-of-numbers")
.log()
+ .setName("log")
+ .setNumPartitions(1)
val config = Config.newBuilder
.setNumContainers(NUM_CONTAINERS)
diff --git a/heron/api/src/java/org/apache/heron/streamlet/Streamlet.java b/heron/api/src/java/org/apache/heron/streamlet/Streamlet.java
index 1a108a4..1c38760 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/Streamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/Streamlet.java
@@ -45,7 +45,7 @@
* Streamlet before doing the transformation.
*/
@InterfaceStability.Evolving
-public interface Streamlet<R> {
+public interface Streamlet<R> extends StreamletBase<R> {
/**
* Sets the name of the BaseStreamlet.
@@ -299,7 +299,7 @@
* Logs every element of the streamlet using String.valueOf function
* This is one of the sink functions in the sense that this operation returns void
*/
- void log();
+ StreamletBase<R> log();
/**
* Applies the consumer function to every element of the stream
@@ -307,7 +307,7 @@
* @param consumer The user supplied consumer function that is invoked for each element
* of this streamlet.
*/
- void consume(SerializableConsumer<R> consumer);
+ StreamletBase<R> consume(SerializableConsumer<R> consumer);
/**
* Applies the sink's put function to every element of the stream
@@ -315,5 +315,5 @@
* @param sink The Sink whose put method consumes each element
* of this streamlet.
*/
- void toSink(Sink<R> sink);
+ StreamletBase<R> toSink(Sink<R> sink);
}
diff --git a/heron/api/src/java/org/apache/heron/streamlet/StreamletBase.java b/heron/api/src/java/org/apache/heron/streamlet/StreamletBase.java
new file mode 100644
index 0000000..f52e7f2
--- /dev/null
+++ b/heron/api/src/java/org/apache/heron/streamlet/StreamletBase.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.streamlet;
+
+/**
+ * A Streamlet is a (potentially unbounded) ordered collection of tuples.
+ * The StreamletBase class contains basic information of a Streamlet
+ * such as name and partition count without the connection functions
+ * such as map() and filter().
+ */
+public interface StreamletBase<R> {
+
+ /**
+ * Sets the name of the BaseStreamlet.
+ * @param sName The name given by the user for this BaseStreamlet
+ * @return Returns back the Streamlet with changed name
+ */
+ StreamletBase<R> setName(String sName);
+
+ /**
+ * Gets the name of the Streamlet.
+ * @return Returns the name of the Streamlet
+ */
+ String getName();
+
+ /**
+ * Sets the number of partitions of the streamlet
+ * @param numPartitions The user assigned number of partitions
+ * @return Returns back the Streamlet with changed number of partitions
+ */
+ StreamletBase<R> setNumPartitions(int numPartitions);
+
+ /**
+ * Gets the number of partitions of this Streamlet.
+ * @return the number of partitions of this Streamlet
+ */
+ int getNumPartitions();
+
+ // This is the main interface that every Streamlet implementation should implement
+ // The main tasks are generally to make sure that appropriate names/partitions are
+ // computed and add a spout/bolt to the TopologyBuilder
+ // void build(TopologyBuilder bldr, Set<String> stageNames);
+}
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/BuilderImpl.java b/heron/api/src/java/org/apache/heron/streamlet/impl/BuilderImpl.java
index 853bb97..2dca00e 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/BuilderImpl.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/BuilderImpl.java
@@ -91,7 +91,7 @@
streamlet.build(builder, stageNames);
}
for (StreamletImpl<?> streamlet : sources) {
- if (!streamlet.allBuilt()) {
+ if (!streamlet.isFullyBuilt()) {
throw new RuntimeException("Topology cannot be fully built! Are all sources added?");
}
}
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletBaseImpl.java b/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletBaseImpl.java
new file mode 100644
index 0000000..6b3530c
--- /dev/null
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletBaseImpl.java
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.heron.streamlet.impl;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.logging.Logger;
+
+import org.apache.heron.api.topology.TopologyBuilder;
+import org.apache.heron.streamlet.StreamletBase;
+
+import static org.apache.heron.streamlet.impl.utils.StreamletUtils.checkNotBlank;
+import static org.apache.heron.streamlet.impl.utils.StreamletUtils.require;
+
+/**
+ * A Streamlet is a (potentially unbounded) ordered collection of tuples.
+ * Streamlets originate from pub/sub systems(such Pulsar/Kafka), or from
+ * static data(such as csv files, HDFS files), or for that matter any other
+ * source. They are also created by transforming existing Streamlets using
+ * operations such as map/flatMap, etc.
+ * Besides the tuples, a Streamlet has the following properties associated with it
+ * a) name. User assigned or system generated name to refer the streamlet
+ * b) nPartitions. Number of partitions that the streamlet is composed of. Thus the
+ * ordering of the tuples in a Streamlet is wrt the tuples within a partition.
+ * This allows the system to distribute each partition to different nodes across the cluster.
+ * A bunch of transformations can be done on Streamlets(like map/flatMap, etc.). Each
+ * of these transformations operate on every tuple of the Streamlet and produce a new
+ * Streamlet. One can think of a transformation attaching itself to the stream and processing
+ * each tuple as they go by. Thus the parallelism of any operator is implicitly determined
+ * by the number of partitions of the stream that it is operating on. If a particular
+ * transformation wants to operate at a different parallelism, one can repartition the
+ * Streamlet before doing the transformation.
+ */
+public abstract class StreamletBaseImpl<R> implements StreamletBase<R> {
+ private static final Logger LOG = Logger.getLogger(StreamletBaseImpl.class.getName());
+ protected String name;
+ protected int nPartitions;
+ private List<StreamletBaseImpl<?>> children;
+ private boolean built;
+
+ /**
+ * Only used by the implementors
+ */
+ protected StreamletBaseImpl() {
+ this.name = null;
+ this.nPartitions = -1;
+ this.children = new LinkedList<>();
+ this.built = false;
+ }
+
+ protected enum StreamletNamePrefix {
+ CONSUMER("consumer"),
+ COUNT("count"),
+ CUSTOM("custom"),
+ CUSTOM_BASIC("customBasic"),
+ CUSTOM_WINDOW("customWindow"),
+ FILTER("filter"),
+ FLATMAP("flatmap"),
+ JOIN("join"),
+ KEYBY("keyBy"),
+ LOGGER("logger"),
+ MAP("map"),
+ SOURCE("generator"),
+ REDUCE("reduce"),
+ REMAP("remap"),
+ SINK("sink"),
+ SPLIT("split"),
+ SPOUT("spout"),
+ SUPPLIER("supplier"),
+ TRANSFORM("transform"),
+ UNION("union");
+
+ private final String prefix;
+
+ StreamletNamePrefix(final String prefix) {
+ this.prefix = prefix;
+ }
+
+ @Override
+ public String toString() {
+ return prefix;
+ }
+ }
+
+ /**
+ * Sets the name of the Streamlet.
+ * @param sName The name given by the user for this streamlet
+ * @return Returns back the Streamlet with changed name
+ */
+ @Override
+ public StreamletBase<R> setName(String sName) {
+ checkNotBlank(sName, "Streamlet name cannot be null/blank");
+
+ this.name = sName;
+ return this;
+ }
+
+ /**
+ * Gets the name of the Streamlet.
+ * @return Returns the name of the Streamlet
+ */
+ @Override
+ public String getName() {
+ return name;
+ }
+
+ private String defaultNameCalculator(StreamletNamePrefix prefix, Set<String> stageNames) {
+ int index = 1;
+ String calculatedName;
+ while (true) {
+ calculatedName = new StringBuilder(prefix.toString()).append(index).toString();
+ if (!stageNames.contains(calculatedName)) {
+ break;
+ }
+ index++;
+ }
+ LOG.info("Calculated stage Name as " + calculatedName);
+ return calculatedName;
+ }
+
+ /**
+ * Sets a default unique name to the Streamlet by type if it is not set.
+ * Otherwise, just checks its uniqueness.
+ * @param prefix The name prefix of this streamlet
+ * @param stageNames The collections of created streamlet/stage names
+ */
+ protected void setDefaultNameIfNone(StreamletNamePrefix prefix, Set<String> stageNames) {
+ if (getName() == null) {
+ setName(defaultNameCalculator(prefix, stageNames));
+ }
+ if (stageNames.contains(getName())) {
+ throw new RuntimeException(String.format(
+ "The stage name %s is used multiple times in the same topology", getName()));
+ }
+ stageNames.add(getName());
+ }
+
+ /**
+ * Sets the number of partitions of the streamlet
+ * @param numPartitions The user assigned number of partitions
+ * @return Returns back the Streamlet with changed number of partitions
+ */
+ @Override
+ public StreamletBase<R> setNumPartitions(int numPartitions) {
+ require(numPartitions > 0, "Streamlet's partitions number should be > 0");
+
+ this.nPartitions = numPartitions;
+ return this;
+ }
+
+ /**
+ * Gets the number of partitions of this Streamlet.
+ * @return the number of partitions of this Streamlet
+ */
+ @Override
+ public int getNumPartitions() {
+ return nPartitions;
+ }
+
+ public <T> void addChild(StreamletBaseImpl<T> child) {
+ children.add(child);
+ }
+
+ /**
+ * Gets all the children of this streamlet.
+ * Children of a streamlet are streamlets that are resulting from transformations of elements of
+ * this and potentially other streamlets.
+ * @return The kid streamlets
+ */
+ public List<StreamletBaseImpl<?>> getChildren() {
+ return children;
+ }
+
+ public void build(TopologyBuilder bldr, Set<String> stageNames) {
+ if (built) {
+ throw new RuntimeException("Logic Error While building " + getName());
+ }
+
+ if (doBuild(bldr, stageNames)) {
+ built = true;
+ for (StreamletBaseImpl<?> streamlet : getChildren()) {
+ streamlet.build(bldr, stageNames);
+ }
+ }
+ }
+
+ public boolean isBuilt() {
+ return built;
+ }
+
+ public boolean isFullyBuilt() {
+ if (!isBuilt()) {
+ return false;
+ }
+ for (StreamletBaseImpl<?> child : children) {
+ if (!child.isFullyBuilt()) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ // This is the main interface that every Streamlet implementation should implement
+ // The main tasks are generally to make sure that appropriate names/partitions are
+ // computed and add a spout/bolt to the TopologyBuilder
+ protected abstract boolean doBuild(TopologyBuilder bldr, Set<String> stageNames);
+}
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java b/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java
index 81a7d78..65d7534 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java
@@ -20,7 +20,6 @@
import java.util.ArrayList;
import java.util.HashSet;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -29,7 +28,6 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.heron.api.grouping.NoneStreamGrouping;
import org.apache.heron.api.grouping.StreamGrouping;
-import org.apache.heron.api.topology.TopologyBuilder;
import org.apache.heron.api.utils.Utils;
import org.apache.heron.streamlet.IStreamletOperator;
import org.apache.heron.streamlet.JoinType;
@@ -43,6 +41,7 @@
import org.apache.heron.streamlet.SerializableTransformer;
import org.apache.heron.streamlet.Sink;
import org.apache.heron.streamlet.Streamlet;
+import org.apache.heron.streamlet.StreamletBase;
import org.apache.heron.streamlet.WindowConfig;
import org.apache.heron.streamlet.impl.streamlets.ConsumerStreamlet;
import org.apache.heron.streamlet.impl.streamlets.CountByKeyAndWindowStreamlet;
@@ -89,72 +88,9 @@
* transformation wants to operate at a different parallelism, one can repartition the
* Streamlet before doing the transformation.
*/
-public abstract class StreamletImpl<R> implements Streamlet<R> {
+public abstract class StreamletImpl<R>
+ extends StreamletBaseImpl<R> implements Streamlet<R> {
private static final Logger LOG = Logger.getLogger(StreamletImpl.class.getName());
- protected String name;
- protected int nPartitions;
- private List<StreamletImpl<?>> children;
- private boolean built;
-
- public boolean isBuilt() {
- return built;
- }
-
- public boolean allBuilt() {
- if (!built) {
- return false;
- }
- for (StreamletImpl<?> child : children) {
- if (!child.allBuilt()) {
- return false;
- }
- }
- return true;
- }
-
- protected enum StreamletNamePrefix {
- CONSUMER("consumer"),
- COUNT("count"),
- CUSTOM("custom"),
- CUSTOM_BASIC("customBasic"),
- CUSTOM_WINDOW("customWindow"),
- FILTER("filter"),
- FLATMAP("flatmap"),
- JOIN("join"),
- KEYBY("keyBy"),
- LOGGER("logger"),
- MAP("map"),
- SOURCE("generator"),
- REDUCE("reduce"),
- REMAP("remap"),
- SINK("sink"),
- SPLIT("split"),
- SPOUT("spout"),
- SUPPLIER("supplier"),
- TRANSFORM("transform"),
- UNION("union");
-
- private final String prefix;
-
- StreamletNamePrefix(final String prefix) {
- this.prefix = prefix;
- }
-
- @Override
- public String toString() {
- return prefix;
- }
- }
-
- /**
- * Gets all the children of this streamlet.
- * Children of a streamlet are streamlets that are resulting from transformations of elements of
- * this and potentially other streamlets.
- * @return The kid streamlets
- */
- public List<StreamletImpl<?>> getChildren() {
- return children;
- }
/**
* Sets the name of the Streamlet.
@@ -163,61 +99,22 @@
*/
@Override
public Streamlet<R> setName(String sName) {
- checkNotBlank(sName, "Streamlet name cannot be null/blank");
-
- this.name = sName;
+ super.setName(sName);
return this;
}
/**
- * Gets the name of the Streamlet.
- * @return Returns the name of the Streamlet
- */
- @Override
- public String getName() {
- return name;
- }
-
- /**
- * Sets a default unique name to the Streamlet by type if it is not set.
- * Otherwise, just checks its uniqueness.
- * @param prefix The name prefix of this streamlet
- * @param stageNames The collections of created streamlet/stage names
- */
- protected void setDefaultNameIfNone(StreamletNamePrefix prefix, Set<String> stageNames) {
- if (getName() == null) {
- setName(defaultNameCalculator(prefix, stageNames));
- }
- if (stageNames.contains(getName())) {
- throw new RuntimeException(String.format(
- "The stage name %s is used multiple times in the same topology", getName()));
- }
- stageNames.add(getName());
- }
-
- /**
* Sets the number of partitions of the streamlet
* @param numPartitions The user assigned number of partitions
* @return Returns back the Streamlet with changed number of partitions
*/
@Override
public Streamlet<R> setNumPartitions(int numPartitions) {
- require(numPartitions > 0, "Streamlet's partitions number should be > 0");
-
- this.nPartitions = numPartitions;
+ super.setNumPartitions(numPartitions);
return this;
}
/**
- * Gets the number of partitions of this Streamlet.
- * @return the number of partitions of this Streamlet
- */
- @Override
- public int getNumPartitions() {
- return nPartitions;
- }
-
- /**
* Set the id of the stream to be used by the children nodes.
* Usage (assuming source is a Streamlet object with two output streams: stream1 and stream2):
* source.withStream("stream1").filter(...).log();
@@ -271,45 +168,7 @@
* Only used by the implementors
*/
protected StreamletImpl() {
- this.nPartitions = -1;
- this.children = new LinkedList<>();
- this.built = false;
- }
-
- public void build(TopologyBuilder bldr, Set<String> stageNames) {
- if (built) {
- throw new RuntimeException("Logic Error While building " + getName());
- }
-
- if (doBuild(bldr, stageNames)) {
- built = true;
- for (StreamletImpl<?> streamlet : children) {
- streamlet.build(bldr, stageNames);
- }
- }
- }
-
- // This is the main interface that every Streamlet implementation should implement
- // The main tasks are generally to make sure that appropriate names/partitions are
- // computed and add a spout/bolt to the TopologyBuilder
- protected abstract boolean doBuild(TopologyBuilder bldr, Set<String> stageNames);
-
- public <T> void addChild(StreamletImpl<T> child) {
- children.add(child);
- }
-
- private String defaultNameCalculator(StreamletNamePrefix prefix, Set<String> stageNames) {
- int index = 1;
- String calculatedName;
- while (true) {
- calculatedName = new StringBuilder(prefix.toString()).append(index).toString();
- if (!stageNames.contains(calculatedName)) {
- break;
- }
- index++;
- }
- LOG.info("Calculated stage Name as " + calculatedName);
- return calculatedName;
+ super();
}
/**
@@ -571,9 +430,10 @@
* that does not contain any tuple. Thus this function returns void.
*/
@Override
- public void log() {
+ public StreamletBase<R> log() {
LogStreamlet<R> logger = new LogStreamlet<>(this);
addChild(logger);
+ return logger;
}
/**
@@ -581,11 +441,12 @@
* @param consumer The user supplied consumer function that is invoked for each element
*/
@Override
- public void consume(SerializableConsumer<R> consumer) {
+ public StreamletBase<R> consume(SerializableConsumer<R> consumer) {
checkNotNull(consumer, "consumer cannot be null");
ConsumerStreamlet<R> consumerStreamlet = new ConsumerStreamlet<>(this, consumer);
addChild(consumerStreamlet);
+ return consumerStreamlet;
}
/**
@@ -593,11 +454,12 @@
* @param sink The Sink that consumes
*/
@Override
- public void toSink(Sink<R> sink) {
+ public StreamletBase<R> toSink(Sink<R> sink) {
checkNotNull(sink, "sink cannot be null");
SinkStreamlet<R> sinkStreamlet = new SinkStreamlet<>(this, sink);
addChild(sinkStreamlet);
+ return sinkStreamlet;
}
/**
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletShadow.java b/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletShadow.java
index 633c4c6..eb5d3be 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletShadow.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletShadow.java
@@ -23,6 +23,7 @@
import java.util.Set;
import org.apache.heron.api.topology.TopologyBuilder;
+import org.apache.heron.streamlet.impl.StreamletBaseImpl;
import org.apache.heron.streamlet.impl.StreamletImpl;
/**
@@ -100,8 +101,7 @@
/*
* Functions related to topology building need to be overriden.
*/
- @Override
- public <T> void addChild(StreamletImpl<T> child) {
+ public <T> void addChild(StreamletBaseImpl<T> child) {
real.addChild(child);
}
@@ -112,7 +112,7 @@
* @return The kid streamlets
*/
@Override
- public List<StreamletImpl<?>> getChildren() {
+ public List<StreamletBaseImpl<?>> getChildren() {
return real.getChildren();
}
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/ConsumerStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/ConsumerStreamlet.java
index f71b71f..92c5928 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/ConsumerStreamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/ConsumerStreamlet.java
@@ -24,6 +24,7 @@
import org.apache.heron.api.topology.TopologyBuilder;
import org.apache.heron.streamlet.SerializableConsumer;
+import org.apache.heron.streamlet.impl.StreamletBaseImpl;
import org.apache.heron.streamlet.impl.StreamletImpl;
import org.apache.heron.streamlet.impl.sinks.ConsumerSink;
@@ -32,7 +33,7 @@
* streamlet after consuming every element. Since elements of the parents are just consumed
* by the user passed consumer function, nothing is emitted, thus this streamlet is empty.
*/
-public class ConsumerStreamlet<R> extends StreamletImpl<R> {
+public class ConsumerStreamlet<R> extends StreamletBaseImpl<R> {
private StreamletImpl<R> parent;
private SerializableConsumer<R> consumer;
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/LogStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/LogStreamlet.java
index 9f68ff9..db9cefa 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/LogStreamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/LogStreamlet.java
@@ -22,6 +22,7 @@
import java.util.Set;
import org.apache.heron.api.topology.TopologyBuilder;
+import org.apache.heron.streamlet.impl.StreamletBaseImpl;
import org.apache.heron.streamlet.impl.StreamletImpl;
import org.apache.heron.streamlet.impl.sinks.LogSink;
@@ -30,7 +31,7 @@
* streamlet after logging each element. Since elements of the parents are just logged
* nothing is emitted, thus this streamlet is empty.
*/
-public class LogStreamlet<R> extends StreamletImpl<R> {
+public class LogStreamlet<R> extends StreamletBaseImpl<R> {
private StreamletImpl<R> parent;
public LogStreamlet(StreamletImpl<R> parent) {
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/SinkStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/SinkStreamlet.java
index 42e5d03..9625b6d 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/SinkStreamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/SinkStreamlet.java
@@ -24,6 +24,7 @@
import org.apache.heron.api.topology.TopologyBuilder;
import org.apache.heron.streamlet.Sink;
+import org.apache.heron.streamlet.impl.StreamletBaseImpl;
import org.apache.heron.streamlet.impl.StreamletImpl;
import org.apache.heron.streamlet.impl.sinks.ComplexSink;
@@ -32,7 +33,7 @@
* streamlet after consuming every element. Since elements of the parents are just consumed
* by the user passed consumer function, nothing is emitted, thus this streamlet is empty.
*/
-public class SinkStreamlet<R> extends StreamletImpl<R> {
+public class SinkStreamlet<R> extends StreamletBaseImpl<R> {
private StreamletImpl<R> parent;
private Sink<R> sink;
diff --git a/heron/api/src/scala/org/apache/heron/streamlet/scala/Streamlet.scala b/heron/api/src/scala/org/apache/heron/streamlet/scala/Streamlet.scala
index 8151941..073b74a 100644
--- a/heron/api/src/scala/org/apache/heron/streamlet/scala/Streamlet.scala
+++ b/heron/api/src/scala/org/apache/heron/streamlet/scala/Streamlet.scala
@@ -46,7 +46,7 @@
* transformation wants to operate at a different parallelism, one can repartition the
* Streamlet before doing the transformation.
*/
-trait Streamlet[R] {
+trait Streamlet[R] extends StreamletBase[R] {
/**
* Sets the name of the Streamlet.
@@ -57,13 +57,6 @@
def setName(sName: String): Streamlet[R]
/**
- * Gets the name of the Streamlet.
- *
- * @return Returns the name of the Streamlet
- */
- def getName: String
-
- /**
* Sets the number of partitions of the streamlet
*
* @param numPartitions The user assigned number of partitions
@@ -72,13 +65,6 @@
def setNumPartitions(numPartitions: Int): Streamlet[R]
/**
- * Gets the number of partitions of this Streamlet.
- *
- * @return the number of partitions of this Streamlet
- */
- def getNumPartitions: Int
-
- /**
* Set the id of the stream to be used by the children nodes.
* Usage (assuming source is a Streamlet object with two output streams: stream1 and stream2):
* source.withStream("stream1").filter(...).log();
@@ -320,7 +306,7 @@
* Logs every element of the streamlet using String.valueOf function
* This is one of the sink functions in the sense that this operation returns void
*/
- def log(): Unit
+ def log(): StreamletBase[R]
/**
* Applies the consumer function to every element of the stream
@@ -329,7 +315,7 @@
* @param consumer The user supplied consumer function that is invoked for each element
* of this streamlet.
*/
- def consume(consumer: R => Unit): Unit
+ def consume(consumer: R => Unit): StreamletBase[R]
/**
* Applies the sink's put function to every element of the stream
@@ -338,6 +324,6 @@
* @param sink The Sink whose put method consumes each element
* of this streamlet.
*/
- def toSink(sink: Sink[R]): Unit
+ def toSink(sink: Sink[R]): StreamletBase[R]
}
diff --git a/heron/api/src/scala/org/apache/heron/streamlet/scala/StreamletBase.scala b/heron/api/src/scala/org/apache/heron/streamlet/scala/StreamletBase.scala
new file mode 100644
index 0000000..64468d1
--- /dev/null
+++ b/heron/api/src/scala/org/apache/heron/streamlet/scala/StreamletBase.scala
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.heron.streamlet.scala
+
+/**
+ * A Streamlet is a (potentially unbounded) ordered collection of tuples.
+ * The StreamletBase class contains basic information of a Streamlet
+ * such as name and partition count without the connection functions
+ * such as map() and filter().
+ */
+trait StreamletBase[R] {
+
+ /**
+ * Sets the name of the Streamlet.
+ *
+ * @param sName The name given by the user for this Streamlet
+ * @return Returns back the Streamlet with changed name
+ */
+ def setName(sName: String): StreamletBase[R]
+
+ /**
+ * Gets the name of the Streamlet.
+ *
+ * @return Returns the name of the Streamlet
+ */
+ def getName: String
+
+ /**
+ * Sets the number of partitions of the streamlet
+ *
+ * @param numPartitions The user assigned number of partitions
+ * @return Returns back the Streamlet with changed number of partitions
+ */
+ def setNumPartitions(numPartitions: Int): StreamletBase[R]
+
+ /**
+ * Gets the number of partitions of this Streamlet.
+ *
+ * @return the number of partitions of this Streamlet
+ */
+ def getNumPartitions: Int
+}
diff --git a/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/StreamletBaseImpl.scala b/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/StreamletBaseImpl.scala
new file mode 100644
index 0000000..d5f336e
--- /dev/null
+++ b/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/StreamletBaseImpl.scala
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.heron.streamlet.scala.impl
+
+import org.apache.heron.streamlet.{StreamletBase => JavaStreamletBase}
+import org.apache.heron.streamlet.scala.StreamletBase
+
+object StreamletBaseImpl {
+ def fromJavaStreamletBase[R](javaStreamletBase: JavaStreamletBase[R]): StreamletBase[R] =
+ new StreamletBaseImpl[R](javaStreamletBase)
+
+ def toJavaStreamletBase[R](streamlet: StreamletBase[R]): JavaStreamletBase[R] =
+ streamlet.asInstanceOf[StreamletBaseImpl[R]].javaStreamletBase
+}
+
+/**
+ * This class provides Scala Streamlet Implementation by wrapping Java Streamlet API.
+ * Passed User defined Scala Functions are transformed to related FunctionalInterface versions and
+ * related Java Streamlet is transformed to Scala version again.
+ */
+class StreamletBaseImpl[R](val javaStreamletBase: JavaStreamletBase[R]) extends StreamletBase[R] {
+ import StreamletBaseImpl._
+
+ /**
+ * Sets the name of the Streamlet.
+ *
+ * @param sName The name given by the user for this Streamlet
+ * @return Returns back the Streamlet with changed name
+ */
+ override def setName(sName: String): StreamletBase[R] =
+ fromJavaStreamletBase[R](javaStreamletBase.setName(sName))
+
+ /**
+ * Gets the name of the Streamlet.
+ *
+ * @return Returns the name of the Streamlet
+ */
+ override def getName(): String = javaStreamletBase.getName
+
+ /**
+ * Sets the number of partitions of the streamlet
+ *
+ * @param numPartitions The user assigned number of partitions
+ * @return Returns back the Streamlet with changed number of partitions
+ */
+ override def setNumPartitions(numPartitions: Int): StreamletBase[R] =
+ fromJavaStreamletBase[R](javaStreamletBase.setNumPartitions(numPartitions))
+
+ /**
+ * Gets the number of partitions of this Streamlet.
+ *
+ * @return the number of partitions of this Streamlet
+ */
+ override def getNumPartitions(): Int = javaStreamletBase.getNumPartitions
+}
diff --git a/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/StreamletImpl.scala b/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/StreamletImpl.scala
index fe6b7df..bee1298 100644
--- a/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/StreamletImpl.scala
+++ b/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/StreamletImpl.scala
@@ -32,9 +32,11 @@
SerializablePredicate,
KVStreamlet => JavaKVStreamlet,
Streamlet => JavaStreamlet,
+ StreamletBase => JavaStreamletBase,
WindowConfig
}
import org.apache.heron.streamlet.impl.{
+ StreamletBaseImpl => JavaStreamletBaseImpl,
StreamletImpl => JavaStreamletImpl
}
import org.apache.heron.streamlet.impl.streamlets.SupplierStreamlet
@@ -43,7 +45,8 @@
SerializableTransformer,
Sink,
KVStreamlet,
- Streamlet
+ Streamlet,
+ StreamletBase
}
import org.apache.heron.streamlet.scala.converter.ScalaToJavaConverter._
@@ -62,7 +65,7 @@
* related Java Streamlet is transformed to Scala version again.
*/
class StreamletImpl[R](val javaStreamlet: JavaStreamlet[R])
- extends Streamlet[R] {
+ extends StreamletBaseImpl[R](javaStreamlet) with Streamlet[R] {
import StreamletImpl._
@@ -76,13 +79,6 @@
fromJavaStreamlet[R](javaStreamlet.setName(sName))
/**
- * Gets the name of the Streamlet.
- *
- * @return Returns the name of the Streamlet
- */
- override def getName(): String = javaStreamlet.getName
-
- /**
* Sets the number of partitions of the streamlet
*
* @param numPartitions The user assigned number of partitions
@@ -92,13 +88,6 @@
fromJavaStreamlet[R](javaStreamlet.setNumPartitions(numPartitions))
/**
- * Gets the number of partitions of this Streamlet.
- *
- * @return the number of partitions of this Streamlet
- */
- override def getNumPartitions(): Int = javaStreamlet.getNumPartitions
-
- /**
* Set the id of the stream to be used by the children nodes.
* Usage (assuming source is a Streamlet object with two output streams: stream1 and stream2):
* source.withStream("stream1").filter(...).log();
@@ -480,7 +469,10 @@
* Logs every element of the streamlet using String.valueOf function
* This is one of the sink functions in the sense that this operation returns void
*/
- override def log(): Unit = javaStreamlet.log()
+ override def log(): StreamletBase[R] = {
+ val newJavaStreamletBase = javaStreamlet.log()
+ StreamletBaseImpl.fromJavaStreamletBase(newJavaStreamletBase)
+ }
/**
* Applies the consumer function to every element of the stream
@@ -489,9 +481,10 @@
* @param consumer The user supplied consumer function that is invoked for each element
* of this streamlet.
*/
- override def consume(consumer: R => Unit): Unit = {
+ override def consume(consumer: R => Unit): StreamletBase[R] = {
val serializableConsumer = toSerializableConsumer[R](consumer)
- javaStreamlet.consume(serializableConsumer)
+ val newJavaStreamletBase = javaStreamlet.consume(serializableConsumer)
+ StreamletBaseImpl.fromJavaStreamletBase(newJavaStreamletBase)
}
/**
@@ -501,9 +494,10 @@
* @param sink The Sink whose put method consumes each element
* of this streamlet.
*/
- override def toSink(sink: Sink[R]): Unit = {
+ override def toSink(sink: Sink[R]): StreamletBase[R] = {
val javaSink = toJavaSink[R](sink)
- javaStreamlet.toSink(javaSink)
+ val newJavaStreamletBase = javaStreamlet.toSink(javaSink)
+ StreamletBaseImpl.fromJavaStreamletBase(newJavaStreamletBase)
}
/**
@@ -513,11 +507,11 @@
*
* @return The kid streamlets
*/
- private[impl] def getChildren: List[JavaStreamletImpl[_]] = {
+ private[impl] def getChildren: List[JavaStreamletBaseImpl[_]] = {
import _root_.scala.collection.JavaConversions._
val children =
javaStreamlet
- .asInstanceOf[JavaStreamletImpl[_]]
+ .asInstanceOf[JavaStreamletBaseImpl[_]]
.getChildren
children.toList
}
diff --git a/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java b/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java
index ea40f01..4392417 100644
--- a/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java
+++ b/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java
@@ -461,7 +461,7 @@
TopologyBuilder topologyBuilder = new TopologyBuilder();
Set<String> stageNames = new HashSet<>();
supplierStreamlet.build(topologyBuilder, stageNames);
- assertTrue(supplierStreamlet.allBuilt());
+ assertTrue(supplierStreamlet.isFullyBuilt());
assertEquals(supplierStreamlet.getChildren().size(), 1);
assertTrue(supplierStreamlet.getChildren().get(0) instanceof FlatMapStreamlet);
FlatMapStreamlet<String, String> fStreamlet =
@@ -499,11 +499,11 @@
Set<String> stageNames = new HashSet<>();
supplierStreamlet1.build(topologyBuilder, stageNames);
assertTrue(supplierStreamlet1.isBuilt());
- assertFalse(supplierStreamlet1.allBuilt());
+ assertFalse(supplierStreamlet1.isFullyBuilt());
supplierStreamlet2.build(topologyBuilder, stageNames);
- assertTrue(supplierStreamlet1.allBuilt());
- assertTrue(supplierStreamlet2.allBuilt());
+ assertTrue(supplierStreamlet1.isFullyBuilt());
+ assertTrue(supplierStreamlet2.isFullyBuilt());
// go over all stuff
assertEquals(supplierStreamlet1.getChildren().size(), 1);
@@ -546,7 +546,7 @@
supplierStreamlet.build(topologyBuilder, stageNames);
// verify SupplierStreamlet
- assertTrue(supplierStreamlet.allBuilt());
+ assertTrue(supplierStreamlet.isFullyBuilt());
assertEquals(1, supplierStreamlet.getChildren().size());
assertTrue(supplierStreamlet.getChildren().get(0) instanceof ConsumerStreamlet);
assertEquals("consumer1", supplierStreamlet.getChildren().get(0).getName());