// blob: e4c3467076185ec0243bc422141a4a12970be1fc [file] [log] [blame]
package org.apache.s2graph.s2jobs.wal
import com.google.common.hash.Hashing
import org.apache.s2graph.core.JSONParser
import org.apache.s2graph.s2jobs.wal.process.params.AggregateParam
import org.apache.s2graph.s2jobs.wal.utils.BoundedPriorityQueue
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
import play.api.libs.json.{JsObject, Json}
import scala.util.Try
/** Factory and helper functions for [[WalLogAgg]]. */
object WalLogAgg {
  /** Field names of [[WalLogAgg]], in constructor order. */
  val outputColumns: Seq[String] = Seq("from", "logs", "maxTs", "minTs")

  /**
   * Wraps a single log into an aggregate: the log's `from` becomes the aggregate key and
   * its timestamp is used for both `maxTs` and `minTs`.
   */
  def apply(walLog: WalLog): WalLogAgg = {
    new WalLogAgg(walLog.from, Seq(walLog), walLog.timestamp, walLog.timestamp)
  }

  /**
   * Hashes a (dimension, value) pair into a `Long`.
   *
   * The hash input is the UTF-8 encoding of `"dim:value"`, hashed with murmur3 128-bit and
   * narrowed with `asLong()`.
   */
  def toFeatureHash(dim: String, value: String): Long = {
    Hashing.murmur3_128().hashBytes(s"$dim:$value".getBytes("UTF-8")).asLong()
  }

  /**
   * Returns a copy of `walLogAgg` in which every log's `props` JSON object only keeps the
   * fields whose (field name, stringified value) feature hash is in `validFeatureHashKeys`.
   *
   * Logs themselves are never dropped; a log whose fields are all filtered out keeps an
   * empty props object.
   *
   * @param walLogAgg            aggregate whose logs are rewritten
   * @param validFeatureHashKeys hashes (see [[toFeatureHash]]) of the fields to keep
   */
  def filter(walLogAgg: WalLogAgg, validFeatureHashKeys: Set[Long]): WalLogAgg = {
    val filtered = walLogAgg.logs.map { walLog =>
      val fields = Json.parse(walLog.props).as[JsObject].fields.filter { case (dim, jsValue) =>
        validFeatureHashKeys(toFeatureHash(dim, JSONParser.jsValueToString(jsValue)))
      }
      walLog.copy(props = Json.toJson(fields).as[JsObject].toString)
    }
    walLogAgg.copy(logs = filtered)
  }

  /**
   * Merges several aggregates into one.
   *
   * Every log of every input aggregate is pushed into a `BoundedPriorityQueue` created with
   * `param.heapSize` and the implicit ordering; the queue's content becomes the merged
   * `logs`. `maxTs` / `minTs` are the extremes over ALL input aggregates, regardless of
   * which logs the queue retains.
   * NOTE(review): presumably the queue keeps at most `heapSize` logs ranked by `ord` --
   * confirm against BoundedPriorityQueue.
   *
   * @param iter  aggregates to merge (consumed by this call)
   * @param param supplies `heapSize` and `sortTopItems`
   * @param ord   ordering handed to the bounded queue
   * @return the merged aggregate, keyed by the `from` of its first retained log
   * @throws NoSuchElementException if no log is retained (e.g. `iter` is empty)
   */
  def merge(iter: Iterator[WalLogAgg],
            param: AggregateParam)(implicit ord: Ordering[WalLog]): WalLogAgg = {
    val heap = new BoundedPriorityQueue[WalLog](param.heapSize)
    var minTs = Long.MaxValue
    var maxTs = Long.MinValue

    iter.foreach { walLogAgg =>
      minTs = Math.min(walLogAgg.minTs, minTs)
      maxTs = Math.max(walLogAgg.maxTs, maxTs)
      walLogAgg.logs.foreach { walLog =>
        heap += walLog
      }
    }

    val retained = heap.toArray
    // Sort newest-first with a reversed ordering rather than by negating the key:
    // -Long.MinValue overflows back to Long.MinValue and would sort first instead of last.
    val topItems =
      if (param.sortTopItems) retained.sortBy(_.timestamp)(Ordering[Long].reverse)
      else retained

    // Same exception type as a bare `.head`, but with a message that explains the failure.
    val from = topItems.headOption.map(_.from).getOrElse {
      throw new NoSuchElementException(
        "WalLogAgg.merge: no WalLog retained; cannot determine 'from' for the merged aggregate")
    }

    WalLogAgg(from, topItems, maxTs, minTs)
  }
}
/**
 * A dimension name paired with one of its values.
 *
 * @param dim   name of the dimension
 * @param value value of the dimension, as a string
 */
case class DimVal(
  dim: String,
  value: String
)
/**
 * A group of [[WalLog]] entries together with the timestamp range they were built from.
 *
 * @param from  key of the aggregate (the `from` of a member log)
 * @param logs  the logs held by this aggregate
 * @param maxTs upper timestamp bound tracked for the aggregate
 * @param minTs lower timestamp bound tracked for the aggregate
 */
case class WalLogAgg(
  from: String,
  logs: Seq[WalLog],
  maxTs: Long,
  minTs: Long
)
/**
 * One write-ahead-log record.
 *
 * @param timestamp record timestamp
 * @param operation operation name (e.g. "insert")
 * @param elem      element kind (e.g. "edge")
 * @param from      source identifier
 * @param to        target identifier
 * @param service   service name
 * @param label     label name
 * @param props     properties, serialized as a JSON object string
 */
case class WalLog(
  timestamp: Long,
  operation: String,
  elem: String,
  from: String,
  to: String,
  service: String,
  label: String,
  props: String
) {
  // Alternative names for existing fields.
  // NOTE(review): `serviceName` aliases `to`, not `service` -- presumably these aliases
  // target vertex-style records; confirm with the writers of this log format.
  val id: String = from
  val columnName: String = label
  val serviceName: String = to

  /** The fields of `props`, each value rendered as a string; parsed on first access. */
  lazy val propsKeyValues = {
    val parsedProps = Json.parse(props).as[JsObject]
    parsedProps.fields.map { case (name, rawValue) =>
      (name, JSONParser.jsValueToString(rawValue))
    }
  }
}
/** Ordering, Spark schema and row decoding for [[WalLog]]. */
object WalLog {
  /** Orders logs by ascending `timestamp`. */
  val orderByTsAsc: Ordering[WalLog] = Ordering.by[WalLog, Long](walLog => walLog.timestamp)

  /** Spark SQL schema of a [[WalLog]] row; only `service` is declared nullable. */
  val WalLogSchema: StructType = StructType(Seq(
    StructField("timestamp", LongType, false),
    StructField("operation", StringType, false),
    StructField("elem", StringType, false),
    StructField("from", StringType, false),
    StructField("to", StringType, false),
    StructField("service", StringType, true),
    StructField("label", StringType, false),
    StructField("props", StringType, false)
    //    StructField("direction", StringType, true)
  ))

  /**
   * Builds a [[WalLog]] from a row, looking columns up by name.
   *
   * `operation`, `elem` and `props` are optional: when the column cannot be read or holds
   * `null`, they default to `"insert"`, `"edge"` and `"{}"` respectively. All other
   * columns are read directly, so a failure while reading them propagates to the caller.
   *
   * @param row source row
   * @return the decoded log
   */
  def fromRow(row: Row): WalLog = {
    val timestamp = row.getAs[Long]("timestamp")
    val operation = stringOrDefault(row, "operation", "insert")
    val elem = stringOrDefault(row, "elem", "edge")
    val from = row.getAs[String]("from")
    val to = row.getAs[String]("to")
    val service = row.getAs[String]("service")
    val label = row.getAs[String]("label")
    val props = stringOrDefault(row, "props", "{}")

    WalLog(timestamp, operation, elem, from, to, service, label, props)
  }

  /**
   * Reads a string column, returning `default` when the lookup throws or the value is null.
   * Wrapping in `Option` matters: `Try(null)` is a `Success`, so without it a null column
   * value would bypass the default.
   */
  private def stringOrDefault(row: Row, fieldName: String, default: String): String =
    Try(Option(row.getAs[String](fieldName))).toOption.flatten.getOrElse(default)
}