[GEARPUMP-377] Add TwitterSource and examples

Author: manuzhang <owenzhang1990@gmail.com>

Closes #247 from manuzhang/twitter_source.
diff --git a/examples/streaming/twitter/src/main/scala/org/apache/gearpump/streaming/examples/twitter/TwitterExamples.scala b/examples/streaming/twitter/src/main/scala/org/apache/gearpump/streaming/examples/twitter/TwitterExamples.scala
new file mode 100644
index 0000000..0b8722e
--- /dev/null
+++ b/examples/streaming/twitter/src/main/scala/org/apache/gearpump/streaming/examples/twitter/TwitterExamples.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.examples.twitter
+
+import java.time.Duration
+
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
+import org.apache.gearpump.streaming.dsl.scalaapi.{LoggerSink, StreamApp}
+import org.apache.gearpump.streaming.dsl.window.api.{EventTimeTrigger, FixedWindows}
+import org.apache.gearpump.streaming.twitter.TwitterSource
+import org.apache.gearpump.util.AkkaApp
+import twitter4j.conf.ConfigurationBuilder
+
+object TwitterExamples extends AkkaApp with ArgumentsParser {
+
+  val CONSUMER_KEY = "consumer-key"
+  val CONSUMER_SECRET = "consumer-secret"
+  val TOKEN = "token"
+  val TOKEN_SECRET = "token-secret"
+
+  override val options: Array[(String, CLIOption[Any])] = Array(
+    CONSUMER_KEY -> CLIOption[String]("consumer key", required = true),
+    CONSUMER_SECRET -> CLIOption[String]("consumer secret", required = true),
+    TOKEN -> CLIOption[String]("token", required = true),
+    TOKEN_SECRET -> CLIOption[String]("token secret", required = true)
+  )
+
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    val config = parse(args)
+
+    val twitterConf = new ConfigurationBuilder()
+      .setOAuthConsumerKey(config.getString(CONSUMER_KEY))
+      .setOAuthConsumerSecret(config.getString(CONSUMER_SECRET))
+      .setOAuthAccessToken(config.getString(TOKEN))
+      .setOAuthAccessTokenSecret(config.getString(TOKEN_SECRET))
+      .build()
+
+    val twitterSource = TwitterSource(twitterConf)
+
+    val context: ClientContext = ClientContext(akkaConf)
+    val app = StreamApp("TwitterExample", context)
+
+    app.source[String](twitterSource)
+      .flatMap(tweet => tweet.split("[\\s]+"))
+      .filter(_.startsWith("#"))
+      .map((_, 1))
+      .window(FixedWindows.apply(Duration.ofMinutes(1)).triggering(EventTimeTrigger))
+      .groupBy(_._1)
+      .sum
+      .sink(new LoggerSink)
+
+    context.submit(app).waitUntilFinish()
+    context.close()
+  }
+
+}
diff --git a/external/twitter/src/main/scala/org/apache/gearpump/streaming/twitter/TwitterSource.scala b/external/twitter/src/main/scala/org/apache/gearpump/streaming/twitter/TwitterSource.scala
new file mode 100644
index 0000000..9fe94ea
--- /dev/null
+++ b/external/twitter/src/main/scala/org/apache/gearpump/streaming/twitter/TwitterSource.scala
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.twitter
+
+import java.time.Instant
+import java.util.concurrent.LinkedBlockingQueue
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.streaming.source.DataSource
+import org.apache.gearpump.streaming.task.TaskContext
+import org.apache.gearpump.streaming.twitter.TwitterSource.{Factory, MessageListener}
+import twitter4j._
+import twitter4j.conf.Configuration
+
+class TwitterSource private[twitter](
+    twitterFactory: Factory,
+    filterQuery: Option[FilterQuery],
+    statusListener: MessageListener
+) extends DataSource {
+
+  private var twitterStream: TwitterStream = _
+
+  /**
+   * Opens connection to data source
+   * invoked in onStart() method of [[org.apache.gearpump.streaming.source.DataSourceTask]]
+   *
+   * @param context   is the task context at runtime
+   * @param startTime is the start time of system
+   */
+  override def open(context: TaskContext, startTime: Instant): Unit = {
+
+    this.twitterStream = twitterFactory.getTwitterStream
+    this.twitterStream.addListener(statusListener)
+
+    filterQuery match {
+      case Some(query) =>
+        this.twitterStream.filter(query)
+      case None =>
+        this.twitterStream.sample()
+    }
+  }
+
+  /**
+   * Reads next message from data source and
+   * returns null if no message is available
+   *
+   * @return a [[org.apache.gearpump.Message]] or null
+   */
+  override def read(): Message = {
+    Option(statusListener.poll()).map(status =>
+      Message(status.getText, Instant.now())).orNull
+  }
+
+  /**
+   * Closes connection to data source.
+   * invoked in onStop() method of [[org.apache.gearpump.streaming.source.DataSourceTask]]
+   */
+  override def close(): Unit = {
+    if (twitterStream != null) {
+      twitterStream.shutdown()
+    }
+  }
+
+  /**
+   * Returns a watermark such that no timestamp earlier than the watermark should enter the system
+   * Watermark.MAX mark the end of source data
+   */
+  override def getWatermark: Instant = {
+    Instant.now()
+  }
+}
+
+object TwitterSource {
+
+  class MessageListener extends StatusListener with Serializable {
+
+    private val queue = new LinkedBlockingQueue[Status](100000)
+
+    def poll(): Status = {
+      queue.poll()
+    }
+
+    override def onStallWarning(warning: StallWarning): Unit = {}
+
+    override def onDeletionNotice(statusDeletionNotice: StatusDeletionNotice): Unit = {}
+
+    override def onScrubGeo(userId: Long, upToStatusId: Long): Unit = {}
+
+    override def onStatus(status: Status): Unit = {
+      queue.offer(status)
+    }
+
+    override def onTrackLimitationNotice(numberOfLimitedStatuses: Int): Unit = {}
+
+    override def onException(ex: Exception): Unit = {
+      throw ex
+    }
+  }
+
+  /**
+   * Wrapper around TwitterStreamFactory which is final class and
+   * can not be mocked
+   */
+  class Factory(factory: TwitterStreamFactory) extends Serializable {
+
+    def getTwitterStream: TwitterStream = {
+      factory.getInstance()
+    }
+  }
+
+  def apply(conf: Configuration): TwitterSource = {
+    new TwitterSource(new Factory(new TwitterStreamFactory(conf)),
+      None, new MessageListener)
+  }
+
+  def apply(conf: Configuration, query: FilterQuery): TwitterSource = {
+    new TwitterSource(new Factory(new TwitterStreamFactory(conf)),
+      Option(query), new MessageListener)
+  }
+}
diff --git a/external/twitter/src/main/test/scala/org/apache/gearpump/streaming/twitter/TwitterSourceSpec.scala b/external/twitter/src/main/test/scala/org/apache/gearpump/streaming/twitter/TwitterSourceSpec.scala
new file mode 100644
index 0000000..a7ac8fb
--- /dev/null
+++ b/external/twitter/src/main/test/scala/org/apache/gearpump/streaming/twitter/TwitterSourceSpec.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.twitter
+
+import java.time.Instant
+
+import org.apache.gearpump.streaming.MockUtil
+import org.apache.gearpump.streaming.twitter.TwitterSource.{Factory, MessageListener}
+import org.mockito.Mockito._
+import org.scalacheck.{Arbitrary, Gen}
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.{Matchers, PropSpec}
+import org.scalatest.prop.PropertyChecks
+import twitter4j.{FilterQuery, TwitterStream}
+
+class TwitterSourceSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
+
+  implicit val arbQuery: Arbitrary[Option[FilterQuery]] = Arbitrary {
+    Gen.oneOf(None, Some(new FilterQuery()))
+  }
+
+  property("TwitterSource should properly setup, poll message and teardown") {
+    forAll {
+      (query: Option[FilterQuery], startTime: Long) =>
+        val factory = mock[Factory]
+        val stream = mock[TwitterStream]
+        val listener = mock[MessageListener]
+
+        when(factory.getTwitterStream).thenReturn(stream)
+        val twitterSource = new TwitterSource(factory, query, listener)
+
+        twitterSource.open(MockUtil.mockTaskContext, Instant.ofEpochMilli(startTime))
+
+        verify(stream).addListener(listener)
+        query match {
+          case Some(q) =>
+            verify(stream).filter(q)
+          case None =>
+            verify(stream).sample()
+        }
+
+        twitterSource.read()
+        verify(listener).poll()
+
+        twitterSource.close()
+        verify(stream).shutdown()
+    }
+  }
+}
diff --git a/project/BuildExamples.scala b/project/BuildExamples.scala
index 47aa0c6..7cc8807 100644
--- a/project/BuildExamples.scala
+++ b/project/BuildExamples.scala
@@ -37,7 +37,8 @@
     wordcount,
     wordcountJava,
     example_hbase,
-    example_kudu
+    example_kudu,
+    example_twitter
   )
 
   /**
@@ -159,6 +160,12 @@
   ).dependsOn(core % "provided", streaming % "provided; test->test",
     external_hadoopfs, external_monoid, external_serializer, external_kafka)
 
+  lazy val example_twitter = Project(
+    id = "gearpump-examples-twitter",
+    base = file("examples/streaming/twitter"),
+    settings = exampleSettings("org.apache.gearpump.streaming.examples.twitter.TwitterExamples")
+  ).dependsOn(core % "provided", streaming % "provided; test->test", external_twitter)
+
   private def exampleSettings(className: String): Seq[Def.Setting[_]] =
     commonSettings ++ noPublish ++ myAssemblySettings ++ Seq(
       mainClass in(Compile, packageBin) :=
diff --git a/project/BuildExternals.scala b/project/BuildExternals.scala
index 698af6c..6c1289c 100644
--- a/project/BuildExternals.scala
+++ b/project/BuildExternals.scala
@@ -134,4 +134,16 @@
       ))
     .dependsOn(core % "provided", streaming % "test->test; provided")
     .disablePlugins(sbtassembly.AssemblyPlugin)
+
+  lazy val external_twitter = Project(
+    id = "gearpump-external-twitter",
+    base = file("external/twitter"),
+    settings = commonSettings ++ javadocSettings ++
+      Seq(
+        libraryDependencies ++= Seq(
+          "org.twitter4j" % "twitter4j-stream" % "4.0.4"
+        )
+      ))
+    .dependsOn(core % "provided", streaming % "test->test; provided")
+    .disablePlugins(sbtassembly.AssemblyPlugin)
 }
\ No newline at end of file