blob: 37485c88c6ec52c5d1374d8c82c7b9235dc48837 [file] [log] [blame]
package org.apache.s2graph.s2jobs.utils
import io.thekraken.grok.api.Grok
import org.apache.s2graph.s2jobs.Logger
import org.apache.spark.SparkFiles
import org.apache.spark.sql.Row
import scala.collection.mutable
/**
 * Helpers for building, caching and applying Grok patterns.
 *
 * Compiled [[Grok]] instances are kept in a pool keyed by name and live for the
 * lifetime of this object, so each name is compiled at most once.
 */
object GrokHelper extends Logger {
  import scala.collection.JavaConverters._

  // Cache of compiled Grok instances, keyed by the caller-supplied name.
  // It is a plain mutable map, so every access goes through grokPool.synchronized.
  private val grokPool:mutable.Map[String, Grok] = mutable.Map.empty

  /**
   * Returns the Grok registered under `name`, building and caching it on first use.
   *
   * When an instance already exists for `name` it is returned as is; the other
   * arguments are ignored in that case.
   *
   * @param name           pool key, also used as a tag in the printed messages
   * @param patternFiles   pattern file names; each one is resolved through
   *                       `SparkFiles.get` before being loaded
   * @param patterns       additional patterns as (pattern name -> pattern definition)
   * @param compilePattern expression handed to `Grok.compile`
   * @return the cached or newly built Grok
   */
  def getGrok(name:String, patternFiles:Seq[String], patterns:Map[String, String], compilePattern:String):Grok =
    // Look-up and insert happen under one lock so that concurrent callers can
    // neither corrupt the map nor build the same Grok twice.
    // NOTE(review): the returned instance is shared by every caller in this JVM;
    // confirm that matching on a shared Grok is safe for concurrent use.
    grokPool.synchronized {
      grokPool.getOrElseUpdate(name, buildGrok(name, patternFiles, patterns, compilePattern))
    }

  /** Creates a Grok, loads the pattern files and inline patterns, then compiles `compilePattern`. */
  private def buildGrok(name:String, patternFiles:Seq[String], patterns:Map[String, String], compilePattern:String):Grok = {
    println(s"Grok '$name' initialized..")
    val grok = new Grok()

    patternFiles.foreach { patternFile =>
      val filePath = SparkFiles.get(patternFile)
      println(s"[Grok][$name] add pattern file : $patternFile ($filePath)")
      grok.addPatternFromFile(filePath)
    }

    // The pattern key gets its own identifier: reusing `name` here would shadow
    // the grok name and put the pattern name into the "[Grok][...]" tag.
    patterns.foreach { case (patternName, pattern) =>
      println(s"[Grok][$name] add pattern : $patternName ($pattern)")
      grok.addPattern(patternName, pattern)
    }

    grok.compile(compilePattern)
    println(s"[Grok][$name] patterns: ${grok.getPatterns}")
    grok
  }

  /** Matches `text` with `grok` and returns the capture map as an immutable Scala map. */
  private def captureAll(text:String, grok:Grok):Map[String, AnyRef] = {
    val m = grok.`match`(text)
    m.captures()
    m.toMap.asScala.toMap
  }

  /**
   * Matches `text` against the implicit Grok and returns the captures as strings.
   *
   * Entries whose value is null are dropped; every remaining value is converted
   * with `toString`.
   *
   * @param text input line to match
   * @param grok compiled Grok used for matching
   * @return `None` when no non-null capture remains, otherwise the capture map
   */
  def grokMatch(text:String)(implicit grok:Grok):Option[Map[String, String]] = {
    val captured = captureAll(text, grok).collect { case (k, v) if v != null => k -> v.toString }
    if (captured.isEmpty) None else Some(captured)
  }

  /**
   * Matches `text` against the implicit Grok and lays the captures out as a Row.
   *
   * Unlike [[grokMatch]], captured values are passed through unconverted and
   * null-valued entries still count as captures.
   *
   * @param text input line to match
   * @param grok compiled Grok used for matching
   * @param keys capture names, in the order the Row fields should appear
   * @return `None` when the capture map is empty, otherwise a Row holding one value
   *         per entry of `keys` (null where a key has no capture)
   */
  def grokMatchWithSchema(text:String)(implicit grok:Grok, keys:Array[String]):Option[Row] = {
    val captured = captureAll(text, grok)
    if (captured.isEmpty) None
    else {
      // Row fields are nullable, so a missing key is represented by null.
      val values = keys.map { key => captured.getOrElse(key, null) }
      Some(Row.fromSeq(values))
    }
  }
}