[GEARPUMP-303] add a RabbitMQ sink to integrate with gearpump
Author: vinoyang <yanghua1127@gmail.com>
Closes #180 from yanghua/dev.
diff --git a/experiments/rabbitmq/README.md b/experiments/rabbitmq/README.md
new file mode 100644
index 0000000..9059608
--- /dev/null
+++ b/experiments/rabbitmq/README.md
@@ -0,0 +1,40 @@
+# Gearpump RabbitMQ
+
+Gearpump integration for [RabbitMQ](https://www.rabbitmq.com/)
+
+## Usage
+
+RMQSink can handle the following message types:
+
+ 1. String
+ 2. Array[Byte]
+ 3. Sequence of type 1 and 2
+
+Suppose a DataSource task emits messages of the types listed above; you can then write a simple application like this:
+
+```scala
+val sink = new RMQSink(UserConfig.empty)
+val sinkProcessor = DataSinkProcessor(sink, "$sinkNum")
+val split = Processor[DataSource]("$splitNum")
+val computation = split ~> sinkProcessor
+val application = StreamApplication("RabbitMQ", Graph(computation), UserConfig.empty)
+```
+## config items
+To create an RMQSink instance, pass a UserConfig object that provides the config items listed below:
+
+* [must]`rabbitmq.queue.name` : the RabbitMQ queue name we want to sink the message to;
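+* [must, unless `rabbitmq.connection.uri` is set]`rabbitmq.connection.host` : the RabbitMQ server host;
+* [must, unless `rabbitmq.connection.uri` is set]`rabbitmq.connection.port` : the RabbitMQ server port (RabbitMQ's default port is **5672**, but it must still be given explicitly);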
+* [optional]`rabbitmq.connection.uri` : the connection uri, pattern is `amqp://userName:password@hostName:portNumber/virtualHost`
+* [optional]`rabbitmq.virtualhost` : the virtual-host which is a logic domain in RabbitMQ Server
+* [optional]`rabbitmq.auth.username` : the user name for authorization
+* [optional]`rabbitmq.auth.password` : the password for authorization
+* [optional]`rabbitmq.automatic.recovery` : set `true` to enable automatic connection recovery, `false` to disable it
+* [optional]`rabbitmq.connection.timeout` : the connection timeout, an integer number of milliseconds
+* [optional]`rabbitmq.network.recovery.interval` : the interval, an integer number of milliseconds, that automatic recovery waits before trying to reconnect
+* [optional]`rabbitmq.requested.heartbeat` : the requested heartbeat interval, an integer number of seconds (not a boolean)
+* [optional]`rabbitmq.topology.recoveryenabled` : set `true` to enable topology recovery, `false` to disable it
+* [optional]`rabbitmq.channel.max` : the requested maximum number of channels
+* [optional]`rabbitmq.frame.max` : the requested maximum frame size, in bytes
+
+more details : https://www.rabbitmq.com/admin-guide.html
\ No newline at end of file
diff --git a/experiments/rabbitmq/src/main/scala/org/apache/gearpump/experimental/rabbitmq/RMQSink.scala b/experiments/rabbitmq/src/main/scala/org/apache/gearpump/experimental/rabbitmq/RMQSink.scala
new file mode 100644
index 0000000..492fffe
--- /dev/null
+++ b/experiments/rabbitmq/src/main/scala/org/apache/gearpump/experimental/rabbitmq/RMQSink.scala
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experimental.rabbitmq
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.sink.DataSink
+import org.apache.gearpump.streaming.task.TaskContext
+import com.rabbitmq.client.Channel
+import com.rabbitmq.client.{Connection, ConnectionFactory}
+import org.apache.gearpump.util.LogUtil
+
+/**
+ * DataSink that publishes `String` (as UTF-8), `Array[Byte]` or `Seq`s of those to the RabbitMQ
+ * queue named in `userConfig`; `connFactory` builds the `ConnectionFactory` from that config.
+ */
+class RMQSink(userConfig: UserConfig,
+    val connFactory: (UserConfig) => ConnectionFactory) extends DataSink {
+  import scala.util.control.NonFatal
+
+  private val LOG = LogUtil.getLogger(getClass)
+  var connectionFactory: ConnectionFactory = connFactory(userConfig)
+  var connection: Connection = null
+  var channel: Channel = null
+  var queueName: String = null
+
+  def this(userConfig: UserConfig) = this(userConfig, RMQSink.getConnectionFactory)
+
+  /** Connects, creates a channel and declares the queue; releases everything if a step fails. */
+  override def open(context: TaskContext): Unit = {
+    connection = connectionFactory.newConnection
+    try {
+      channel = connection.createChannel
+      if (channel == null) throw new RuntimeException("None of RabbitMQ channels are available.")
+      setupQueue()
+    } catch {
+      case NonFatal(e) =>
+        try close() catch { case NonFatal(s) => e.addSuppressed(s) }
+        throw e
+    }
+  }
+
+  override def write(message: Message): Unit = publish(message.msg)
+
+  /** Closes the channel, then the connection; skips whatever was never opened or already closed. */
+  override def close(): Unit = {
+    try {
+      if (channel != null) channel.close()
+    } finally {
+      if (connection != null) connection.close()
+      channel = null
+      connection = null
+    }
+  }
+
+  /** Reads the mandatory queue name and declares it non-durable, non-exclusive, not auto-deleted. */
+  protected def setupQueue(): Unit = {
+    queueName = RMQSink.getQueueName(userConfig).getOrElse(
+      throw new RuntimeException("can not get a RabbitMQ queue name"))
+    channel.queueDeclare(queueName, false, false, false, null)
+  }
+
+  /** Publishes `msg` with routing key `queueName` on exchange ""; `Seq`s are sent element-wise. */
+  def publish(msg: Any): Unit = msg match {
+    case seq: Seq[_] => seq.foreach(publish)
+    case str: String => channel.basicPublish("", queueName, null, str.getBytes("UTF-8"))
+    case byteArray: Array[Byte] => channel.basicPublish("", queueName, null, byteArray)
+    case _ => LOG.warn("matched unsupported message!")
+  }
+}
+
+/**
+ * Config keys read by the RabbitMQ sink, and helpers that derive a `ConnectionFactory` and the
+ * queue name from a `UserConfig`.
+ */
+object RMQSink {
+
+  val RMQSINK = "rmqsink"
+
+  /** Queue that the sink declares and publishes to. */
+  val QUEUE_NAME = "rabbitmq.queue.name"
+
+  /** Endpoint: either a full URI, or a host and port pair that is required when no URI is set. */
+  val CONNECTION_URI = "rabbitmq.connection.uri"
+  val SERVER_HOST = "rabbitmq.connection.host"
+  val SERVER_PORT = "rabbitmq.connection.port"
+
+  /** Virtual host and credentials; optional. */
+  val VIRTUAL_HOST = "rabbitmq.virtualhost"
+  val AUTH_USERNAME = "rabbitmq.auth.username"
+  val AUTH_PASSWORD = "rabbitmq.auth.password"
+
+  /** Optional settings, each handed to its `ConnectionFactory` setter when present. */
+  val AUTOMATIC_RECOVERY = "rabbitmq.automatic.recovery"
+  val CONNECTION_TIMEOUT = "rabbitmq.connection.timeout"
+  val NETWORK_RECOVERY_INTERVAL = "rabbitmq.network.recovery.interval"
+  val REQUESTED_HEARTBEAT = "rabbitmq.requested.heartbeat"
+  val TOPOLOGY_RECOVERY_ENABLED = "rabbitmq.topology.recoveryenabled"
+  val REQUESTED_CHANNEL_MAX = "rabbitmq.channel.max"
+  val REQUESTED_FRAME_MAX = "rabbitmq.frame.max"
+
+  /**
+   * Builds a `ConnectionFactory` from the `rabbitmq.*` entries of `userConfig`.
+   *
+   * The endpoint is taken from [[CONNECTION_URI]] when that key is set; otherwise both
+   * [[SERVER_HOST]] and [[SERVER_PORT]] must be present. All remaining keys are optional and are
+   * applied, in the order below, only when they are set.
+   *
+   * @param userConfig configuration to read the keys from
+   * @return a factory carrying every setting that was found
+   * @throws RuntimeException when no URI is set and the host or the port is missing
+   */
+  def getConnectionFactory(userConfig: UserConfig): ConnectionFactory = {
+    val factory = new ConnectionFactory
+
+    userConfig.getString(CONNECTION_URI) match {
+      case Some(uri) =>
+        factory.setUri(uri)
+      case None =>
+        // Both lookups happen before validation; a missing host is reported first.
+        val host = userConfig.getString(SERVER_HOST)
+        val port = userConfig.getInt(SERVER_PORT)
+        if (host.isEmpty) {
+          throw new RuntimeException("missed config key : " + SERVER_HOST)
+        }
+        if (port.isEmpty) {
+          throw new RuntimeException("missed config key : " + SERVER_PORT)
+        }
+        host.foreach(h => factory.setHost(h))
+        port.foreach(p => factory.setPort(p))
+    }
+
+    // Virtual host and credentials.
+    userConfig.getString(VIRTUAL_HOST)
+      .foreach(vhost => factory.setVirtualHost(vhost))
+    userConfig.getString(AUTH_USERNAME)
+      .foreach(name => factory.setUsername(name))
+    userConfig.getString(AUTH_PASSWORD)
+      .foreach(secret => factory.setPassword(secret))
+
+    // Recovery, timeouts and requested limits.
+    userConfig.getBoolean(AUTOMATIC_RECOVERY)
+      .foreach(enabled => factory.setAutomaticRecoveryEnabled(enabled))
+    userConfig.getInt(CONNECTION_TIMEOUT)
+      .foreach(timeout => factory.setConnectionTimeout(timeout))
+    userConfig.getInt(NETWORK_RECOVERY_INTERVAL)
+      .foreach(interval => factory.setNetworkRecoveryInterval(interval))
+    userConfig.getInt(REQUESTED_HEARTBEAT)
+      .foreach(heartbeat => factory.setRequestedHeartbeat(heartbeat))
+    userConfig.getBoolean(TOPOLOGY_RECOVERY_ENABLED)
+      .foreach(enabled => factory.setTopologyRecoveryEnabled(enabled))
+    userConfig.getInt(REQUESTED_CHANNEL_MAX)
+      .foreach(max => factory.setRequestedChannelMax(max))
+    userConfig.getInt(REQUESTED_FRAME_MAX)
+      .foreach(max => factory.setRequestedFrameMax(max))
+
+    factory
+  }
+
+  /**
+   * Reads the queue name from `userConfig`.
+   *
+   * @return the value stored under [[QUEUE_NAME]], as returned by `userConfig.getString`
+   */
+  def getQueueName(userConfig: UserConfig): Option[String] =
+    userConfig.getString(QUEUE_NAME)
+
+}
diff --git a/experiments/rabbitmq/src/test/scala/org/apache/gearpump/experimental/rabbitmq/RabbitmqSinkSpec.scala b/experiments/rabbitmq/src/test/scala/org/apache/gearpump/experimental/rabbitmq/RabbitmqSinkSpec.scala
new file mode 100644
index 0000000..337579e
--- /dev/null
+++ b/experiments/rabbitmq/src/test/scala/org/apache/gearpump/experimental/rabbitmq/RabbitmqSinkSpec.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.experimental.rabbitmq
+
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.task.TaskContext
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{Matchers, PropSpec}
+
+class RabbitmqSinkSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
+  private val userConfig = new UserConfig(Map[String, String]("rabbitmq.queue.name" -> "test",
+    "rabbitmq.connection.host" -> "localhost", "rabbitmq.connection.port" -> "5672"))
+
+  property("RMQSink should read the queue name from the user config") {
+    RMQSink.getQueueName(userConfig) shouldBe Some("test")
+  }
+
+  property("RMQSink should declare the queue, publish and close on the injected connection") {
+    import com.rabbitmq.client.{Channel, Connection, ConnectionFactory}
+    import org.mockito.Mockito.{verify, when}
+    val (factory, connection, channel) = (mock[ConnectionFactory], mock[Connection], mock[Channel])
+    when(factory.newConnection()).thenReturn(connection)
+    when(connection.createChannel()).thenReturn(channel)
+    val rmqSink = new RMQSink(userConfig, _ => factory) // mocks are injected: no broker needed
+    rmqSink.open(mock[TaskContext])
+    rmqSink.publish("{ 'hello' : 'world' }")
+    rmqSink.close()
+    verify(channel).queueDeclare("test", false, false, false, null)
+    verify(channel).basicPublish("", "test", null, "{ 'hello' : 'world' }".getBytes("UTF-8"))
+    verify(channel).close()
+    verify(connection).close()
+  }
+}
diff --git a/project/BuildExperiments.scala b/project/BuildExperiments.scala
index 2f491d3..92957a1 100644
--- a/project/BuildExperiments.scala
+++ b/project/BuildExperiments.scala
@@ -29,7 +29,8 @@
cgroup,
redis,
storm,
- yarn
+ yarn,
+ rabbitmq
)
lazy val yarn = Project(
@@ -118,4 +119,16 @@
settings = commonSettings ++ noPublish)
.dependsOn (core % "provided")
.disablePlugins(sbtassembly.AssemblyPlugin)
+
+  // RabbitMQ sink experiment. It takes core and streaming as "provided" dependencies, reuses
+  // streaming's test classes, and applies noPublish with the assembly plugin disabled.
+  lazy val rabbitmq = Project(
+    id = "gearpump-experimentals-rabbitmq",
+    base = file("experiments/rabbitmq"),
+    settings = commonSettings ++ noPublish ++ Seq(
+      libraryDependencies += "com.rabbitmq" % "amqp-client" % rabbitmqVersion
+    )
+  )
+    .dependsOn(core % "provided", streaming % "test->test; provided")
+    .disablePlugins(sbtassembly.AssemblyPlugin)
}
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 4e30d3f..aa4e52f 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -53,6 +53,7 @@
val algebirdVersion = "0.9.0"
val chillVersion = "0.6.0"
val jedisVersion = "2.9.0"
+ val rabbitmqVersion = "3.5.3" // version of the com.rabbitmq amqp-client artifact
val coreDependencies = Seq(
libraryDependencies ++= Seq(