GEARPUMP-376 Apache Kudu Akka Streaming Sink

Author: Sandish Kumar <sany@phdata.io>

Closes #246 from SandishKumarHN/master.
diff --git a/examples/streaming/kudu/src/main/scala/org/apache/gearpump/streaming/examples/kudu/KuduConn.scala b/examples/streaming/kudu/src/main/scala/org/apache/gearpump/streaming/examples/kudu/KuduConn.scala
new file mode 100644
index 0000000..a240974
--- /dev/null
+++ b/examples/streaming/kudu/src/main/scala/org/apache/gearpump/streaming/examples/kudu/KuduConn.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.examples.kudu
+
+import akka.actor.ActorSystem
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
+import org.apache.gearpump.external.kudu.KuduSink
+import org.apache.gearpump.streaming.StreamApplication
+import org.apache.gearpump.streaming.partitioner.HashPartitioner
+import org.apache.gearpump.streaming.sink.DataSinkProcessor
+import org.apache.gearpump.streaming.source.DataSourceProcessor
+import org.apache.gearpump.util.Graph.Node
+import org.apache.gearpump.util.{AkkaApp, Graph, LogUtil}
+import org.slf4j.Logger
+
+/** Example: streams rows generated by [[Split]] into a Kudu table through [[KuduSink]]. */
+object KuduConn extends AkkaApp with ArgumentsParser {
+  private val LOG: Logger = LogUtil.getLogger(getClass)
+  val RUN_FOR_EVER = -1
+
+  override val options: Array[(String, CLIOption[Any])] = Array(
+    "splitNum" -> CLIOption[Int]("<how many source tasks>", required = false,
+      defaultValue = Some(1)),
+    "sinkNum" -> CLIOption[Int]("<how many sink tasks>", required = false, defaultValue = Some(1))
+  )
+
+  /** Builds the source ~> sink graph; task counts come from `splitNum` and `sinkNum`. */
+  def application(config: ParseResult, system: ActorSystem): StreamApplication = {
+    implicit val actorSystem = system
+    // KuduWriter only reads KUDU_MASTERS; the other entries are unused placeholders.
+    val userConfig = new UserConfig(Map[String, String](
+      "KUDUSINK" -> "kudusink", KuduSink.KUDU_MASTERS -> "kuduserver",
+      "KUDU_USER" -> "kudu.user", "GEARPUMP_KERBEROS_PRINCIPAL" -> "gearpump.kerberos.principal",
+      "GEARPUMP_KEYTAB_FILE" -> "gearpump.keytab.file", "TABLE_NAME" -> "kudu.table.name"))
+
+    val sourceProcessor = DataSourceProcessor(new Split, config.getInt("splitNum"), "Split")
+    val sink = KuduSink(userConfig, "impala::default.kudu_1")
+    val sinkProcessor = DataSinkProcessor(sink, config.getInt("sinkNum"))
+    val computation = sourceProcessor ~ (new HashPartitioner) ~> sinkProcessor
+    StreamApplication("Kudu", Graph(computation), userConfig)
+  }
+
+  /** Parses the CLI arguments, submits the application and always closes the client. */
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    val config = parse(args)
+    val context = ClientContext(akkaConf)
+    try {
+      context.submit(application(config, context.system))
+    } finally {
+      context.close()
+    }
+  }
+}
diff --git a/examples/streaming/kudu/src/main/scala/org/apache/gearpump/streaming/examples/kudu/KuduConnDSL.scala b/examples/streaming/kudu/src/main/scala/org/apache/gearpump/streaming/examples/kudu/KuduConnDSL.scala
new file mode 100644
index 0000000..d8af51a
--- /dev/null
+++ b/examples/streaming/kudu/src/main/scala/org/apache/gearpump/streaming/examples/kudu/KuduConnDSL.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.examples.kudu
+
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.cluster.main.ArgumentsParser
+import org.apache.gearpump.external.kudu.KuduSink
+import org.apache.gearpump.streaming.dsl.scalaapi.StreamApp
+import org.apache.gearpump.util.AkkaApp
+
+/** DSL flavour of the Kudu example: one [[Split]] source feeding one [[KuduSink]]. */
+object KuduConnDSL extends AkkaApp with ArgumentsParser {
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    // Only KUDU_MASTERS is read by KuduWriter; the other entries are unused placeholders.
+    val userConfig = new UserConfig(Map[String, String](
+      "KUDUSINK" -> "kudusink", KuduSink.KUDU_MASTERS -> "localhost",
+      "KUDU_USER" -> "kudu.user", "GEARPUMP_KERBEROS_PRINCIPAL" -> "gearpump.kerberos.principal",
+      "GEARPUMP_KEYTAB_FILE" -> "gearpump.keytab.file", "TABLE_NAME" -> "kudu.table.name"))
+    val context = ClientContext(akkaConf)
+    try { // release the client even when building or submitting the app fails
+      val app = StreamApp("KuduDSL", context)
+      val sink = new KuduSink(userConfig, "impala::default.kudu_1")
+      app.source(new Split).sink(sink, 1, userConfig, "KuduSink")
+      context.submit(app)
+    } finally {
+      context.close()
+    }
+  }
+}
diff --git a/examples/streaming/kudu/src/main/scala/org/apache/gearpump/streaming/examples/kudu/Split.scala b/examples/streaming/kudu/src/main/scala/org/apache/gearpump/streaming/examples/kudu/Split.scala
new file mode 100644
index 0000000..ca9778a
--- /dev/null
+++ b/examples/streaming/kudu/src/main/scala/org/apache/gearpump/streaming/examples/kudu/Split.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.examples.kudu
+
+import java.time.Instant
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.streaming.source.DataSource
+import org.apache.gearpump.streaming.task.TaskContext
+
+/** Demo source that produces one generated two-column row on every `read()` call. */
+class Split extends DataSource {
+  // Sequence number embedded in the generated values; bumped after every row.
+  private var rowIndex: Long = 0
+
+  /** Nothing to initialise: rows are generated in memory. */
+  override def open(context: TaskContext, startTime: Instant): Unit = {}
+
+  /** Emits a pair of (columnName, value) pairs built from the current sequence number. */
+  override def read(): Message = {
+    val row = (("column1", s"value$rowIndex"), ("column2", s"value2$rowIndex"))
+    rowIndex += 1
+    Message(row)
+  }
+
+  override def close(): Unit = {}
+
+  override def getWatermark: Instant = Instant.now()
+}
diff --git a/external/kudu/README.md b/external/kudu/README.md
new file mode 100644
index 0000000..0cf736b
--- /dev/null
+++ b/external/kudu/README.md
@@ -0,0 +1,40 @@
+# Gearpump Kudu
+
+Gearpump integration for [Apache Kudu](https://kudu.apache.org)
+
+## Usage
+
+KuduSink handles the following message type:
+
+ 1. A tuple of `(columnName, columnValue)` pairs, e.g. `("column1" -> "a", "column2" -> "b")`; each value is converted from its `toString` form to the type of the target column
+  
+Suppose a DataSource task outputs the above-mentioned messages; you can then write a simple application:
+
+```scala
+val sink = new KuduSink(UserConfig.empty, "$tableName")
+val sinkProcessor = DataSinkProcessor(sink, "$sinkNum")
+val split = Processor[DataSource]("$splitNum")
+val computation = split ~> sinkProcessor
+val application = StreamApplication("Kudu", Graph(computation), UserConfig.empty)
+```
+
+## Launch the application
+
+The Kudu cluster should run where Gearpump is deployed.
+Suppose Kudu is installed at ```/usr/lib/kudu``` on every node and you have already built your application into a jar file.
+Please note that only the client-side configuration needs to change. After that, you can submit the application.
+
+
+## If you need to supply the Kudu cluster details for the connection
+
+```scala
+
+    val map = Map[String, String]("KUDUSINK" -> "kudusink", "kudu.masters"->"kuduserver",
+      "KUDU_USER" -> "kudu.user", "GEARPUMP_KERBEROS_PRINCIPAL" -> "gearpump.kerberos.principal",
+      "GEARPUMP_KEYTAB_FILE" -> "gearpump.keytab.file", "TABLE_NAME" -> "kudu.table.name"
+    )
+```
+
+## Working with Kerberized Kudu
+
+Before running the job, make sure you run `kinit`; with just a `kinit` you should be able to run the job and insert records into the Kudu table.
\ No newline at end of file
diff --git a/external/kudu/src/main/scala/org/apache/gearpump/external/kudu/KuduSink.scala b/external/kudu/src/main/scala/org/apache/gearpump/external/kudu/KuduSink.scala
new file mode 100644
index 0000000..e74edb3
--- /dev/null
+++ b/external/kudu/src/main/scala/org/apache/gearpump/external/kudu/KuduSink.scala
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.external.kudu
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.external.kudu.KuduSink.KuduWriterFactory
+import org.apache.gearpump.streaming.sink.DataSink
+import org.apache.gearpump.streaming.task.TaskContext
+import org.apache.kudu.Type._
+import org.apache.kudu.client._
+
+/** [[DataSink]] that hands every message value to a lazily created `KuduWriter`. */
+class KuduSink private[kudu](userConfig: UserConfig, tableName: String, factory: KuduWriterFactory)
+  extends DataSink {
+
+  // Lazy: the writer is only requested from the factory on the first write() or close().
+  private lazy val writer = factory.getKuduWriter(userConfig, tableName)
+
+  /** Public constructor wiring in a default [[KuduWriterFactory]]. */
+  def this(userConfig: UserConfig, tableName: String) =
+    this(userConfig, tableName, new KuduWriterFactory)
+
+  /** No-op; nothing is set up at open time. */
+  override def open(context: TaskContext): Unit = {}
+
+  /** Forwards the payload of `message` to the writer. */
+  override def write(message: Message): Unit = writer.put(message.value)
+
+  /** Closes the writer (requesting it from the factory first if it was never used). */
+  override def close(): Unit = writer.close()
+
+}
+
+/** Configuration keys, writer factory and row writer used by [[KuduSink]]. */
+object KuduSink {
+  val KUDUSINK = "kudusink"
+  val TABLE_NAME = "kudu.table.name"
+  val KUDU_MASTERS = "kudu.masters"
+  val KUDU_USER = "kudu.user"
+
+  def apply[T](userConfig: UserConfig, tableName: String): KuduSink =
+    new KuduSink(userConfig, tableName)
+
+  /** Serializable indirection through which the sink obtains its [[KuduWriter]]. */
+  class KuduWriterFactory extends java.io.Serializable {
+    def getKuduWriter(userConfig: UserConfig, tableName: String): KuduWriter =
+      new KuduWriter(userConfig, tableName)
+  }
+
+  /** Upserts rows into `tableName` via `kuduClient`; the table is opened on construction. */
+  class KuduWriter(kuduClient: KuduClient, tableName: String) {
+    private val table: KuduTable = kuduClient.openTable(tableName)
+    private lazy val session = kuduClient.newSession()
+
+    /** Builds the client from KUDU_MASTERS; a missing key raises NoSuchElementException. */
+    def this(userConfig: UserConfig, tableName: String) = {
+      this(new KuduClient.KuduClientBuilder(userConfig.getString(KUDU_MASTERS).getOrElse(
+        throw new NoSuchElementException(s"Missing required config '${KUDU_MASTERS}'"))).build(),
+        tableName)
+    }
+
+    /**
+     * Upserts one row built from `msg`, a `Product` (e.g. a tuple) of (columnName, value)
+     * pairs whose values are parsed from `toString`; other shapes/types are rejected.
+     */
+    def put(msg: Any): Unit = msg match {
+      case tuple: Product =>
+        val upsert = table.newUpsert()
+        val row = upsert.getRow
+        tuple.productIterator.foreach {
+          case (name, value) =>
+            val columnName = name.toString
+            val colValue = value.toString
+            val col = table.getSchema.getColumn(columnName)
+            col.getType match {
+              case INT8 => row.addByte(columnName, colValue.toByte)
+              case INT16 => row.addShort(columnName, colValue.toShort)
+              case INT32 => row.addInt(columnName, colValue.toInt)
+              case INT64 => row.addLong(columnName, colValue.toLong)
+              case STRING => row.addString(columnName, colValue)
+              case BOOL => row.addBoolean(columnName, colValue.toBoolean)
+              case FLOAT => row.addFloat(columnName, colValue.toFloat)
+              case DOUBLE => row.addDouble(columnName, colValue.toDouble)
+              // BINARY columns take raw bytes (addBinary); addByte is the INT8 setter.
+              case BINARY => row.addBinary(columnName, colValue.getBytes("UTF-8"))
+              case _ => throw new UnsupportedOperationException(s"Unknown type ${col.getType}")
+            }
+          case _ => throw new UnsupportedOperationException(s"Unknown input format")
+        }
+        session.apply(upsert)
+      case _ => throw new UnsupportedOperationException(s"Unknown input format")
+    }
+
+    /** Closes the session and then the client, even if closing the session throws. */
+    def close(): Unit = try session.close() finally kuduClient.close()
+  }
+}
\ No newline at end of file
diff --git a/external/kudu/src/main/scala/org/apache/gearpump/external/kudu/dsl/KuduDSLSink.scala b/external/kudu/src/main/scala/org/apache/gearpump/external/kudu/dsl/KuduDSLSink.scala
new file mode 100644
index 0000000..4e935dc
--- /dev/null
+++ b/external/kudu/src/main/scala/org/apache/gearpump/external/kudu/dsl/KuduDSLSink.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.external.kudu.dsl
+
+
+import scala.language.implicitConversions
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.external.kudu.KuduSink
+import org.apache.gearpump.streaming.dsl.scalaapi.Stream
+
+/** Implicit enrichment that adds `writeToKudu` to a DSL [[Stream]]. */
+object KuduDSLSink {
+  // NOTE(review): "HBase" in the name looks copy-pasted; kept so existing imports still work.
+  implicit def streamToHBaseDSLSink[T](stream: Stream[T]): KuduDSLSink[T] =
+    new KuduDSLSink[T](stream)
+}
+
+class KuduDSLSink[T](stream: Stream[T]) {
+  /** Sinks the stream into Kudu `table`; `userConfig` goes to the sink and `stream.sink`. */
+  def writeToKudu(userConfig: UserConfig, table: String, parallelism: Int, description: String)
+  : Stream[T] = {
+    val kuduSink = KuduSink[T](userConfig, table)
+    stream.sink(kuduSink, parallelism, userConfig, description)
+  }
+}
+
diff --git a/external/kudu/src/test/scala/org/apache/gearpump/external/kudu/KuduSinkSpec.scala b/external/kudu/src/test/scala/org/apache/gearpump/external/kudu/KuduSinkSpec.scala
new file mode 100644
index 0000000..705fef8
--- /dev/null
+++ b/external/kudu/src/test/scala/org/apache/gearpump/external/kudu/KuduSinkSpec.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.external.kudu
+
+import akka.actor.ActorSystem
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.external.kudu.KuduSink.{KuduWriter, KuduWriterFactory}
+import org.apache.gearpump.streaming.MockUtil
+import org.apache.gearpump.streaming.task.TaskContext
+import org.apache.kudu.client._
+import org.mockito.Mockito._
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{Matchers, PropSpec}
+
+/** Unit tests for KuduSink and KuduWriter that run against Mockito mocks only. */
+class KuduSinkSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
+
+  property("KuduSink should invoke KuduWriter for writing message to Kudu") {
+
+    val kuduWriter = mock[KuduWriter]
+    val kuduWriterFactory = mock[KuduWriterFactory]
+
+    implicit val system: ActorSystem = MockUtil.system
+
+    val userConfig = UserConfig.empty
+    val tableName = "kudu"
+
+    // Route writer creation to the mock so that no real Kudu client is built.
+    when(kuduWriterFactory.getKuduWriter(userConfig, tableName))
+      .thenReturn(kuduWriter)
+
+    val kuduSink = new KuduSink(userConfig, tableName, kuduWriterFactory)
+
+    kuduSink.open(MockUtil.mockTaskContext)
+
+    // A written message must reach the writer with its value unchanged.
+    val value = ("key", "value")
+    val message = Message(value)
+    kuduSink.write(message)
+    verify(kuduWriter, atLeastOnce()).put(message.value)
+
+    // Closing the sink must close the writer.
+    kuduSink.close()
+    verify(kuduWriter).close()
+  }
+
+  // This property used to stub a mock and assert nothing; it now checks the constructor.
+  // TODO: cover put() as well once Upsert/PartialRow/KuduSession are mocked.
+  property("KuduWriter should open the target table when it is created") {
+
+    val table = mock[KuduTable]
+    val kuduClient = mock[KuduClient]
+    val tableName = "kudu"
+
+    when(kuduClient.openTable(tableName)).thenReturn(table)
+
+    // The table is opened eagerly by the constructor, so no further call is needed.
+    new KuduWriter(kuduClient, tableName)
+
+    verify(kuduClient).openTable(tableName)
+  }
+}
\ No newline at end of file
diff --git a/project/BuildExamples.scala b/project/BuildExamples.scala
index b3a8e4a..47aa0c6 100644
--- a/project/BuildExamples.scala
+++ b/project/BuildExamples.scala
@@ -36,7 +36,8 @@
     sol,
     wordcount,
     wordcountJava,
-    example_hbase
+    example_hbase,
+    example_kudu
   )
 
   /**
@@ -121,6 +122,12 @@
       )
   ) dependsOn(core % "provided", streaming % "provided; test->test", external_hbase)
 
+  // Kudu example application (main class KuduConn), built on top of external_kudu.
+  lazy val example_kudu = Project(id = "gearpump-examples-kudu",
+    base = file("examples/streaming/kudu"),
+    settings = exampleSettings("org.apache.gearpump.streaming.examples.kudu.KuduConn")
+  ) dependsOn(core % "provided", streaming % "provided; test->test", external_kudu)
+
   lazy val fsio = Project(
     id = "gearpump-examples-fsio",
     base = file("examples/streaming/fsio"),
diff --git a/project/BuildExternals.scala b/project/BuildExternals.scala
index 48715cb..698af6c 100644
--- a/project/BuildExternals.scala
+++ b/project/BuildExternals.scala
@@ -27,6 +27,7 @@
   lazy val externals: Seq[ProjectReference] = Seq(
     external_hbase,
     external_kafka,
+    external_kudu,
     external_monoid,
     external_hadoopfs
   )
@@ -45,6 +46,18 @@
     .dependsOn(core % "provided", streaming % "test->test; provided")
     .disablePlugins(sbtassembly.AssemblyPlugin)
 
+  // Kudu connector module: the common settings plus the kudu-client library. core and
+  // streaming are "provided", and the sbt-assembly plugin is disabled for this project.
+  lazy val external_kudu = Project(
+    id = "gearpump-external-kudu",
+    base = file("external/kudu"),
+    settings = commonSettings ++ javadocSettings ++ Seq(
+      libraryDependencies += "org.apache.kudu" % "kudu-client" % kuduVersion
+    )
+  )
+    .dependsOn(core % "provided", streaming % "test->test; provided")
+    .disablePlugins(sbtassembly.AssemblyPlugin)
+
   lazy val external_hbase = Project(
     id = "gearpump-external-hbase",
     base = file("external/hbase"),
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index b146c08..06d2781 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -35,6 +35,7 @@
   val upickleVersion = "0.3.4"
   val junitVersion = "4.12"
   val kafkaVersion = "0.8.2.1"
+  val kuduVersion = "1.7.0"
   val jsonSimpleVersion = "1.1"
   val storm09Version = "0.9.6"
   val stormVersion = "0.10.0"