refactor WalLogAgg class.
- separate edges and vertices.
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Job.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Job.scala
index 8f21bc2..6d9f509 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Job.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Job.scala
@@ -63,8 +63,7 @@
val dfKeys = dfMap.keySet
processes.filter{ p =>
- var existAllInput = true
- p.conf.inputs.foreach { input => existAllInput = dfKeys(input) }
+ val existAllInput = p.conf.inputs.forall { input => dfKeys(input) }
!dfKeys(p.conf.name) && existAllInput
}
.map { p =>
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala
index 9617ca1..4080045 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala
@@ -1,7 +1,7 @@
package org.apache.s2graph.s2jobs.wal
import com.google.common.hash.Hashing
-import org.apache.s2graph.core.JSONParser
+import org.apache.s2graph.core.{GraphUtil, JSONParser}
import org.apache.s2graph.s2jobs.wal.process.params.AggregateParam
import org.apache.s2graph.s2jobs.wal.transformer.Transformer
import org.apache.s2graph.s2jobs.wal.utils.BoundedPriorityQueue
@@ -12,10 +12,18 @@
import scala.util.Try
object WalLogAgg {
- val outputColumns = Seq("from", "logs", "maxTs", "minTs")
+ val outputColumns = Seq("from", "vertices", "edges")
+
+ def isEdge(walLog: WalLog): Boolean = {
+ walLog.elem == "edge" || walLog.elem == "e"
+ }
def apply(walLog: WalLog): WalLogAgg = {
- new WalLogAgg(walLog.from, Seq(walLog), walLog.timestamp, walLog.timestamp)
+ val (vertices, edges) =
+ if (isEdge(walLog)) (Nil, Seq(walLog))
+ else (Seq(walLog), Nil)
+
+ new WalLogAgg(walLog.from, vertices, edges)
}
def toFeatureHash(dimVal: DimVal): Long = toFeatureHash(dimVal.dim, dimVal.value)
@@ -24,29 +32,43 @@
Hashing.murmur3_128().hashBytes(s"$dim:$value".getBytes("UTF-8")).asLong()
}
- def merge(iter: Iterator[WalLogAgg],
- param: AggregateParam)(implicit ord: Ordering[WalLog]) = {
- val heap = new BoundedPriorityQueue[WalLog](param.heapSize)
- var minTs = Long.MaxValue
- var maxTs = Long.MinValue
+ private def addToHeap(iter: Seq[WalLog],
+ heap: BoundedPriorityQueue[WalLog],
+ validTimestampDuration: Option[Long]): Unit = {
+ val now = System.currentTimeMillis()
- iter.foreach { walLogAgg =>
- minTs = Math.min(walLogAgg.minTs, minTs)
- maxTs = Math.max(walLogAgg.maxTs, maxTs)
+ iter.foreach { walLog =>
+ val ts = walLog.timestamp
+ val isValid = validTimestampDuration.map(d => now - ts < d).getOrElse(true)
- walLogAgg.logs.foreach { walLog =>
+ if (isValid) {
heap += walLog
}
}
- val topItems = if (param.sortTopItems) heap.toArray.sortBy(-_.timestamp) else heap.toArray
-
- WalLogAgg(topItems.head.from, topItems, maxTs, minTs)
}
- def filterProps(walLogAgg: WalLogAgg,
- transformers: Seq[Transformer],
- validFeatureHashKeys: Set[Long]) = {
- val filtered = walLogAgg.logs.map { walLog =>
+ def merge(iter: Iterator[WalLogAgg],
+ param: AggregateParam)(implicit ord: Ordering[WalLog]): Option[WalLogAgg] = {
+ val edgeHeap = new BoundedPriorityQueue[WalLog](param.heapSize)
+ val vertexHeap = new BoundedPriorityQueue[WalLog](param.heapSize)
+
+ val validTimestampDuration = param.validTimestampDuration
+
+ iter.foreach { walLogAgg =>
+ addToHeap(walLogAgg.vertices, vertexHeap, validTimestampDuration)
+ addToHeap(walLogAgg.edges, edgeHeap, validTimestampDuration)
+ }
+
+ val topVertices = if (param.sortTopItems) vertexHeap.toArray.sortBy(-_.timestamp) else vertexHeap.toArray
+ val topEdges = if (param.sortTopItems) edgeHeap.toArray.sortBy(-_.timestamp) else edgeHeap.toArray
+
+ topEdges.headOption.map(head => WalLogAgg(head.from, topVertices, topEdges))
+ }
+
+ private def filterPropsInner(walLogs: Seq[WalLog],
+ transformers: Seq[Transformer],
+ validFeatureHashKeys: Set[Long]): Seq[WalLog] = {
+ walLogs.map { walLog =>
val fields = walLog.propsJson.fields.filter { case (propKey, propValue) =>
val filtered = transformers.flatMap { transformer =>
transformer.toDimValLs(walLog, propKey, JSONParser.jsValueToString(propValue)).filter(dimVal => validFeatureHashKeys(toFeatureHash(dimVal)))
@@ -56,8 +78,15 @@
walLog.copy(props = Json.toJson(fields.toMap).as[JsObject].toString)
}
+ }
- walLogAgg.copy(logs = filtered)
+ def filterProps(walLogAgg: WalLogAgg,
+ transformers: Seq[Transformer],
+ validFeatureHashKeys: Set[Long]) = {
+ val filteredVertices = filterPropsInner(walLogAgg.vertices, transformers, validFeatureHashKeys)
+ val filteredEdges = filterPropsInner(walLogAgg.edges, transformers, validFeatureHashKeys)
+
+ walLogAgg.copy(vertices = filteredVertices, edges = filteredEdges)
}
}
@@ -88,9 +117,8 @@
case class DimVal(dim: String, value: String)
case class WalLogAgg(from: String,
- logs: Seq[WalLog],
- maxTs: Long,
- minTs: Long)
+ vertices: Seq[WalLog],
+ edges: Seq[WalLog])
case class WalLog(timestamp: Long,
operation: String,
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcess.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcess.scala
index ee4debd..aebb1cc 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcess.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcess.scala
@@ -10,7 +10,7 @@
dataset: Dataset[WalLogAgg],
aggregateParam: AggregateParam)(implicit ord: Ordering[WalLog]) = {
import ss.implicits._
- dataset.groupByKey(_.from).mapGroups { case (_, iter) =>
+ dataset.groupByKey(_.from).flatMapGroups { case (_, iter) =>
WalLogAgg.merge(iter, aggregateParam)
}.toDF(WalLogAgg.outputColumns: _*)
}
@@ -20,7 +20,7 @@
aggregateParam: AggregateParam)(implicit ord: Ordering[WalLog]): DataFrame = {
import ss.implicits._
- dataset.groupByKey(walLog => walLog.from).mapGroups { case (key, iter) =>
+ dataset.groupByKey(walLog => walLog.from).flatMapGroups { case (key, iter) =>
WalLogAgg.merge(iter.map(WalLogAgg(_)), aggregateParam)
}.toDF(WalLogAgg.outputColumns: _*)
}
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/AggregateParam.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/AggregateParam.scala
index 4cb1377..ecc12e3 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/AggregateParam.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/AggregateParam.scala
@@ -14,12 +14,14 @@
val arrayType = taskConf.options.get("arrayType").map(_.toBoolean).getOrElse(defaultIsArrayType)
val sortTopItems = taskConf.options.get("sortTopItems").map(_.toBoolean).getOrElse(defaultShouldSortTopItems)
val numOfPartitions = taskConf.options.get("numOfPartitions").map(_.toInt)
+ val validTimestampDuration = taskConf.options.get("validTimestampDuration").map(_.toLong).getOrElse(Long.MaxValue)
new AggregateParam(groupByKeys = groupByKeys,
topK = Option(maxNumOfEdges),
isArrayType = Option(arrayType),
shouldSortTopItems = Option(sortTopItems),
- numOfPartitions = numOfPartitions
+ numOfPartitions = numOfPartitions,
+ validTimestampDuration = Option(validTimestampDuration)
)
}
}
@@ -28,7 +30,8 @@
topK: Option[Int],
isArrayType: Option[Boolean],
shouldSortTopItems: Option[Boolean],
- numOfPartitions: Option[Int]) {
+ numOfPartitions: Option[Int],
+ validTimestampDuration: Option[Long]) {
import AggregateParam._