[S2GRAPH-225] support custom udf class
JIRA:
[S2GRAPH-225] https://issues.apache.org/jira/browse/S2GRAPH-225
Pull Request:
Closes #185
Author:
Chul Kang <elric@apache.org>
diff --git a/CHANGES b/CHANGES
index df8fbf2..bdc3da4 100644
--- a/CHANGES
+++ b/CHANGES
@@ -38,6 +38,7 @@
* [S2GRAPH-205] - too many initialize S2Graph when writeBatchMutate on S2GraphSink
* [S2GRAPH-201] - Provide S2GraphSource
* [S2GRAPH-218] - add operations not supported on sql
+ * [S2GRAPH-225] - support custom udf class
** Bug
* [S2GRAPH-159] - Wrong syntax at a bash script under Linux
diff --git a/project/Common.scala b/project/Common.scala
index 42628f0..1008b2c 100644
--- a/project/Common.scala
+++ b/project/Common.scala
@@ -30,7 +30,7 @@
val hadoopVersion = "2.7.3"
val tinkerpopVersion = "3.2.5"
- val elastic4sVersion = "6.1.1"
+ val elastic4sVersion = "6.2.4"
val KafkaVersion = "0.10.2.1"
diff --git a/s2jobs/build.sbt b/s2jobs/build.sbt
index f647040..47ec835 100644
--- a/s2jobs/build.sbt
+++ b/s2jobs/build.sbt
@@ -38,6 +38,7 @@
"org.apache.hadoop" % "hadoop-distcp" % hadoopVersion,
"org.elasticsearch" % "elasticsearch-spark-20_2.11" % elastic4sVersion,
"com.github.scopt" %% "scopt" % "3.7.0",
+ "io.thekraken" % "grok" % "0.1.5",
"com.holdenkarau" %% "spark-testing-base" % "2.3.0_0.9.0" % Test
)
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Job.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Job.scala
index 8f21bc2..026b688 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Job.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Job.scala
@@ -35,7 +35,7 @@
dfMap.put(source.conf.name, df)
}
- logger.debug(s"valid source DF set : ${dfMap.keySet}")
+ logger.info(s"valid source DF set : ${dfMap.keySet}")
// process
var processRst:Seq[(String, DataFrame)] = Nil
@@ -45,7 +45,7 @@
} while(processRst.nonEmpty)
- logger.debug(s"valid named DF set : ${dfMap.keySet}")
+ logger.info(s"valid named DF set : ${dfMap.keySet}")
// sinks
jobDesc.sinks.foreach { s =>
@@ -63,8 +63,7 @@
val dfKeys = dfMap.keySet
processes.filter{ p =>
- var existAllInput = true
- p.conf.inputs.foreach { input => existAllInput = dfKeys(input) }
+ val existAllInput = p.conf.inputs.forall{ input => dfKeys(input) }
!dfKeys(p.conf.name) && existAllInput
}
.map { p =>
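The filter rewrite above is a bug fix, not just a cleanup: the old loop reassigned existAllInput on every iteration, so only the last input was actually checked. A minimal sketch of the difference (names are illustrative):

    val dfKeys = Set("a", "b")
    val inputs = Seq("missing", "a")

    var existAllInput = true
    inputs.foreach { input => existAllInput = dfKeys(input) }
    // existAllInput == true -- the missing first input was silently overwritten

    inputs.forall { input => dfKeys(input) }
    // false -- correct: every input must already be a known DataFrame name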
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobDescription.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobDescription.scala
index 9a529aa..0943056 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobDescription.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobDescription.scala
@@ -21,28 +21,32 @@
import play.api.libs.json.{JsValue, Json}
import org.apache.s2graph.s2jobs.task._
+import org.apache.s2graph.s2jobs.udfs.UdfOption
case class JobDescription(
name:String,
+ udfs: Seq[UdfOption],
sources:Seq[Source],
processes:Seq[task.Process],
sinks:Seq[Sink]
)
object JobDescription extends Logger {
- val dummy = JobDescription("dummy", Nil, Nil, Nil)
+ val dummy = JobDescription("dummy", Nil, Nil, Nil, Nil)
def apply(jsVal:JsValue):JobDescription = {
implicit val TaskConfReader = Json.reads[TaskConf]
+ implicit val UdfOptionReader = Json.reads[UdfOption]
logger.debug(s"JobDescription: ${jsVal}")
val jobName = (jsVal \ "name").as[String]
+ val udfs = (jsVal \ "udfs").asOpt[Seq[UdfOption]].getOrElse(Nil)
val sources = (jsVal \ "source").asOpt[Seq[TaskConf]].getOrElse(Nil).map(conf => getSource(conf))
val processes = (jsVal \ "process").asOpt[Seq[TaskConf]].getOrElse(Nil).map(conf => getProcess(conf))
val sinks = (jsVal \ "sink").asOpt[Seq[TaskConf]].getOrElse(Nil).map(conf => getSink(jobName, conf))
- JobDescription(jobName, sources, processes, sinks)
+ JobDescription(jobName, udfs, sources, processes, sinks)
}
def getSource(conf:TaskConf):Source = {
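For reference, a job description exercising the new udfs section could look like the sketch below. The params keys are the ones consumed by the bundled Grok UDF; the compile pattern is illustrative and assumes the referenced pattern file defines NUMBER and WORD.

    import play.api.libs.json.Json

    val jobJson = Json.parse(
      """
      {
        "name": "udf-example",
        "udfs": [
          {
            "name": "grok",
            "class": "org.apache.s2graph.s2jobs.udfs.Grok",
            "params": {
              "patternDir": "/tmp",
              "patternFiles": "patterns",
              "compilePattern": "%{NUMBER:duration} %{WORD:method}"
            }
          }
        ],
        "source": [],
        "process": [],
        "sink": []
      }
      """)

    val desc = JobDescription(jobJson)   // udfs parsed via Json.reads[UdfOption]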
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobLauncher.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobLauncher.scala
index a64a399..0a76274 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobLauncher.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobLauncher.scala
@@ -19,6 +19,7 @@
package org.apache.s2graph.s2jobs
+import org.apache.s2graph.s2jobs.udfs.Udf
import org.apache.spark.sql.SparkSession
import play.api.libs.json.{JsValue, Json}
@@ -82,6 +83,13 @@
.enableHiveSupport()
.getOrCreate()
+ // register udfs
+ jobDescription.udfs.foreach{ udfOption =>
+ val udf = Class.forName(udfOption.`class`).newInstance().asInstanceOf[Udf]
+ logger.info(s"[udf register] ${udfOption}")
+ udf.register(ss, udfOption.name, udfOption.params.getOrElse(Map.empty))
+ }
+
val job = new Job(ss, jobDescription)
job.run()
}
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
index 4de585c..dd6f41b 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
@@ -20,22 +20,17 @@
package org.apache.s2graph.s2jobs.task
import com.typesafe.config.{Config, ConfigFactory, ConfigRenderOptions}
-import org.apache.hadoop.hbase.HBaseConfiguration
-import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles
-import org.apache.hadoop.util.ToolRunner
-import org.apache.s2graph.core.{GraphUtil, Management}
+import org.apache.s2graph.core.GraphUtil
import org.apache.s2graph.s2jobs.S2GraphHelper
-import org.apache.s2graph.s2jobs.loader.{GraphFileOptions, HFileGenerator, SparkBulkLoaderTransformer}
+import org.apache.s2graph.s2jobs.loader.{HFileGenerator, SparkBulkLoaderTransformer}
import org.apache.s2graph.s2jobs.serde.reader.RowBulkFormatReader
import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter
-import org.apache.s2graph.spark.sql.streaming.S2SinkContext
import org.apache.spark.sql._
import org.apache.spark.sql.streaming.{DataStreamWriter, OutputMode, Trigger}
import org.elasticsearch.spark.sql.EsSparkSQL
-import scala.collection.mutable.ListBuffer
-import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, Future}
/**
* Sink
@@ -132,6 +127,8 @@
logger.debug(s"${LOG_PREFIX} schema: ${df.schema}")
conf.options.getOrElse("format", "json") match {
+ case "raw" =>
+ df
case "tsv" =>
val delimiter = conf.options.getOrElse("delimiter", "\t")
@@ -212,9 +209,10 @@
* @param conf
*/
class S2GraphSink(queryName: String, conf: TaskConf) extends Sink(queryName, conf) {
- import scala.collection.JavaConversions._
- import org.apache.s2graph.spark.sql.streaming.S2SinkConfigs._
import org.apache.s2graph.core.S2GraphConfigs._
+ import org.apache.s2graph.spark.sql.streaming.S2SinkConfigs._
+
+ import scala.collection.JavaConversions._
override def mandatoryOptions: Set[String] = Set()
@@ -261,6 +259,10 @@
}
private def writeBatchWithMutate(df:DataFrame):Unit = {
+ import org.apache.s2graph.spark.sql.streaming.S2SinkConfigs._
+
+ import scala.collection.JavaConversions._
+
// TODO: FIX THIS. overwrite local cache config.
val mergedOptions = conf.options ++ TaskConf.parseLocalCacheConfigs(conf)
val graphConfig: Config = ConfigFactory.parseMap(mergedOptions).withFallback(ConfigFactory.load())
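The new "raw" branch added earlier in this file passes the DataFrame through without the json/tsv conversion the other branches apply; it is selected purely by the sink's format option, e.g. (a minimal sketch):

    // In a sink TaskConf, "format" -> "raw" skips any row-to-text conversion.
    val options = Map("format" -> "raw")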
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/udfs/Grok.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/udfs/Grok.scala
new file mode 100644
index 0000000..ebcb41d
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/udfs/Grok.scala
@@ -0,0 +1,36 @@
+package org.apache.s2graph.s2jobs.udfs
+
+import org.apache.s2graph.s2jobs.utils.GrokHelper
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.types.{DataType, StructType}
+import play.api.libs.json.{JsValue, Json}
+
+class Grok extends Udf {
+ import org.apache.spark.sql.functions.udf
+
+ def register(ss: SparkSession, name:String, options:Map[String, String]) = {
+ // grok
+ val patternDir = options.getOrElse("patternDir", "/tmp")
+ val patternFiles = options.getOrElse("patternFiles", "").split(",").filter(_.nonEmpty).toSeq
+ val patterns = Json.parse(options.getOrElse("patterns", "{}")).asOpt[Map[String, String]].getOrElse(Map.empty)
+ val compilePattern = options("compilePattern")
+ val schemaOpt = options.get("schema")
+
+ patternFiles.foreach { patternFile =>
+ ss.sparkContext.addFile(s"${patternDir}/${patternFile}")
+ }
+
+ implicit val grok = GrokHelper.getGrok(name, patternFiles, patterns, compilePattern)
+
+ val f = if(schemaOpt.isDefined) {
+ val schema = DataType.fromJson(schemaOpt.get)
+ implicit val keys:Array[String] = schema.asInstanceOf[StructType].fieldNames
+ udf(GrokHelper.grokMatchWithSchema _, schema)
+ } else {
+ udf(GrokHelper.grokMatch _)
+ }
+
+ ss.udf.register(name, f)
+ }
+}
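Once registered, the function is callable from Spark SQL under the UdfOption name. A usage sketch, assuming the UDF was registered as "grok" with a "schema" param whose struct includes a method field (table and column names are hypothetical):

    // grok(value) returns a struct when a schema is supplied, null on no match.
    ss.sql("SELECT grok(value).method AS method FROM logs")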
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/udfs/Udf.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/udfs/Udf.scala
new file mode 100644
index 0000000..821527c
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/udfs/Udf.scala
@@ -0,0 +1,14 @@
+package org.apache.s2graph.s2jobs.udfs
+
+import org.apache.s2graph.s2jobs.Logger
+import org.apache.spark.sql.SparkSession
+
+case class UdfOption(name:String, `class`:String, params:Option[Map[String, String]] = None)
+trait Udf extends Serializable with Logger {
+ def register(ss: SparkSession, name:String, options:Map[String, String])
+}
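This trait is the extension point the patch adds: JobLauncher instantiates the configured class via Class.forName(...).newInstance(), so an implementation needs a public no-arg constructor and has to be on the job's classpath. A minimal sketch of a custom implementation (package and class names are hypothetical):

    package com.example.udfs

    import org.apache.s2graph.s2jobs.udfs.Udf
    import org.apache.spark.sql.SparkSession

    class UpperCase extends Udf {
      override def register(ss: SparkSession, name: String, options: Map[String, String]): Unit = {
        // Register a plain Spark SQL function under the name given in UdfOption.
        ss.udf.register(name, (s: String) => if (s == null) null else s.toUpperCase)
      }
    }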
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/utils/GrokHelper.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/utils/GrokHelper.scala
new file mode 100644
index 0000000..37485c8
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/utils/GrokHelper.scala
@@ -0,0 +1,59 @@
+package org.apache.s2graph.s2jobs.utils
+
+import io.thekraken.grok.api.Grok
+import org.apache.s2graph.s2jobs.Logger
+import org.apache.spark.SparkFiles
+import org.apache.spark.sql.Row
+
+import scala.collection.mutable
+
+object GrokHelper extends Logger {
+ private val grokPool:mutable.Map[String, Grok] = mutable.Map.empty
+
+ def getGrok(name:String, patternFiles:Seq[String], patterns:Map[String, String], compilePattern:String):Grok = {
+ if (grokPool.get(name).isEmpty) {
+ println(s"Grok '$name' initialized..")
+ val grok = new Grok()
+ patternFiles.foreach { patternFile =>
+ val filePath = SparkFiles.get(patternFile)
+ println(s"[Grok][$name] add pattern file : $patternFile ($filePath)")
+ grok.addPatternFromFile(filePath)
+ }
+ patterns.foreach { case (patternName, pattern) =>
+ println(s"[Grok][$name] add pattern : $patternName ($pattern)")
+ grok.addPattern(patternName, pattern)
+ }
+
+ grok.compile(compilePattern)
+ println(s"[Grok][$name] patterns: ${grok.getPatterns}")
+ grokPool.put(name, grok)
+ }
+
+ grokPool(name)
+ }
+
+ def grokMatch(text:String)(implicit grok:Grok):Option[Map[String, String]] = {
+ import scala.collection.JavaConverters._
+
+ val m = grok.`match`(text)
+ m.captures()
+ val rstMap = m.toMap.asScala.toMap
+ .filter(_._2 != null)
+ .map{ case (k, v) => k -> v.toString}
+ if (rstMap.isEmpty) None else Some(rstMap)
+ }
+
+ def grokMatchWithSchema(text:String)(implicit grok:Grok, keys:Array[String]):Option[Row] = {
+ import scala.collection.JavaConverters._
+
+ val m = grok.`match`(text)
+ m.captures()
+
+ val rstMap = m.toMap.asScala.toMap
+ if (rstMap.isEmpty) None
+ else {
+ val l = keys.map { key => rstMap.getOrElse(key, null)}
+ Some(Row.fromSeq(l))
+ }
+ }
+}
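A quick sketch of GrokHelper used directly (the inline pattern and input are made up):

    // One inline pattern, no pattern files; matches come back as a Map.
    implicit val grok = GrokHelper.getGrok(
      name = "example",
      patternFiles = Nil,
      patterns = Map("MYNUMBER" -> "\\d+"),
      compilePattern = "%{MYNUMBER:num}")

    GrokHelper.grokMatch("12345")   // Some(Map("num" -> "12345"))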