Nwang/add spout based source streamlet (#3032)
* Add spout-based Streamlet source
diff --git a/examples/src/scala/BUILD b/examples/src/scala/BUILD
index d03d9a8..786ac2a 100644
--- a/examples/src/scala/BUILD
+++ b/examples/src/scala/BUILD
@@ -8,6 +8,7 @@
deps = [
"@org_apache_commons_commons_lang3//jar",
"//heron/api/src/java:api-java",
+ "//heron/api/src/java:api-java-low-level",
"//heron/api/src/scala:api-scala",
"//third_party/java:kryo"
],
diff --git a/heron/api/src/java/org/apache/heron/streamlet/Builder.java b/heron/api/src/java/org/apache/heron/streamlet/Builder.java
index 8e7c6e7..d6e4f70 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/Builder.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/Builder.java
@@ -20,6 +20,7 @@
package org.apache.heron.streamlet;
+import org.apache.heron.api.spout.IRichSpout;
import org.apache.heron.streamlet.impl.BuilderImpl;
/**
@@ -45,4 +46,12 @@
* @return
*/
<R> Streamlet<R> newSource(Source<R> generator);
+
+ /**
+ * Creates a new Streamlet using the provided spout
+ * @param spout The spout that emits the tuples of the streamlet
+ * @param <R> The element type of the streamlet, chosen by the caller
+ * @return a Streamlet whose source is the given spout
+ */
+ <R> Streamlet<R> newSource(IRichSpout spout);
}
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/BuilderImpl.java b/heron/api/src/java/org/apache/heron/streamlet/impl/BuilderImpl.java
index 0eb06d9..61715e8 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/BuilderImpl.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/BuilderImpl.java
@@ -24,6 +24,7 @@
import java.util.List;
import java.util.Set;
+import org.apache.heron.api.spout.IRichSpout;
import org.apache.heron.api.topology.TopologyBuilder;
import org.apache.heron.streamlet.Builder;
import org.apache.heron.streamlet.SerializableSupplier;
@@ -47,7 +48,6 @@
public <R> Streamlet<R> newSource(SerializableSupplier<R> supplier) {
StreamletUtils.require(supplier != null, "supplier must not be null.");
StreamletImpl<R> retval = StreamletImpl.createSupplierStreamlet(supplier);
- retval.setNumPartitions(1);
sources.add(retval);
return retval;
}
@@ -56,7 +56,14 @@
public <R> Streamlet<R> newSource(Source<R> generator) {
StreamletUtils.require(generator != null, "source must not be null.");
StreamletImpl<R> retval = StreamletImpl.createGeneratorStreamlet(generator);
- retval.setNumPartitions(1);
+ sources.add(retval);
+ return retval;
+ }
+
+ @Override
+ public <R> Streamlet<R> newSource(IRichSpout spout) {
+ StreamletUtils.require(spout != null, "spout must not be null.");
+ StreamletImpl<R> retval = StreamletImpl.createSpoutStreamlet(spout);
sources.add(retval);
return retval;
}
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java b/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java
index 066dc74..a05040b 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java
@@ -26,6 +26,7 @@
import java.util.Set;
import java.util.logging.Logger;
+import org.apache.heron.api.spout.IRichSpout;
import org.apache.heron.api.topology.TopologyBuilder;
import org.apache.heron.streamlet.IStreamletOperator;
import org.apache.heron.streamlet.JoinType;
@@ -54,6 +55,7 @@
import org.apache.heron.streamlet.impl.streamlets.RemapStreamlet;
import org.apache.heron.streamlet.impl.streamlets.SinkStreamlet;
import org.apache.heron.streamlet.impl.streamlets.SourceStreamlet;
+import org.apache.heron.streamlet.impl.streamlets.SpoutStreamlet;
import org.apache.heron.streamlet.impl.streamlets.SupplierStreamlet;
import org.apache.heron.streamlet.impl.streamlets.TransformStreamlet;
import org.apache.heron.streamlet.impl.streamlets.UnionStreamlet;
@@ -115,6 +117,7 @@
REMAP("remap"),
SINK("sink"),
SOURCE("generator"),
+ SPOUT("spout"),
SUPPLIER("supplier"),
TRANSFORM("transform"),
UNION("union");
@@ -264,6 +267,14 @@
}
/**
+ * Create a Streamlet whose elements come from the tuples emitted by a Spout object
+ * @param spout The spout that is the source of all elements of the Streamlet
+ */
+ static <T> StreamletImpl<T> createSpoutStreamlet(IRichSpout spout) {
+ return new SpoutStreamlet<>(spout);
+ }
+
+ /**
* Return a new Streamlet by applying mapFn to each element of this Streamlet
* @param mapFn The Map Function that should be applied to each element
*/
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/SpoutStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/SpoutStreamlet.java
new file mode 100644
index 0000000..5bb12cf
--- /dev/null
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/SpoutStreamlet.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.streamlet.impl.streamlets;
+
+import java.util.Set;
+
+import org.apache.heron.api.spout.IRichSpout;
+import org.apache.heron.api.topology.TopologyBuilder;
+import org.apache.heron.streamlet.impl.StreamletImpl;
+
+/**
+ * SpoutStreamlet is a quick way of creating a Streamlet
+ * from a user-supplied Spout object. The spout is the
+ * source of all tuples for this Streamlet.
+ */
+public class SpoutStreamlet<R> extends StreamletImpl<R> {
+ private final IRichSpout spout;
+
+ public SpoutStreamlet(IRichSpout spout) {
+ this.spout = spout;
+ setNumPartitions(1); // default parallelism; callers may override via setNumPartitions
+ }
+
+ @Override
+ public boolean doBuild(TopologyBuilder bldr, Set<String> stageNames) {
+ setDefaultNameIfNone(StreamletNamePrefix.SPOUT, stageNames);
+ bldr.setSpout(getName(), spout, getNumPartitions()); // user's spout registered directly as this stage
+ return true;
+ }
+}
diff --git a/heron/api/src/scala/org/apache/heron/streamlet/scala/Builder.scala b/heron/api/src/scala/org/apache/heron/streamlet/scala/Builder.scala
index 0e5f13e..b299f52 100644
--- a/heron/api/src/scala/org/apache/heron/streamlet/scala/Builder.scala
+++ b/heron/api/src/scala/org/apache/heron/streamlet/scala/Builder.scala
@@ -18,6 +18,7 @@
*/
package org.apache.heron.streamlet.scala
+import org.apache.heron.api.spout.IRichSpout
import org.apache.heron.streamlet.scala.impl.BuilderImpl
/**
@@ -49,7 +50,14 @@
* Creates a new Streamlet using the underlying generator
*
* @param generator The generator that generates the tuples of the streamlet
- * @return a Streamlet representation of the source object
+ * @return a Streamlet representation of the source object
*/
def newSource[R](generator: Source[R]): Streamlet[R]
+
+ /**
+ * Creates a new Streamlet whose source is the provided spout
+ * @param spout The spout that emits the tuples of the streamlet
+ * @return a Streamlet[R]; R is chosen by the caller and is not checked against the spout
+ */
+ def newSource[R](spout: IRichSpout): Streamlet[R]
}
diff --git a/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/BuilderImpl.scala b/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/BuilderImpl.scala
index 0f012e4..6020320 100644
--- a/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/BuilderImpl.scala
+++ b/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/BuilderImpl.scala
@@ -18,6 +18,7 @@
*/
package org.apache.heron.streamlet.scala.impl
+import org.apache.heron.api.spout.IRichSpout
import org.apache.heron.api.topology.TopologyBuilder
import org.apache.heron.streamlet.impl.{BuilderImpl => JavaBuilderImpl}
@@ -39,6 +40,11 @@
StreamletImpl.fromJavaStreamlet[R](newJavaStreamlet)
}
+ override def newSource[R](spout: IRichSpout): Streamlet[R] = {
+ // Build the Java spout streamlet, then wrap it in the Scala Streamlet API
+ StreamletImpl.fromJavaStreamlet[R](builder.newSource[R](spout))
+ }
+
def build(): TopologyBuilder =
builder.asInstanceOf[JavaBuilderImpl].build()
diff --git a/heron/api/tests/java/org/apache/heron/resource/TestSpout.java b/heron/api/tests/java/org/apache/heron/resource/TestSpout.java
new file mode 100644
index 0000000..49025bd
--- /dev/null
+++ b/heron/api/tests/java/org/apache/heron/resource/TestSpout.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.resource;
+
+
+import java.util.List;
+import java.util.Map;
+
+import org.junit.Ignore;
+
+import org.apache.heron.api.spout.BaseRichSpout;
+import org.apache.heron.api.spout.SpoutOutputCollector;
+import org.apache.heron.api.topology.OutputFieldsDeclarer;
+import org.apache.heron.api.topology.TopologyContext;
+import org.apache.heron.api.tuple.Fields;
+import org.apache.heron.api.tuple.Values;
+
+/**
+ * A Spout used for unit tests. It:
+ * 1. emits EMIT_COUNT tuples, each with MESSAGE_ID as its message id;
+ * 2. declares its output as a single field, "word".
+ */
+
+@Ignore
+public class TestSpout extends BaseRichSpout {
+ private static final int EMIT_COUNT = 10;
+ private static final String MESSAGE_ID = "MESSAGE_ID";
+
+ private final String[] words = {"A", "B"};
+ private SpoutOutputCollector collector;
+ private int emittedCount;
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("word"));
+ }
+
+ @Override
+ public void open(
+ Map<String, Object> conf,
+ TopologyContext context,
+ SpoutOutputCollector spoutCollector) {
+ collector = spoutCollector;
+ }
+
+ @Override
+ public void close() {
+ }
+
+ @Override
+ public void nextTuple() {
+ // Cycles through words, emitting A, B, A, B, ... until EMIT_COUNT tuples are out
+ if (emittedCount >= EMIT_COUNT) {
+ return;
+ }
+ emit(collector, new Values(words[emittedCount % words.length]), MESSAGE_ID, emittedCount++);
+ }
+
+ protected void emit(SpoutOutputCollector spoutCollector,
+ List<Object> tuple, Object messageId, int emittedSoFar) {
+ spoutCollector.emit(tuple, messageId);
+ }
+}
+
diff --git a/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java b/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java
index 8a0b22d..a9b15bd 100644
--- a/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java
+++ b/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java
@@ -32,6 +32,7 @@
import org.apache.heron.common.basics.ByteAmount;
import org.apache.heron.resource.TestBasicBolt;
import org.apache.heron.resource.TestBolt;
+import org.apache.heron.resource.TestSpout;
import org.apache.heron.resource.TestWindowBolt;
import org.apache.heron.streamlet.Config;
import org.apache.heron.streamlet.Context;
@@ -49,6 +50,7 @@
import org.apache.heron.streamlet.impl.streamlets.JoinStreamlet;
import org.apache.heron.streamlet.impl.streamlets.MapStreamlet;
import org.apache.heron.streamlet.impl.streamlets.ReduceByKeyAndWindowStreamlet;
+import org.apache.heron.streamlet.impl.streamlets.SpoutStreamlet;
import org.apache.heron.streamlet.impl.streamlets.SupplierStreamlet;
import org.apache.heron.streamlet.impl.streamlets.TransformStreamlet;
import org.apache.heron.streamlet.impl.streamlets.UnionStreamlet;
@@ -86,6 +88,13 @@
}
@Test
+ public void testSpoutStreamlet() throws Exception {
+ Streamlet<String> streamlet = StreamletImpl.createSpoutStreamlet(new TestSpout());
+ assertTrue(streamlet instanceof SpoutStreamlet);
+ assertEquals(1, streamlet.getNumPartitions());
+ }
+
+ @Test
@SuppressWarnings("unchecked")
public void testMapStreamlet() throws Exception {
Streamlet<Double> baseStreamlet = StreamletImpl.createSupplierStreamlet(() -> Math.random());
diff --git a/heron/api/tests/scala/org/apache/heron/resource/TestSpout.scala b/heron/api/tests/scala/org/apache/heron/resource/TestSpout.scala
new file mode 100644
index 0000000..e0b7e08
--- /dev/null
+++ b/heron/api/tests/scala/org/apache/heron/resource/TestSpout.scala
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.resource
+
+
+import java.util.{List => JList}
+import java.util.{Map => JMap}
+
+import org.junit.Ignore
+
+import org.apache.heron.api.spout.BaseRichSpout
+import org.apache.heron.api.spout.SpoutOutputCollector
+import org.apache.heron.api.topology.OutputFieldsDeclarer
+import org.apache.heron.api.topology.TopologyContext
+import org.apache.heron.api.tuple.Fields
+import org.apache.heron.api.tuple.Values
+
+/**
+ * A Spout used for unit tests. It:
+ * 1. emits EMIT_COUNT tuples, each with MESSAGE_ID as its message id;
+ * 2. declares its output as a single field, "word".
+ */
+
+@Ignore
+class TestSpout extends BaseRichSpout {
+ private val EMIT_COUNT = 10
+ private val MESSAGE_ID = "MESSAGE_ID"
+
+ private val words = Array("A", "B")
+ private var spoutCollector: SpoutOutputCollector = _
+ private var emittedSoFar = 0
+
+ // Every tuple carries a single field named "word"
+ override def declareOutputFields(outputFieldsDeclarer: OutputFieldsDeclarer): Unit =
+ outputFieldsDeclarer.declare(new Fields("word"))
+
+ override def open(
+ conf: JMap[String, Object],
+ topologyContext: TopologyContext,
+ spoutOutputCollector: SpoutOutputCollector): Unit = {
+ spoutCollector = spoutOutputCollector
+ }
+
+ override def close(): Unit = {}
+
+ override def nextTuple(): Unit = {
+ // Cycles through words, emitting A, B, A, B, ... until EMIT_COUNT tuples are out
+ if (emittedSoFar < EMIT_COUNT) {
+ val nextWord = words(emittedSoFar % words.length)
+ emit(spoutCollector, new Values(nextWord), MESSAGE_ID, emittedSoFar)
+ emittedSoFar += 1
+ }
+ }
+
+ // Hook that forwards the tuple to the collector; emittedCount is not used here
+ def emit(collector: SpoutOutputCollector,
+ tuple: JList[Object],
+ messageId: Object,
+ emittedCount: Int): Unit = {
+ collector.emit(tuple, messageId)
+ }
+}
diff --git a/heron/api/tests/scala/org/apache/heron/streamlet/scala/impl/BuilderImplTest.scala b/heron/api/tests/scala/org/apache/heron/streamlet/scala/impl/BuilderImplTest.scala
index cc04caa..3d8048b 100644
--- a/heron/api/tests/scala/org/apache/heron/streamlet/scala/impl/BuilderImplTest.scala
+++ b/heron/api/tests/scala/org/apache/heron/streamlet/scala/impl/BuilderImplTest.scala
@@ -18,12 +18,14 @@
*/
package org.apache.heron.streamlet.scala.impl
+import java.util.{Map => JMap}
+import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer
import org.junit.Assert.assertEquals
+import org.apache.heron.resource.TestSpout
import org.apache.heron.streamlet.Context
-
import org.apache.heron.streamlet.scala.{Builder, Streamlet, Source}
import org.apache.heron.streamlet.scala.common.BaseFunSuite
@@ -69,4 +71,16 @@
override def cleanup(): Unit = numbers.clear()
}
+ test(
+ "BuilderImpl should support streamlet generation from a user defined spout") {
+ val streamletName = "Spout_Streamlet_1"
+ val spoutStreamlet = Builder.newBuilder
+ .newSource[String](new TestSpout)
+ .setName(streamletName)
+ .setNumPartitions(20)
+
+ assert(spoutStreamlet.isInstanceOf[Streamlet[_]])
+ assertEquals(streamletName, spoutStreamlet.getName)
+ assertEquals(20, spoutStreamlet.getNumPartitions)
+ }
}