blob: f4a670bdd51c22a84091e2952442ac82ee833084 [file] [log] [blame]
package org.apache.s2graph.s2jobs.wal
import org.apache.s2graph.core.JSONParser
import org.apache.s2graph.s2jobs.wal.process.params.AggregateParam
import org.apache.s2graph.s2jobs.wal.utils.BoundedPriorityQueue
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
import play.api.libs.json.{JsObject, Json}
import scala.util.Try
/**
 * Factory and aggregation helpers for [[WalLogAgg]].
 */
object WalLogAgg {
  /** Field names of [[WalLogAgg]], listed in constructor order. */
  val outputColumns = Seq("from", "logs", "maxTs", "minTs")

  /**
   * Wraps a single log into an aggregate.
   *
   * The aggregate is keyed by the log's `from`, and the log's timestamp is used
   * as both the maximum and the minimum timestamp.
   *
   * @param walLog the log to wrap
   * @return an aggregate containing exactly `walLog`
   */
  def apply(walLog: WalLog): WalLogAgg = {
    new WalLogAgg(walLog.from, Seq(walLog), walLog.timestamp, walLog.timestamp)
  }

  /**
   * Merges several aggregates into one.
   *
   * Every log of every incoming aggregate is offered to a
   * `BoundedPriorityQueue` sized by `param.heapSize` (presumably this keeps at
   * most that many logs according to `ord` -- confirm against the queue
   * implementation). `minTs` / `maxTs` of the result are folded over the
   * bounds of ALL incoming aggregates, not only over the logs that end up in
   * the result.
   *
   * @param iter  aggregates to merge; must yield at least one log in total
   * @param param supplies `heapSize` and `sortTopItems`
   * @param ord   ordering handed implicitly to the bounded queue
   * @return the merged aggregate, keyed by the `from` of its first log
   * @throws NoSuchElementException if no log is available after merging
   */
  def merge(iter: Iterator[WalLogAgg],
            param: AggregateParam)(implicit ord: Ordering[WalLog]): WalLogAgg = {
    val heap = new BoundedPriorityQueue[WalLog](param.heapSize)
    var minTs = Long.MaxValue
    var maxTs = Long.MinValue

    iter.foreach { walLogAgg =>
      minTs = Math.min(walLogAgg.minTs, minTs)
      maxTs = Math.max(walLogAgg.maxTs, maxTs)
      walLogAgg.logs.foreach { walLog =>
        heap += walLog
      }
    }

    val retained = heap.toArray
    // Newest first. A reversed ordering is used instead of negating the key
    // because -Long.MinValue overflows back to Long.MinValue and would put
    // that record at the front instead of the back.
    val topItems =
      if (param.sortTopItems) retained.sortBy(_.timestamp)(Ordering[Long].reverse)
      else retained

    // Same exception type as calling `head` on an empty array, but with a
    // message that points at the actual cause.
    val from = topItems.headOption
      .map(_.from)
      .getOrElse(throw new NoSuchElementException(
        "WalLogAgg.merge: no logs available to merge (empty input or empty heap)"))

    WalLogAgg(from, topItems, maxTs, minTs)
  }
}
/**
 * A pair of two strings: `dim` and `value`.
 *
 * NOTE(review): presumably `dim` is a dimension name and `value` is its value;
 * confirm against the call sites.
 *
 * @param dim   first component of the pair
 * @param value second component of the pair
 */
case class DimVal(
  dim: String,
  value: String
)
/**
 * A group of [[WalLog]] records sharing one `from` key, together with
 * timestamp bounds.
 *
 * Note the field order: `maxTs` comes before `minTs`.
 *
 * @param from  key of the group
 * @param logs  the grouped logs
 * @param maxTs upper timestamp bound recorded for the group
 * @param minTs lower timestamp bound recorded for the group
 */
case class WalLogAgg(
  from: String,
  logs: Seq[WalLog],
  maxTs: Long,
  minTs: Long
)
/**
 * One WAL log record.
 *
 * @param timestamp timestamp of the record
 * @param operation operation name
 * @param elem      element kind
 * @param from      source identifier; also exposed as `id`
 * @param to        target identifier; also exposed as `serviceName`
 * @param service   service name
 * @param label     label; also exposed as `columnName`
 * @param props     properties encoded as a JSON object string
 */
case class WalLog(
  timestamp: Long,
  operation: String,
  elem: String,
  from: String,
  to: String,
  service: String,
  label: String,
  props: String
) {
  /** Alias of `from`. */
  val id: String = from

  /** Alias of `label`. */
  val columnName: String = label

  /**
   * Alias of `to`.
   *
   * NOTE(review): this aliases `to`, not `service` -- presumably intended for
   * records where the `to` slot carries a service name; confirm.
   */
  val serviceName: String = to

  /**
   * Top-level entries of `props`, each value rendered through
   * `JSONParser.jsValueToString`.
   *
   * Evaluated on first access; the access fails if `props` is not a JSON
   * object.
   */
  lazy val propsKeyValues = {
    val parsed = Json.parse(props).as[JsObject]
    parsed.fields.map { field =>
      field._1 -> JSONParser.jsValueToString(field._2)
    }
  }
}
/**
 * Ordering, Spark schema and row decoding for [[WalLog]].
 */
object WalLog {
  /** Orders logs by ascending `timestamp`. */
  val orderByTsAsc: Ordering[WalLog] = Ordering.by[WalLog, Long](_.timestamp)

  /** Spark schema of a [[WalLog]] row; only `service` is declared nullable. */
  val WalLogSchema: StructType = StructType(Seq(
    StructField("timestamp", LongType, nullable = false),
    StructField("operation", StringType, nullable = false),
    StructField("elem", StringType, nullable = false),
    StructField("from", StringType, nullable = false),
    StructField("to", StringType, nullable = false),
    StructField("service", StringType, nullable = true),
    StructField("label", StringType, nullable = false),
    StructField("props", StringType, nullable = false)
    //    StructField("direction", StringType, true)
  ))

  /**
   * Builds a [[WalLog]] from a row, looking columns up by name.
   *
   * `operation`, `elem` and `props` fall back to `"insert"`, `"edge"` and
   * `"{}"` respectively when reading the column throws. The remaining columns
   * are read directly, so a failure there propagates to the caller.
   *
   * @param row the row to decode
   * @return the decoded log
   */
  def fromRow(row: Row): WalLog = {
    // Substitutes `fallback` only when the lookup throws; a null value that is
    // read successfully is passed through unchanged.
    def stringOr(column: String, fallback: String): String =
      Try(row.getAs[String](column)).getOrElse(fallback)

    WalLog(
      timestamp = row.getAs[Long]("timestamp"),
      operation = stringOr("operation", "insert"),
      elem = stringOr("elem", "edge"),
      from = row.getAs[String]("from"),
      to = row.getAs[String]("to"),
      service = row.getAs[String]("service"),
      label = row.getAs[String]("label"),
      props = stringOr("props", "{}")
    )
  }
}