blob: 408004544780cf879f1d1c077d1a8f907d4361fe [file] [log] [blame]
package org.apache.s2graph.s2jobs.wal
import com.google.common.hash.Hashing
import org.apache.s2graph.core.{GraphUtil, JSONParser}
import org.apache.s2graph.s2jobs.wal.process.params.AggregateParam
import org.apache.s2graph.s2jobs.wal.transformer.Transformer
import org.apache.s2graph.s2jobs.wal.utils.BoundedPriorityQueue
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
import play.api.libs.json.{JsObject, Json}
import scala.util.Try
object WalLogAgg {
val outputColumns = Seq("from", "vertices", "edges")
def isEdge(walLog: WalLog): Boolean = {
walLog.elem == "edge" || walLog.elem == "e"
}
def apply(walLog: WalLog): WalLogAgg = {
val (vertices, edges) =
if (isEdge(walLog)) (Nil, Seq(walLog))
else (Seq(walLog), Nil)
new WalLogAgg(walLog.from, vertices, edges)
}
def toFeatureHash(dimVal: DimVal): Long = toFeatureHash(dimVal.dim, dimVal.value)
def toFeatureHash(dim: String, value: String): Long = {
Hashing.murmur3_128().hashBytes(s"$dim:$value".getBytes("UTF-8")).asLong()
}
private def addToHeap(iter: Seq[WalLog],
heap: BoundedPriorityQueue[WalLog],
validTimestampDuration: Option[Long]): Unit = {
val now = System.currentTimeMillis()
iter.foreach { walLog =>
val ts = walLog.timestamp
val isValid = validTimestampDuration.map(d => now - ts < d).getOrElse(true)
if (isValid) {
heap += walLog
}
}
}
def merge(iter: Iterator[WalLogAgg],
param: AggregateParam)(implicit ord: Ordering[WalLog]): Option[WalLogAgg] = {
val edgeHeap = new BoundedPriorityQueue[WalLog](param.heapSize)
val vertexHeap = new BoundedPriorityQueue[WalLog](param.heapSize)
val validTimestampDuration = param.validTimestampDuration
iter.foreach { walLogAgg =>
addToHeap(walLogAgg.vertices, vertexHeap, validTimestampDuration)
addToHeap(walLogAgg.edges, edgeHeap, validTimestampDuration)
}
val topVertices = if (param.sortTopItems) vertexHeap.toArray.sortBy(-_.timestamp) else vertexHeap.toArray
val topEdges = if (param.sortTopItems) edgeHeap.toArray.sortBy(-_.timestamp) else edgeHeap.toArray
topEdges.headOption.map(head => WalLogAgg(head.from, topVertices, topEdges))
}
private def filterPropsInner(walLogs: Seq[WalLog],
transformers: Seq[Transformer],
validFeatureHashKeys: Set[Long]): Seq[WalLog] = {
walLogs.map { walLog =>
val fields = walLog.propsJson.fields.filter { case (propKey, propValue) =>
val filtered = transformers.flatMap { transformer =>
transformer.toDimValLs(walLog, propKey, JSONParser.jsValueToString(propValue)).filter(dimVal => validFeatureHashKeys(toFeatureHash(dimVal)))
}
filtered.nonEmpty
}
walLog.copy(props = Json.toJson(fields.toMap).as[JsObject].toString)
}
}
def filterProps(walLogAgg: WalLogAgg,
transformers: Seq[Transformer],
validFeatureHashKeys: Set[Long]) = {
val filteredVertices = filterPropsInner(walLogAgg.vertices, transformers, validFeatureHashKeys)
val filteredEdges = filterPropsInner(walLogAgg.edges, transformers, validFeatureHashKeys)
walLogAgg.copy(vertices = filteredVertices, edges = filteredEdges)
}
}
object DimValCountRank {
def fromRow(row: Row): DimValCountRank = {
val dim = row.getAs[String]("dim")
val value = row.getAs[String]("value")
val count = row.getAs[Long]("count")
val rank = row.getAs[Long]("rank")
new DimValCountRank(DimVal(dim, value), count, rank)
}
}
case class DimValCountRank(dimVal: DimVal, count: Long, rank: Long)
case class DimValCount(dimVal: DimVal, count: Long)
object DimVal {
def fromRow(row: Row): DimVal = {
val dim = row.getAs[String]("dim")
val value = row.getAs[String]("value")
new DimVal(dim, value)
}
}
case class DimVal(dim: String, value: String)
case class WalLogAgg(from: String,
vertices: Seq[WalLog],
edges: Seq[WalLog])
case class WalLog(timestamp: Long,
operation: String,
elem: String,
from: String,
to: String,
service: String,
label: String,
props: String) {
val id = from
val columnName = label
val serviceName = to
lazy val propsJson = Json.parse(props).as[JsObject]
lazy val propsKeyValues = propsJson.fields.map { case (key, jsValue) =>
key -> JSONParser.jsValueToString(jsValue)
}
}
object WalLog {
val orderByTsAsc = Ordering.by[WalLog, Long](walLog => walLog.timestamp)
val WalLogSchema = StructType(Seq(
StructField("timestamp", LongType, false),
StructField("operation", StringType, false),
StructField("elem", StringType, false),
StructField("from", StringType, false),
StructField("to", StringType, false),
StructField("service", StringType, true),
StructField("label", StringType, false),
StructField("props", StringType, false)
// StructField("direction", StringType, true)
))
def fromRow(row: Row): WalLog = {
val timestamp = row.getAs[Long]("timestamp")
val operation = Try(row.getAs[String]("operation")).toOption.getOrElse("insert")
val elem = Try(row.getAs[String]("elem")).toOption.getOrElse("edge")
val from = row.getAs[String]("from")
val to = row.getAs[String]("to")
val service = row.getAs[String]("service")
val label = row.getAs[String]("label")
val props = Try(row.getAs[String]("props")).toOption.getOrElse("{}")
WalLog(timestamp, operation, elem, from, to, service, label, props)
}
}