Nwang/add spout based source streamlet (#3032)

* Add a spout-based Streamlet source (Builder.newSource(IRichSpout))
diff --git a/examples/src/scala/BUILD b/examples/src/scala/BUILD
index d03d9a8..786ac2a 100644
--- a/examples/src/scala/BUILD
+++ b/examples/src/scala/BUILD
@@ -8,6 +8,7 @@
     deps = [
         "@org_apache_commons_commons_lang3//jar",
         "//heron/api/src/java:api-java",
+        "//heron/api/src/java:api-java-low-level",
         "//heron/api/src/scala:api-scala",
         "//third_party/java:kryo"
     ],
diff --git a/heron/api/src/java/org/apache/heron/streamlet/Builder.java b/heron/api/src/java/org/apache/heron/streamlet/Builder.java
index 8e7c6e7..d6e4f70 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/Builder.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/Builder.java
@@ -20,6 +20,7 @@
 
 package org.apache.heron.streamlet;
 
+import org.apache.heron.api.spout.IRichSpout;
 import org.apache.heron.streamlet.impl.BuilderImpl;
 
 /**
@@ -45,4 +46,12 @@
    * @return
    */
   <R> Streamlet<R> newSource(Source<R> generator);
+
+  /**
+   * Creates a new Streamlet using the provided spout
+   * @param spout The spout that emits the tuples of the streamlet
+   * @param <R> The type of the elements of the returned streamlet
+   * @return a Streamlet representation of the spout object
+   */
+  <R> Streamlet<R> newSource(IRichSpout spout);
 }
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/BuilderImpl.java b/heron/api/src/java/org/apache/heron/streamlet/impl/BuilderImpl.java
index 0eb06d9..61715e8 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/BuilderImpl.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/BuilderImpl.java
@@ -24,6 +24,7 @@
 import java.util.List;
 import java.util.Set;
 
+import org.apache.heron.api.spout.IRichSpout;
 import org.apache.heron.api.topology.TopologyBuilder;
 import org.apache.heron.streamlet.Builder;
 import org.apache.heron.streamlet.SerializableSupplier;
@@ -47,7 +48,6 @@
   public <R> Streamlet<R> newSource(SerializableSupplier<R> supplier) {
     StreamletUtils.require(supplier != null, "supplier must not be null.");
     StreamletImpl<R> retval = StreamletImpl.createSupplierStreamlet(supplier);
-    retval.setNumPartitions(1);
     sources.add(retval);
     return retval;
   }
@@ -56,7 +56,15 @@
   public <R> Streamlet<R> newSource(Source<R> generator) {
     StreamletUtils.require(generator != null, "source must not be null.");
     StreamletImpl<R> retval = StreamletImpl.createGeneratorStreamlet(generator);
-    retval.setNumPartitions(1);
+    sources.add(retval);
+    return retval;
+  }
+
+  @Override
+  public <R> Streamlet<R> newSource(IRichSpout spout) {
+    StreamletUtils.require(spout != null, "spout must not be null.");
+    // Partition count defaults to 1 (set in SpoutStreamlet's constructor).
+    StreamletImpl<R> retval = StreamletImpl.createSpoutStreamlet(spout);
     sources.add(retval);
     return retval;
   }
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java b/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java
index 066dc74..a05040b 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java
@@ -26,6 +26,7 @@
 import java.util.Set;
 import java.util.logging.Logger;
 
+import org.apache.heron.api.spout.IRichSpout;
 import org.apache.heron.api.topology.TopologyBuilder;
 import org.apache.heron.streamlet.IStreamletOperator;
 import org.apache.heron.streamlet.JoinType;
@@ -54,6 +55,7 @@
 import org.apache.heron.streamlet.impl.streamlets.RemapStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.SinkStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.SourceStreamlet;
+import org.apache.heron.streamlet.impl.streamlets.SpoutStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.SupplierStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.TransformStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.UnionStreamlet;
@@ -115,6 +117,7 @@
     REMAP("remap"),
     SINK("sink"),
     SOURCE("generator"),
+    SPOUT("spout"),
     SUPPLIER("supplier"),
     TRANSFORM("transform"),
     UNION("union");
@@ -264,6 +267,14 @@
   }
 
   /**
+   * Create a Streamlet backed by the given Spout object
+   * @param spout The Spout that is the source of the Streamlet's elements
+   */
+  static <T> StreamletImpl<T> createSpoutStreamlet(IRichSpout spout) {
+    return new SpoutStreamlet<T>(spout);
+  }
+
+  /**
    * Return a new Streamlet by applying mapFn to each element of this Streamlet
    * @param mapFn The Map Function that should be applied to each element
   */
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/SpoutStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/SpoutStreamlet.java
new file mode 100644
index 0000000..5bb12cf
--- /dev/null
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/SpoutStreamlet.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.streamlet.impl.streamlets;
+
+import java.util.Set;
+
+import org.apache.heron.api.spout.IRichSpout;
+import org.apache.heron.api.topology.TopologyBuilder;
+import org.apache.heron.streamlet.impl.StreamletImpl;
+
+/**
+ * SpoutStreamlet is a quick way of creating a Streamlet
+ * from a user-supplied Spout object. The spout is the
+ * source of all tuples for this Streamlet.
+ */
+public class SpoutStreamlet<R> extends StreamletImpl<R> {
+  private final IRichSpout spout;
+
+  /**
+   * Wraps the given spout; the Streamlet starts out with a single partition.
+   * @param spout The spout that emits the tuples of this Streamlet
+   */
+  public SpoutStreamlet(IRichSpout spout) {
+    this.spout = spout;
+    setNumPartitions(1);
+  }
+
+  /**
+   * Assigns a default name based on the SPOUT prefix if none has been set, then
+   * registers the wrapped spout with the builder under this Streamlet's name,
+   * using the Streamlet's partition count as the spout's parallelism.
+   * @return always true
+   */
+  @Override
+  public boolean doBuild(TopologyBuilder bldr, Set<String> stageNames) {
+    setDefaultNameIfNone(StreamletNamePrefix.SPOUT, stageNames);
+    bldr.setSpout(getName(), spout, getNumPartitions());
+    return true;
+  }
+}
diff --git a/heron/api/src/scala/org/apache/heron/streamlet/scala/Builder.scala b/heron/api/src/scala/org/apache/heron/streamlet/scala/Builder.scala
index 0e5f13e..b299f52 100644
--- a/heron/api/src/scala/org/apache/heron/streamlet/scala/Builder.scala
+++ b/heron/api/src/scala/org/apache/heron/streamlet/scala/Builder.scala
@@ -18,6 +18,7 @@
  */
 package org.apache.heron.streamlet.scala
 
+import org.apache.heron.api.spout.IRichSpout
 import org.apache.heron.streamlet.scala.impl.BuilderImpl
 
 /**
@@ -49,7 +50,14 @@
     * Creates a new Streamlet using the underlying generator
     *
     * @param generator The generator that generates the tuples of the streamlet
-    * @return  a Streamlet representation of the source object
+    * @return a Streamlet representation of the source object
     */
   def newSource[R](generator: Source[R]): Streamlet[R]
+
+  /**
+    * Creates a new Streamlet using the provided spout
+    * @param spout The spout that emits the tuples of the streamlet
+    * @return a Streamlet representation of the spout object
+    */
+  def newSource[R](spout: IRichSpout): Streamlet[R]
 }
diff --git a/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/BuilderImpl.scala b/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/BuilderImpl.scala
index 0f012e4..6020320 100644
--- a/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/BuilderImpl.scala
+++ b/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/BuilderImpl.scala
@@ -18,6 +18,7 @@
  */
 package org.apache.heron.streamlet.scala.impl
 
+import org.apache.heron.api.spout.IRichSpout
 import org.apache.heron.api.topology.TopologyBuilder
 import org.apache.heron.streamlet.impl.{BuilderImpl => JavaBuilderImpl}
 
@@ -39,6 +40,11 @@
     StreamletImpl.fromJavaStreamlet[R](newJavaStreamlet)
   }
 
+  override def newSource[R](spout: IRichSpout): Streamlet[R] = {
+    // Build the Java spout streamlet, then wrap it in the Scala Streamlet API.
+    StreamletImpl.fromJavaStreamlet[R](builder.newSource[R](spout))
+  }
+
   def build(): TopologyBuilder =
     builder.asInstanceOf[JavaBuilderImpl].build()
 
diff --git a/heron/api/tests/java/org/apache/heron/resource/TestSpout.java b/heron/api/tests/java/org/apache/heron/resource/TestSpout.java
new file mode 100644
index 0000000..49025bd
--- /dev/null
+++ b/heron/api/tests/java/org/apache/heron/resource/TestSpout.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.resource;
+
+
+import java.util.List;
+import java.util.Map;
+
+import org.junit.Ignore;
+
+import org.apache.heron.api.spout.BaseRichSpout;
+import org.apache.heron.api.spout.SpoutOutputCollector;
+import org.apache.heron.api.topology.OutputFieldsDeclarer;
+import org.apache.heron.api.topology.TopologyContext;
+import org.apache.heron.api.tuple.Fields;
+import org.apache.heron.api.tuple.Values;
+
+/**
+ * A Spout used for unit tests. It:
+ * 1. emits EMIT_COUNT tuples, alternating "A" and "B", each tagged with
+ *    MESSAGE_ID as its message id;
+ * 2. declares a single output field, "word".
+ */
+@Ignore
+public class TestSpout extends BaseRichSpout {
+  private static final int EMIT_COUNT = 10;
+  private static final String MESSAGE_ID = "MESSAGE_ID";
+
+  private final String[] toSend = new String[]{"A", "B"};
+  private SpoutOutputCollector outputCollector;
+  private int emitted = 0;
+
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    declarer.declare(new Fields("word"));
+  }
+
+  @Override
+  public void open(
+      Map<String, Object> conf,
+      TopologyContext context,
+      SpoutOutputCollector collector) {
+    outputCollector = collector;
+  }
+
+  @Override
+  public void close() {
+    // Nothing to release.
+  }
+
+  @Override
+  public void nextTuple() {
+    // Emits A, B, A, B, ... and stays idle once EMIT_COUNT tuples are out.
+    if (emitted >= EMIT_COUNT) {
+      return;
+    }
+    int index = emitted++;
+    String word = toSend[index % toSend.length];
+    emit(outputCollector, new Values(word), MESSAGE_ID, index);
+  }
+
+  /**
+   * Emits one tuple through the collector; protected so subclasses can
+   * override it. emittedCount is the zero-based index of the tuple being
+   * emitted and is not used by this implementation.
+   */
+  protected void emit(SpoutOutputCollector collector,
+                      List<Object> tuple, Object messageId, int emittedCount) {
+    collector.emit(tuple, messageId);
+  }
+}
+
diff --git a/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java b/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java
index 8a0b22d..a9b15bd 100644
--- a/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java
+++ b/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java
@@ -32,6 +32,7 @@
 import org.apache.heron.common.basics.ByteAmount;
 import org.apache.heron.resource.TestBasicBolt;
 import org.apache.heron.resource.TestBolt;
+import org.apache.heron.resource.TestSpout;
 import org.apache.heron.resource.TestWindowBolt;
 import org.apache.heron.streamlet.Config;
 import org.apache.heron.streamlet.Context;
@@ -49,6 +50,7 @@
 import org.apache.heron.streamlet.impl.streamlets.JoinStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.MapStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.ReduceByKeyAndWindowStreamlet;
+import org.apache.heron.streamlet.impl.streamlets.SpoutStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.SupplierStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.TransformStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.UnionStreamlet;
@@ -86,6 +88,13 @@
   }
 
   @Test
+  public void testSpoutStreamlet() throws Exception {
+    Streamlet<String> streamlet = StreamletImpl.createSpoutStreamlet(new TestSpout());
+    assertTrue(streamlet instanceof SpoutStreamlet);
+    assertEquals(1, streamlet.getNumPartitions());  // set by SpoutStreamlet's constructor
+  }
+
+  @Test
   @SuppressWarnings("unchecked")
   public void testMapStreamlet() throws Exception {
     Streamlet<Double> baseStreamlet = StreamletImpl.createSupplierStreamlet(() -> Math.random());
diff --git a/heron/api/tests/scala/org/apache/heron/resource/TestSpout.scala b/heron/api/tests/scala/org/apache/heron/resource/TestSpout.scala
new file mode 100644
index 0000000..e0b7e08
--- /dev/null
+++ b/heron/api/tests/scala/org/apache/heron/resource/TestSpout.scala
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.resource
+
+
+import java.util.{List => JList}
+import java.util.{Map => JMap}
+
+import org.junit.Ignore
+
+import org.apache.heron.api.spout.BaseRichSpout
+import org.apache.heron.api.spout.SpoutOutputCollector
+import org.apache.heron.api.topology.OutputFieldsDeclarer
+import org.apache.heron.api.topology.TopologyContext
+import org.apache.heron.api.tuple.Fields
+import org.apache.heron.api.tuple.Values
+
+/**
+  * A Spout used for unit tests. It:
+  * 1. emits EMIT_COUNT tuples, alternating "A" and "B", each tagged with
+  *    MESSAGE_ID as its message id;
+  * 2. declares a single output field, "word".
+  */
+@Ignore
+class TestSpout extends BaseRichSpout {
+  private val EMIT_COUNT = 10
+  private val MESSAGE_ID = "MESSAGE_ID"
+
+  private val toSend = Array("A", "B")
+  private var outputCollector: SpoutOutputCollector = _
+  private var emitted = 0
+
+  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit =
+    declarer.declare(new Fields("word"))
+
+  override def open(
+      conf: JMap[String, Object],
+      context: TopologyContext,
+      collector: SpoutOutputCollector): Unit =
+    outputCollector = collector
+
+  // Nothing to release.
+  override def close(): Unit = ()
+
+  // Emits A, B, A, B, ... and stays idle once EMIT_COUNT tuples are out.
+  override def nextTuple(): Unit =
+    if (emitted < EMIT_COUNT) {
+      val word = toSend(emitted % toSend.length)
+      emit(outputCollector, new Values(word), MESSAGE_ID, emitted)
+      emitted += 1
+    }
+
+  /**
+    * Emits one tuple through the collector. emittedCount is the zero-based index
+    * of the tuple being emitted and is not used by this implementation.
+    */
+  def emit(collector: SpoutOutputCollector,
+           tuple: JList[Object],
+           messageId: Object,
+           emittedCount: Int): Unit =
+    collector.emit(tuple, messageId)
+}
diff --git a/heron/api/tests/scala/org/apache/heron/streamlet/scala/impl/BuilderImplTest.scala b/heron/api/tests/scala/org/apache/heron/streamlet/scala/impl/BuilderImplTest.scala
index cc04caa..3d8048b 100644
--- a/heron/api/tests/scala/org/apache/heron/streamlet/scala/impl/BuilderImplTest.scala
+++ b/heron/api/tests/scala/org/apache/heron/streamlet/scala/impl/BuilderImplTest.scala
@@ -18,12 +18,14 @@
  */
 package org.apache.heron.streamlet.scala.impl
 
+import java.util.{Map => JMap}
+import scala.collection.JavaConverters._
 import scala.collection.mutable.ListBuffer
 
 import org.junit.Assert.assertEquals
 
+import org.apache.heron.resource.TestSpout
 import org.apache.heron.streamlet.Context
-
 import org.apache.heron.streamlet.scala.{Builder, Streamlet, Source}
 import org.apache.heron.streamlet.scala.common.BaseFunSuite
 
@@ -69,4 +71,16 @@
     override def cleanup(): Unit = numbers.clear()
   }
 
+  test(
+    "BuilderImpl should support streamlet generation from a user defined spout") {
+    val spout = new TestSpout
+    val spoutStreamletObj = Builder.newBuilder
+      .newSource[String](spout) // TestSpout emits single-field String tuples
+      .setName("Spout_Streamlet_1")
+      .setNumPartitions(20)
+
+    assert(spoutStreamletObj.isInstanceOf[Streamlet[_]])
+    assertEquals("Spout_Streamlet_1", spoutStreamletObj.getName)
+    assertEquals(20, spoutStreamletObj.getNumPartitions)
+  }
 }