[S2GRAPH-225] support custom udf class

JIRA:
  [S2GRAPH-225] https://issues.apache.org/jira/browse/S2GRAPH-225

Pull Request:
  Closes #185

Author:
  Chul Kang <elric@apache.org>
diff --git a/CHANGES b/CHANGES
index df8fbf2..bdc3da4 100644
--- a/CHANGES
+++ b/CHANGES
@@ -38,6 +38,7 @@
     * [S2GRAPH-205] - too many initialize S2Graph when writeBatchMutate on S2GraphSink
     * [S2GRAPH-201] - Provide S2GraphSource
     * [S2GRAPH-218] - add operations not supported on sql
+    * [S2GRAPH-225] - support custom udf class
 
 ** Bug
     * [S2GRAPH-159] - Wrong syntax at a bash script under Linux
diff --git a/project/Common.scala b/project/Common.scala
index 42628f0..1008b2c 100644
--- a/project/Common.scala
+++ b/project/Common.scala
@@ -30,7 +30,7 @@
   val hadoopVersion = "2.7.3"
   val tinkerpopVersion = "3.2.5"
 
-  val elastic4sVersion = "6.1.1"
+  val elastic4sVersion = "6.2.4"
 
   val KafkaVersion = "0.10.2.1"
 
diff --git a/s2jobs/build.sbt b/s2jobs/build.sbt
index f647040..47ec835 100644
--- a/s2jobs/build.sbt
+++ b/s2jobs/build.sbt
@@ -38,6 +38,7 @@
   "org.apache.hadoop" % "hadoop-distcp" % hadoopVersion,
   "org.elasticsearch" % "elasticsearch-spark-20_2.11" % elastic4sVersion,
   "com.github.scopt" %% "scopt" % "3.7.0",
+  "io.thekraken" % "grok" % "0.1.5",
   "com.holdenkarau" %% "spark-testing-base" % "2.3.0_0.9.0" % Test
 )
 
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Job.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Job.scala
index 8f21bc2..026b688 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Job.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Job.scala
@@ -35,7 +35,7 @@
 
       dfMap.put(source.conf.name, df)
     }
-    logger.debug(s"valid source DF set : ${dfMap.keySet}")
+    logger.info(s"valid source DF set : ${dfMap.keySet}")
 
     // process
     var processRst:Seq[(String, DataFrame)] = Nil
@@ -45,7 +45,7 @@
 
     } while(processRst.nonEmpty)
 
-    logger.debug(s"valid named DF set : ${dfMap.keySet}")
+    logger.info(s"valid named DF set : ${dfMap.keySet}")
 
     // sinks
     jobDesc.sinks.foreach { s =>
@@ -63,8 +63,7 @@
     val dfKeys = dfMap.keySet
 
     processes.filter{ p =>
-        var existAllInput = true
-        p.conf.inputs.foreach { input => existAllInput = dfKeys(input) }
+        val existAllInput = p.conf.inputs.forall{ input => dfKeys(input) }
         !dfKeys(p.conf.name) && existAllInput
     }
     .map { p =>
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobDescription.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobDescription.scala
index 9a529aa..0943056 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobDescription.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobDescription.scala
@@ -21,28 +21,32 @@
 
 import play.api.libs.json.{JsValue, Json}
 import org.apache.s2graph.s2jobs.task._
+import org.apache.s2graph.s2jobs.udfs.UdfOption
 
 case class JobDescription(
                            name:String,
+                           udfs: Seq[UdfOption],
                            sources:Seq[Source],
                            processes:Seq[task.Process],
                            sinks:Seq[Sink]
                          )
 
 object JobDescription extends Logger {
-  val dummy = JobDescription("dummy", Nil, Nil, Nil)
+  val dummy = JobDescription("dummy", Nil, Nil, Nil, Nil)
 
   def apply(jsVal:JsValue):JobDescription = {
     implicit val TaskConfReader = Json.reads[TaskConf]
+    implicit val UdfOptionReader = Json.reads[UdfOption]
 
     logger.debug(s"JobDescription: ${jsVal}")
 
     val jobName = (jsVal \ "name").as[String]
+    val udfs = (jsVal \ "udfs").asOpt[Seq[UdfOption]].getOrElse(Nil)
     val sources = (jsVal \ "source").asOpt[Seq[TaskConf]].getOrElse(Nil).map(conf => getSource(conf))
     val processes = (jsVal \ "process").asOpt[Seq[TaskConf]].getOrElse(Nil).map(conf => getProcess(conf))
     val sinks = (jsVal \ "sink").asOpt[Seq[TaskConf]].getOrElse(Nil).map(conf => getSink(jobName, conf))
 
-    JobDescription(jobName, sources, processes, sinks)
+    JobDescription(jobName, udfs, sources, processes, sinks)
   }
 
   def getSource(conf:TaskConf):Source = {
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobLauncher.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobLauncher.scala
index a64a399..0a76274 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobLauncher.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobLauncher.scala
@@ -19,6 +19,7 @@
 
 package org.apache.s2graph.s2jobs
 
+import org.apache.s2graph.s2jobs.udfs.Udf
 import org.apache.spark.sql.SparkSession
 import play.api.libs.json.{JsValue, Json}
 
@@ -82,6 +83,13 @@
       .enableHiveSupport()
       .getOrCreate()
 
+    // register udfs
+    jobDescription.udfs.foreach{ udfOption =>
+      val udf = Class.forName(udfOption.`class`).newInstance().asInstanceOf[Udf]
+      logger.info((s"[udf register] ${udfOption}"))
+      udf.register(ss, udfOption.name, udfOption.params.getOrElse(Map.empty))
+    }
+
     val job = new Job(ss, jobDescription)
     job.run()
   }
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
index 4de585c..dd6f41b 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
@@ -20,22 +20,17 @@
 package org.apache.s2graph.s2jobs.task
 
 import com.typesafe.config.{Config, ConfigFactory, ConfigRenderOptions}
-import org.apache.hadoop.hbase.HBaseConfiguration
-import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles
-import org.apache.hadoop.util.ToolRunner
-import org.apache.s2graph.core.{GraphUtil, Management}
+import org.apache.s2graph.core.GraphUtil
 import org.apache.s2graph.s2jobs.S2GraphHelper
-import org.apache.s2graph.s2jobs.loader.{GraphFileOptions, HFileGenerator, SparkBulkLoaderTransformer}
+import org.apache.s2graph.s2jobs.loader.{HFileGenerator, SparkBulkLoaderTransformer}
 import org.apache.s2graph.s2jobs.serde.reader.RowBulkFormatReader
 import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter
-import org.apache.s2graph.spark.sql.streaming.S2SinkContext
 import org.apache.spark.sql._
 import org.apache.spark.sql.streaming.{DataStreamWriter, OutputMode, Trigger}
 import org.elasticsearch.spark.sql.EsSparkSQL
 
-import scala.collection.mutable.ListBuffer
-import scala.concurrent.{Await, Future}
 import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, Future}
 
 /**
   * Sink
@@ -132,6 +127,8 @@
     logger.debug(s"${LOG_PREFIX} schema: ${df.schema}")
 
     conf.options.getOrElse("format", "json") match {
+      case "raw" =>
+        df
       case "tsv" =>
         val delimiter = conf.options.getOrElse("delimiter", "\t")
 
@@ -212,9 +209,10 @@
   * @param conf
   */
 class S2GraphSink(queryName: String, conf: TaskConf) extends Sink(queryName, conf) {
-  import scala.collection.JavaConversions._
-  import org.apache.s2graph.spark.sql.streaming.S2SinkConfigs._
   import org.apache.s2graph.core.S2GraphConfigs._
+  import org.apache.s2graph.spark.sql.streaming.S2SinkConfigs._
+
+  import scala.collection.JavaConversions._
 
   override def mandatoryOptions: Set[String] = Set()
 
@@ -261,6 +259,10 @@
   }
 
   private def writeBatchWithMutate(df:DataFrame):Unit = {
+    import org.apache.s2graph.spark.sql.streaming.S2SinkConfigs._
+
+    import scala.collection.JavaConversions._
+
     // TODO: FIX THIS. overwrite local cache config.
     val mergedOptions = conf.options ++ TaskConf.parseLocalCacheConfigs(conf)
     val graphConfig: Config = ConfigFactory.parseMap(mergedOptions).withFallback(ConfigFactory.load())
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/udfs/Grok.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/udfs/Grok.scala
new file mode 100644
index 0000000..ebcb41d
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/udfs/Grok.scala
@@ -0,0 +1,58 @@
+package org.apache.s2graph.s2jobs.udfs
+
+import org.apache.s2graph.s2jobs.utils.GrokHelper
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.types.{DataType, StructType}
+import play.api.libs.json.{JsValue, Json}
+
+/**
+  * Udf that builds a Grok matcher from its options and registers it as a Spark SQL function.
+  *
+  * Options read by `register`:
+  *  - `patternDir`: directory holding the pattern files (default `/tmp`)
+  *  - `patternFiles`: comma separated pattern file names (default: none)
+  *  - `patterns`: JSON object of pattern name -> pattern definition (default `{}`)
+  *  - `compilePattern`: Grok expression to compile (required)
+  *  - `schema`: optional struct schema as JSON; when present the function is built from
+  *    `GrokHelper.grokMatchWithSchema`, otherwise from `GrokHelper.grokMatch`
+  */
+class Grok extends Udf {
+  import org.apache.spark.sql.functions.udf
+
+  /**
+    * Builds the Grok matcher described by `options` and registers it under `name`.
+    *
+    * @param ss      session the pattern files are added to and the function is registered with
+    * @param name    name of the SQL function, also passed to `GrokHelper.getGrok`
+    * @param options udf parameters, see the class documentation
+    * @throws NoSuchElementException   if `compilePattern` is missing
+    * @throws IllegalArgumentException if `schema` is given but is not a struct type
+    */
+  def register(ss: SparkSession, name:String, options:Map[String, String]): Unit = {
+    val patternDir = options.getOrElse("patternDir", "/tmp")
+    // "".split(",") yields Array(""), so blank entries are dropped; otherwise an unset
+    // `patternFiles` option would try to add the pattern directory itself as a file.
+    val patternFiles = options.getOrElse("patternFiles", "")
+      .split(",").map(_.trim).filter(_.nonEmpty).toSeq
+    val patterns = Json.parse(options.getOrElse("patterns", "{}")).asOpt[Map[String, String]].getOrElse(Map.empty)
+    val compilePattern = options("compilePattern")
+
+    patternFiles.foreach { patternFile =>
+      ss.sparkContext.addFile(s"${patternDir}/${patternFile}")
+    }
+
+    implicit val grok = GrokHelper.getGrok(name, patternFiles, patterns, compilePattern)
+
+    val f = options.get("schema").map(DataType.fromJson) match {
+      case Some(schema: StructType) =>
+        implicit val keys: Array[String] = schema.fieldNames
+        udf(GrokHelper.grokMatchWithSchema _, schema)
+      case Some(other) =>
+        throw new IllegalArgumentException(s"udf '$name': schema must be a struct type, but got $other")
+      case None =>
+        udf(GrokHelper.grokMatch _)
+    }
+
+    ss.udf.register(name, f)
+  }
+}
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/udfs/Udf.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/udfs/Udf.scala
new file mode 100644
index 0000000..821527c
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/udfs/Udf.scala
@@ -0,0 +1,27 @@
+package org.apache.s2graph.s2jobs.udfs
+
+import org.apache.s2graph.s2jobs.Logger
+import org.apache.spark.sql.SparkSession
+
+/**
+  * One entry of the `udfs` list of a job description.
+  *
+  * @param name    name the function is registered under
+  * @param class   name of the [[Udf]] implementation class, loaded via `Class.forName`
+  * @param params  optional parameters passed to [[Udf.register]] as `options`
+  */
+case class UdfOption(name:String, `class`:String, params:Option[Map[String, String]] = None)
+
+/**
+  * A function that can be registered with a SparkSession under a configurable name.
+  */
+trait Udf extends Serializable with Logger {
+  /**
+    * Registers this function with `ss`.
+    *
+    * @param ss      session to register the function with
+    * @param name    name to register the function under
+    * @param options implementation specific parameters
+    */
+  def register(ss: SparkSession, name:String, options:Map[String, String]): Unit
+}
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/utils/GrokHelper.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/utils/GrokHelper.scala
new file mode 100644
index 0000000..37485c8
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/utils/GrokHelper.scala
@@ -0,0 +1,81 @@
+package org.apache.s2graph.s2jobs.utils
+
+import io.thekraken.grok.api.Grok
+import org.apache.s2graph.s2jobs.Logger
+import org.apache.spark.SparkFiles
+import org.apache.spark.sql.Row
+
+import scala.collection.mutable
+
+/**
+  * Helpers for building Grok matchers and applying them to text.
+  */
+object GrokHelper extends Logger {
+  // Matchers built so far, keyed by the name they were requested with.
+  // NOTE(review): plain mutable.Map without synchronization - confirm single-threaded use.
+  private val grokPool:mutable.Map[String, Grok] = mutable.Map.empty
+
+  /**
+    * Returns the matcher cached under `name`, building and caching it on first use.
+    * Once a matcher exists for `name`, the remaining arguments are ignored.
+    *
+    * @param name           cache key, also used as prefix in the log output
+    * @param patternFiles   pattern file names, resolved through `SparkFiles.get`
+    * @param patterns       additional pattern name -> pattern definitions
+    * @param compilePattern Grok expression the matcher is compiled with
+    */
+  def getGrok(name:String, patternFiles:Seq[String], patterns:Map[String, String], compilePattern:String):Grok =
+    grokPool.getOrElseUpdate(name, {
+      logger.info(s"Grok '$name' initialized..")
+      val grok = new Grok()
+      patternFiles.foreach { patternFile =>
+        val filePath = SparkFiles.get(patternFile)
+        logger.info(s"[Grok][$name] add pattern file : $patternFile  ($filePath)")
+        grok.addPatternFromFile(filePath)
+      }
+      // Bound as `patternName` so it does not shadow the matcher `name` used as log prefix.
+      patterns.foreach { case (patternName, pattern) =>
+        logger.info(s"[Grok][$name] add pattern : $patternName ($pattern)")
+        grok.addPattern(patternName, pattern)
+      }
+
+      grok.compile(compilePattern)
+      logger.info(s"[Grok][$name] patterns: ${grok.getPatterns}")
+      grok
+    })
+
+  /**
+    * Matches `text` against `grok` and returns the non-null captures rendered as strings.
+    *
+    * @return `None` when no non-null value was captured, otherwise the captures by name
+    */
+  def grokMatch(text:String)(implicit grok:Grok):Option[Map[String, String]] = {
+    import scala.collection.JavaConverters._
+
+    val m = grok.`match`(text)
+    m.captures()
+    val rstMap = m.toMap.asScala.toMap
+      .collect { case (k, v) if v != null => k -> v.toString }
+    if (rstMap.isEmpty) None else Some(rstMap)
+  }
+
+  /**
+    * Matches `text` against `grok` and lays the captures out in the order given by `keys`.
+    *
+    * @return `None` when the match produced no entries, otherwise a row with one value per
+    *         key (`null` where a key was not captured)
+    */
+  def grokMatchWithSchema(text:String)(implicit grok:Grok, keys:Array[String]):Option[Row] = {
+    import scala.collection.JavaConverters._
+
+    val m = grok.`match`(text)
+    m.captures()
+
+    val rstMap = m.toMap.asScala.toMap
+    if (rstMap.isEmpty) None
+    else {
+      val values = keys.map { key => rstMap.getOrElse(key, null) }
+      Some(Row.fromSeq(values))
+    }
+  }
+}