[GEARPUMP-303] add a RabbitMQ sink to integrate with gearpump

Author: vinoyang <yanghua1127@gmail.com>

Closes #180 from yanghua/dev.
diff --git a/experiments/rabbitmq/README.md b/experiments/rabbitmq/README.md
new file mode 100644
index 0000000..9059608
--- /dev/null
+++ b/experiments/rabbitmq/README.md
@@ -0,0 +1,40 @@
+# Gearpump RabbitMQ
+
+Gearpump integration for [RabbitMQ](https://www.rabbitmq.com/)
+
+## Usage
+
+The message type that RMQSink is able to handle including:
+
+ 1. String
+ 2. Array[Byte]
+ 3. Sequence of type 1 and 2
+
+Suppose there is a DataSource Task will output above-mentioned messages, you can write a simple application then:
+
+```scala
+val sink = new RMQSink(UserConfig.empty)
+val sinkProcessor = DataSinkProcessor(sink, "$sinkNum")
+val split = Processor[DataSource]("$splitNum")
+val computation = split ~> sinkProcessor
+val application = StreamApplication("RabbitMQ", Graph(computation), UserConfig.empty)
+```
+## config items
+to initialize the RMQSink's instance, we need a UserConfig object and should provide some config item list below :
+
+* [must]`rabbitmq.queue.name` : the RabbitMQ queue name we want to sink the message to;
+* [optional]`rabbitmq.connection.host` : the RabbitMQ server host;
+* [optional]`rabbitmq.connection.port` : the RabbitMQ server port, default port is **5672**;
+* [optional]`rabbitmq.connection.uri` : the connection uri, pattern is `amqp://userName:password@hostName:portNumber/virtualHost`
+* [optional]`rabbitmq.virtualhost` : the virtual-host which is a logic domain in RabbitMQ Server
+* [optional]`rabbitmq.auth.username` : the user name for authorization
+* [optional]`rabbitmq.auth.password` : the password for authorization
+* [optional]`rabbitmq.automatic.recovery` : if need automatic recovery set `true` otherwise set `false`
+* [optional]`rabbitmq.connection.timeout` : the connection's timeout
+* [optional]`rabbitmq.network.recovery.internal` : recovery internal
+* [optional]`rabbitmq.requested.heartbeat` : if need heartbeat set `true` otherwise set `false`
+* [optional]`rabbitmq.topology.recoveryenabled` : if need recovery set `true` otherwise set `false`
+* [optional]`rabbitmq.channel.max` : the maximum channel num
+* [optional]`rabbitmq.frame.max` : the maximum frame num
+
+more details : https://www.rabbitmq.com/admin-guide.html
\ No newline at end of file
diff --git a/experiments/rabbitmq/src/main/scala/org/apache/gearpump/experimental/rabbitmq/RMQSink.scala b/experiments/rabbitmq/src/main/scala/org/apache/gearpump/experimental/rabbitmq/RMQSink.scala
new file mode 100644
index 0000000..492fffe
--- /dev/null
+++ b/experiments/rabbitmq/src/main/scala/org/apache/gearpump/experimental/rabbitmq/RMQSink.scala
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experimental.rabbitmq
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.sink.DataSink
+import org.apache.gearpump.streaming.task.TaskContext
+import com.rabbitmq.client.Channel
+import com.rabbitmq.client.{Connection, ConnectionFactory}
+import org.apache.gearpump.util.LogUtil
+
+class RMQSink(userConfig: UserConfig,
+    val connFactory: (UserConfig) => ConnectionFactory) extends DataSink{
+
+  private val LOG = LogUtil.getLogger(getClass)
+  var connectionFactory: ConnectionFactory = connFactory(userConfig)
+  var connection: Connection = null
+  var channel: Channel = null
+  var queueName: String = null
+
+  def this(userConfig: UserConfig) = {
+    this(userConfig, RMQSink.getConnectionFactory)
+  }
+
+  override def open(context: TaskContext): Unit = {
+    connection = connectionFactory.newConnection
+    channel = connection.createChannel
+    if (channel == null) {
+      throw new RuntimeException("None of RabbitMQ channels are available.")
+    }
+    setupQueue()
+  }
+
+  override def write(message: Message): Unit = {
+    publish(message.msg)
+  }
+
+  override def close(): Unit = {
+    channel.close()
+    connection.close()
+  }
+
+  protected def setupQueue(): Unit = {
+    val queue = RMQSink.getQueueName(userConfig)
+    if (queue.isEmpty) {
+      throw new RuntimeException("can not get a RabbitMQ queue name")
+    }
+
+    queueName = queue.get
+    channel.queueDeclare(queue.get, false, false, false, null)
+  }
+
+  def publish(msg: Any): Unit = {
+    msg match {
+      case seq: Seq[Any] =>
+        seq.foreach(publish)
+      case str: String => {
+        channel.basicPublish("", queueName, null, msg.asInstanceOf[String].getBytes)
+      }
+      case byteArray: Array[Byte] => {
+        channel.basicPublish("", queueName, null, byteArray)
+      }
+      case _ => {
+        LOG.warn("matched unsupported message!")
+      }
+    }
+  }
+
+}
+
+object RMQSink {
+
+  val RMQSINK = "rmqsink"
+  val QUEUE_NAME = "rabbitmq.queue.name"
+  val SERVER_HOST = "rabbitmq.connection.host"
+  val SERVER_PORT = "rabbitmq.connection.port"
+  val CONNECTION_URI = "rabbitmq.connection.uri"
+  val VIRTUAL_HOST = "rabbitmq.virtualhost"
+  val AUTH_USERNAME = "rabbitmq.auth.username"
+  val AUTH_PASSWORD = "rabbitmq.auth.password"
+  val AUTOMATIC_RECOVERY = "rabbitmq.automatic.recovery"
+  val CONNECTION_TIMEOUT = "rabbitmq.connection.timeout"
+  val NETWORK_RECOVERY_INTERVAL = "rabbitmq.network.recovery.interval"
+  val REQUESTED_HEARTBEAT = "rabbitmq.requested.heartbeat"
+  val TOPOLOGY_RECOVERY_ENABLED = "rabbitmq.topology.recoveryenabled"
+  val REQUESTED_CHANNEL_MAX = "rabbitmq.channel.max"
+  val REQUESTED_FRAME_MAX = "rabbitmq.frame.max"
+
+  def getConnectionFactory(userConfig : UserConfig): ConnectionFactory = {
+    val factory : ConnectionFactory = new ConnectionFactory
+
+    val uri : Option[String] = userConfig.getString(CONNECTION_URI)
+    if (uri.nonEmpty) {
+      factory.setUri(uri.get)
+    } else {
+      val serverHost : Option[String] = userConfig.getString(SERVER_HOST)
+      val serverPort : Option[Int] = userConfig.getInt(SERVER_PORT)
+      if (!serverHost.nonEmpty) {
+        throw new RuntimeException("missed config key : " + SERVER_HOST)
+      }
+
+      if (!serverPort.nonEmpty) {
+        throw new RuntimeException("missed config key : " + SERVER_PORT)
+      }
+
+      factory.setHost(serverHost.get)
+      factory.setPort(serverPort.get)
+    }
+
+    val virtualHost : Option[String] = userConfig.getString(VIRTUAL_HOST)
+    if (virtualHost.nonEmpty) {
+      factory.setVirtualHost(virtualHost.get)
+    }
+
+    val authUserName : Option[String] = userConfig.getString(AUTH_USERNAME)
+    if (authUserName.nonEmpty) {
+      factory.setUsername(authUserName.get)
+    }
+
+    val authPassword : Option[String] = userConfig.getString(AUTH_PASSWORD)
+    if (authPassword.nonEmpty) {
+      factory.setPassword(authPassword.get)
+    }
+
+    val automaticRecovery : Option[Boolean] = userConfig.getBoolean(AUTOMATIC_RECOVERY)
+    if (automaticRecovery.nonEmpty) {
+      factory.setAutomaticRecoveryEnabled(automaticRecovery.get)
+    }
+
+    val connectionTimeOut : Option[Int] = userConfig.getInt(CONNECTION_TIMEOUT)
+    if (connectionTimeOut.nonEmpty) {
+      factory.setConnectionTimeout(connectionTimeOut.get)
+    }
+
+    val networkRecoveryInterval : Option[Int] = userConfig.getInt(NETWORK_RECOVERY_INTERVAL)
+    if (networkRecoveryInterval.nonEmpty) {
+      factory.setNetworkRecoveryInterval(networkRecoveryInterval.get)
+    }
+
+    val requestedHeartBeat : Option[Int] = userConfig.getInt(REQUESTED_HEARTBEAT)
+    if (requestedHeartBeat.nonEmpty) {
+      factory.setRequestedHeartbeat(requestedHeartBeat.get)
+    }
+
+    val topologyRecoveryEnabled : Option[Boolean] = userConfig.getBoolean(TOPOLOGY_RECOVERY_ENABLED)
+    if (topologyRecoveryEnabled.nonEmpty) {
+      factory.setTopologyRecoveryEnabled(topologyRecoveryEnabled.get)
+    }
+
+    val requestedChannelMax : Option[Int] = userConfig.getInt(REQUESTED_CHANNEL_MAX)
+    if (requestedChannelMax.nonEmpty) {
+      factory.setRequestedChannelMax(requestedChannelMax.get)
+    }
+
+    val requestedFrameMax : Option[Int] = userConfig.getInt(REQUESTED_FRAME_MAX)
+    if (requestedFrameMax.nonEmpty) {
+      factory.setRequestedFrameMax(requestedFrameMax.get)
+    }
+
+    factory
+  }
+
+  def getQueueName(userConfig: UserConfig): Option[String] = {
+    userConfig.getString(QUEUE_NAME)
+  }
+
+}
diff --git a/experiments/rabbitmq/src/test/scala/org/apache/gearpump/experimental/rabbitmq/RabbitmqSinkSpec.scala b/experiments/rabbitmq/src/test/scala/org/apache/gearpump/experimental/rabbitmq/RabbitmqSinkSpec.scala
new file mode 100644
index 0000000..337579e
--- /dev/null
+++ b/experiments/rabbitmq/src/test/scala/org/apache/gearpump/experimental/rabbitmq/RabbitmqSinkSpec.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.experimental.rabbitmq
+
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.task.TaskContext
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{Matchers, PropSpec}
+
+class RabbitmqSinkSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
+
+  property("RMQSink should insert a row successfully") {
+
+    val taskContext = mock[TaskContext]
+
+    val map = Map[String, String]("rabbitmq.queue.name" -> "test",
+    "rabbitmq.connection.host" -> "localhost",
+    "rabbitmq.connection.port" -> "5672")
+    val userConfig = new UserConfig(map)
+
+    val rmqSink = new RMQSink(userConfig)
+
+    assert(RMQSink.getQueueName(userConfig).get == "test")
+
+//    rmqSink.open(taskContext)
+
+//    var msg: String = "{ 'hello' : 'world' }"
+//    rmqSink.publish(msg)
+
+//    rmqSink.close()
+  }
+
+}
diff --git a/project/BuildExperiments.scala b/project/BuildExperiments.scala
index 2f491d3..92957a1 100644
--- a/project/BuildExperiments.scala
+++ b/project/BuildExperiments.scala
@@ -29,7 +29,8 @@
     cgroup,
     redis,
     storm,
-    yarn
+    yarn,
+    rabbitmq
   )
 
   lazy val yarn = Project(
@@ -118,4 +119,16 @@
     settings = commonSettings ++ noPublish)
     .dependsOn (core % "provided")
     .disablePlugins(sbtassembly.AssemblyPlugin)
+
+  lazy val rabbitmq = Project(
+    id = "gearpump-experimentals-rabbitmq",
+    base = file("experiments/rabbitmq"),
+    settings = commonSettings ++ noPublish ++
+      Seq(
+        libraryDependencies ++= Seq(
+          "com.rabbitmq" % "amqp-client" % rabbitmqVersion
+        )
+      ))
+    .dependsOn(core % "provided", streaming % "test->test; provided")
+    .disablePlugins(sbtassembly.AssemblyPlugin)
 }
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 4e30d3f..aa4e52f 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -53,6 +53,7 @@
   val algebirdVersion = "0.9.0"
   val chillVersion = "0.6.0"
   val jedisVersion = "2.9.0"
+  val rabbitmqVersion = "3.5.3"
 
   val coreDependencies = Seq(
     libraryDependencies ++= Seq(