Add wal package: WalLog case class, edge aggregation processes, and top-K UserDefinedAggregateFunction implementations.
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Schema.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Schema.scala
index 58d3368..1c47f13 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Schema.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Schema.scala
@@ -22,10 +22,19 @@
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
object Schema {
- val BulkLoadSchema = StructType(Seq(
- StructField("timestamp", LongType, false),
- StructField("operation", StringType, false),
- StructField("elem", StringType, false),
+  /**
+   * Columns placed first in each schema of this object; none of them is nullable.
+   *  |-- timestamp: long (nullable = false)
+   *  |-- operation: string (nullable = false)
+   *  |-- elem: string (nullable = false)
+   */
+  val CommonFields: Seq[StructField] = Seq(
+    StructField(name = "timestamp", dataType = LongType, nullable = false),
+    StructField(name = "operation", dataType = StringType, nullable = false),
+    StructField(name = "elem", dataType = StringType, nullable = false)
+  )
+
+ val BulkLoadSchema = StructType(CommonFields ++ Seq(
StructField("from", StringType, false),
StructField("to", StringType, false),
StructField("label", StringType, false),
@@ -33,24 +42,63 @@
StructField("direction", StringType, true)
))
- val VertexSchema = StructType(Seq(
- StructField("timestamp", LongType, false),
- StructField("operation", StringType, false),
- StructField("elem", StringType, false),
+ /**
+ * root
+ * |-- timestamp: long (nullable = false)
+ * |-- operation: string (nullable = false)
+ * |-- elem: string (nullable = false)
+ * |-- id: string (nullable = false)
+ * |-- service: string (nullable = false)
+ * |-- column: string (nullable = false)
+ * |-- props: string (nullable = false)
+ */
+ val VertexSchema = StructType(CommonFields ++ Seq(
StructField("id", StringType, false),
StructField("service", StringType, false),
StructField("column", StringType, false),
StructField("props", StringType, false)
))
- val EdgeSchema = StructType(Seq(
- StructField("timestamp", LongType, false),
- StructField("operation", StringType, false),
- StructField("elem", StringType, false),
+
+ /**
+ * root
+ * |-- timestamp: long (nullable = false)
+ * |-- operation: string (nullable = false)
+ * |-- elem: string (nullable = false)
+ * |-- from: string (nullable = false)
+ * |-- to: string (nullable = false)
+ * |-- label: string (nullable = false)
+ * |-- props: string (nullable = false)
+ * |-- direction: string (nullable = true)
+ */
+ val EdgeSchema = StructType(CommonFields ++ Seq(
StructField("from", StringType, false),
StructField("to", StringType, false),
StructField("label", StringType, false),
StructField("props", StringType, false),
StructField("direction", StringType, true)
))
+
+  /**
+   * Schema wide enough for either a vertex or an edge record: the common columns, then the
+   * vertex columns (id, service, column), the edge columns (from, to, label) and props.
+   * Only the common columns are non-nullable.
+   *
+   * root
+   *  |-- timestamp: long (nullable = false)
+   *  |-- operation: string (nullable = false)
+   *  |-- elem: string (nullable = false)
+   *  |-- id: string (nullable = true)
+   *  |-- service: string (nullable = true)
+   *  |-- column: string (nullable = true)
+   *  |-- from: string (nullable = true)
+   *  |-- to: string (nullable = true)
+   *  |-- label: string (nullable = true)
+   *  |-- props: string (nullable = true)
+   */
+  val GraphElementSchema = StructType(
+    CommonFields ++
+      Seq("id", "service", "column", "from", "to", "label", "props")
+        .map(name => StructField(name, StringType, nullable = true))
+  )
}
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala
index bfac62b..5bbf166 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala
@@ -19,12 +19,13 @@
package org.apache.s2graph.s2jobs.task
-import org.apache.s2graph.core.Management
+import org.apache.s2graph.core.{JSONParser, Management}
import org.apache.s2graph.s2jobs.Schema
import org.apache.s2graph.s2jobs.loader.{HFileGenerator, SparkBulkLoaderTransformer}
import org.apache.s2graph.s2jobs.serde.reader.S2GraphCellReader
import org.apache.s2graph.s2jobs.serde.writer.RowDataFrameWriter
import org.apache.spark.sql.{DataFrame, SparkSession}
+import play.api.libs.json.{JsObject, Json}
/**
@@ -103,8 +104,9 @@
case "edgeLog" =>
ss.read.format("com.databricks.spark.csv").option("delimiter", "\t")
.schema(BulkLoadSchema).load(paths: _*)
- case _ => ss.read.format(format).load(paths: _*)
+ case _ =>
val df = ss.read.format(format).load(paths: _*)
+
if (columnsOpt.isDefined) df.toDF(columnsOpt.get.split(",").map(_.trim): _*) else df
}
}
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala
new file mode 100644
index 0000000..1b70a8a
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala
@@ -0,0 +1,31 @@
+package org.apache.s2graph.s2jobs.wal
+
+import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
+
+/**
+ * One WAL record; the constructor fields follow the column order of `WalLog.WalLogSchema`.
+ * `id`, `columnName` and `serviceName` expose `from`, `label` and `to` under alternative names.
+ */
+case class WalLog(timestamp: Long, operation: String, elem: String,
+                  from: String, to: String, service: String,
+                  label: String, props: String) {
+  val id: String = from
+  val columnName: String = label
+  // NOTE(review): this aliases `to` rather than the `service` field - confirm it is intended.
+  val serviceName: String = to
+}
+
+object WalLog {
+  /** Spark schema mirroring the WalLog constructor fields in order; a nullable `direction` column is currently left out. */
+  val WalLogSchema = StructType(Seq(
+    StructField("timestamp", LongType, nullable = false),
+    StructField("operation", StringType, nullable = false),
+    StructField("elem", StringType, nullable = false),
+    StructField("from", StringType, nullable = false),
+    StructField("to", StringType, nullable = false),
+    StructField("service", StringType, nullable = true),
+    StructField("label", StringType, nullable = false),
+    StructField("props", StringType, nullable = false)
+  ))
+}
+
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/S2EdgeDataAggregateProcess.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/S2EdgeDataAggregateProcess.scala
new file mode 100644
index 0000000..a1d17f1
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/S2EdgeDataAggregateProcess.scala
@@ -0,0 +1,33 @@
+package org.apache.s2graph.s2jobs.wal.process
+
+import org.apache.s2graph.s2jobs.task.TaskConf
+import org.apache.s2graph.s2jobs.wal.udafs._
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.{DataFrame, SparkSession}
+
+/**
+ * Expects an edge DataFrame as input. Groups its rows and collects, per group, up to `maxNumOfEdges`
+ * rows ranked by the first aggregate column (timestamp by default) plus the group's min/max timestamp.
+ * Options: maxNumOfEdges (1000), groupByColumns ("from"), aggregateColumns, parallelism.
+ * @param taskConf task configuration; its first input names the edge DataFrame to aggregate
+ */
+class S2EdgeDataAggregateProcess(taskConf: TaskConf) extends org.apache.s2graph.s2jobs.task.Process(taskConf) {
+  override def execute(ss: SparkSession, inputMap: Map[String, DataFrame]): DataFrame = {
+    val options = taskConf.options
+    // Column lists are comma separated; trimming tolerates values written as "a, b".
+    def columnsOf(key: String, default: String) = options.getOrElse(key, default).split(",").map(name => col(name.trim))
+
+    val maxNumOfEdges = options.get("maxNumOfEdges").map(_.toInt).getOrElse(1000)
+    // foreach rather than map: setConf is invoked purely for its side effect.
+    options.get("parallelism").foreach(ss.sqlContext.setConf("spark.sql.shuffle.partitions", _))
+    val aggregator = new GroupByAgg(maxNumOfEdges)
+
+    inputMap(taskConf.inputs.head)
+      .groupBy(columnsOf("groupByColumns", "from"): _*)
+      .agg(
+        aggregator(columnsOf("aggregateColumns", "timestamp,to,label,props"): _*).as("edges"),
+        max(col("timestamp")).as("max_ts"),
+        min(col("timestamp")).as("min_ts"))
+  }
+  override def mandatoryOptions: Set[String] = Set.empty
+}
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/S2EdgeDataArrayAggregateProcess.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/S2EdgeDataArrayAggregateProcess.scala
new file mode 100644
index 0000000..08df269
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/S2EdgeDataArrayAggregateProcess.scala
@@ -0,0 +1,29 @@
+package org.apache.s2graph.s2jobs.wal.process
+
+import org.apache.s2graph.s2jobs.task.TaskConf
+import org.apache.s2graph.s2jobs.wal.udafs.GroupByArrayAgg
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.functions._
+
+/**
+ * Re-aggregates rows that already carry an `edges` array plus `max_ts`/`min_ts` columns.
+ * Options: maxNumOfEdges (1000), groupByColumns ("from"), aggregateColumns ("edges"), parallelism.
+ */
+class S2EdgeDataArrayAggregateProcess(taskConf: TaskConf) extends org.apache.s2graph.s2jobs.task.Process(taskConf) {
+  override def execute(ss: SparkSession, inputMap: Map[String, DataFrame]): DataFrame = {
+    val options = taskConf.options
+    // Column lists are comma separated; trimming tolerates values written as "a, b".
+    def columnsOf(key: String, default: String) = options.getOrElse(key, default).split(",").map(name => col(name.trim))
+
+    val maxNumOfEdges = options.get("maxNumOfEdges").map(_.toInt).getOrElse(1000)
+    // foreach rather than map: setConf is invoked purely for its side effect.
+    options.get("parallelism").foreach(ss.sqlContext.setConf("spark.sql.shuffle.partitions", _))
+    val aggregator = new GroupByArrayAgg(maxNumOfEdges)
+
+    inputMap(taskConf.inputs.head)
+      .groupBy(columnsOf("groupByColumns", "from"): _*)
+      .agg(aggregator(columnsOf("aggregateColumns", "edges"): _*).as("edges"),
+        max(col("max_ts")).as("max_ts"), min(col("min_ts")).as("min_ts"))
+  }
+  override def mandatoryOptions: Set[String] = Set.empty
+}
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/udafs/S2EdgeDataAggregate.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/udafs/S2EdgeDataAggregate.scala
new file mode 100644
index 0000000..1b12235
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/udafs/S2EdgeDataAggregate.scala
@@ -0,0 +1,166 @@
+package org.apache.s2graph.s2jobs.wal.udafs
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.GenericRow
+import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
+import org.apache.spark.sql.types._
+
+/** Schema pieces, timestamp orderings and the merge helper shared by the edge aggregation UDAFs. */
+object S2EdgeDataAggregate {
+  /** Tuple layout matching `fields`: (timestamp, to, label, props). */
+  type Element = (Long, String, String, String)
+
+  /** Placeholder row: timestamp -1 and "empty" for the three string columns. */
+  val emptyRow = new GenericRow(Array(-1L, "empty", "empty", "empty"))
+
+  /** Orders `Element` tuples by their first component, ascending. */
+  val elementOrd = Ordering.by[Element, Long](_._1)
+
+  /** Orders rows by the Long in column 0, ascending. */
+  val rowOrdering = new Ordering[Row] {
+    override def compare(x: Row, y: Row): Int = x.getAs[Long](0).compareTo(y.getAs[Long](0))
+  }
+
+  /** Orders rows by the Long in column 0, descending. */
+  val rowOrderingDesc = new Ordering[Row] {
+    override def compare(x: Row, y: Row): Int = -x.getAs[Long](0).compareTo(y.getAs[Long](0))
+  }
+
+  /** Columns of one aggregated edge entry. */
+  val fields = Seq(
+    StructField(name = "timestamp", LongType),
+    StructField(name = "to", StringType),
+    StructField(name = "label", StringType),
+    StructField(name = "props", StringType)
+  )
+
+  /** Array-of-struct type built from `fields`. */
+  val arrayType = ArrayType(elementType = StructType(fields))
+
+  /** Creates a [[GroupByAgg]] keeping at most `maxNumOfEdges` rows per group. */
+  def apply(maxNumOfEdges: Int = 1000): GroupByAgg = new GroupByAgg(maxNumOfEdges)
+
+  /**
+   * Merges two sequences into at most `size` elements, repeatedly taking the larger of the two
+   * heads (ties take the head of `prev`). Inputs sorted descending under `ordering` therefore
+   * yield a descending result.
+   *
+   * @param prev     first sequence
+   * @param cur      second sequence
+   * @param size     upper bound on the result length; a non-positive value yields an empty result
+   * @param ordering ordering used to compare the heads
+   * @return the merged elements, never more than `size` of them
+   */
+  def mergeTwoSeq[T](prev: Seq[T], cur: Seq[T], size: Int)(implicit ordering: Ordering[T]): Seq[T] = {
+    import scala.collection.mutable
+
+    val (curIt, prevIt) = (cur.iterator.buffered, prev.iterator.buffered)
+    val merged = new mutable.ArrayBuffer[T](math.max(0, math.min(size, prev.size + cur.size)))
+
+    // The bound is re-checked before every append, including while draining the leftovers of
+    // the longer input, so the result can never grow beyond `size`.
+    while (merged.size < size && (curIt.hasNext || prevIt.hasNext)) {
+      val takeCur = curIt.hasNext && (!prevIt.hasNext || ordering.compare(curIt.head, prevIt.head) > 0)
+      merged += (if (takeCur) curIt.next() else prevIt.next())
+    }
+
+    merged
+  }
+}
+
+/**
+ * UDAF collecting the rows of a group and keeping the `maxNumOfEdges` rows with the largest
+ * value in column 0 (the timestamp), largest first.
+ *
+ * @param maxNumOfEdges maximum number of rows kept per group
+ */
+class GroupByAgg(maxNumOfEdges: Int = 1000) extends UserDefinedAggregateFunction {
+  import S2EdgeDataAggregate._
+
+  implicit val ord: Ordering[Row] = rowOrderingDesc
+
+  val arrayType = ArrayType(elementType = StructType(fields))
+
+  override def inputSchema: StructType = StructType(fields)
+
+  // Slot 0 holds the collected rows; slot 1 is true once slot 0 is already sorted and trimmed.
+  override def bufferSchema: StructType = StructType(Seq(
+    StructField(name = "edges", dataType = arrayType),
+    StructField(name = "buffered", dataType = BooleanType)
+  ))
+
+  override def dataType: DataType = arrayType
+
+  override def deterministic: Boolean = true
+
+  /** Starts with an empty row list and an explicit `buffered = false`, leaving no slot unset. */
+  override def initialize(buffer: MutableAggregationBuffer): Unit = {
+    buffer.update(0, Seq.empty[Row])
+    buffer.update(1, false)
+  }
+
+  /** Appends one input row, compacting to the top K once the buffer exceeds twice the limit. */
+  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
+    val appended = buffer.getAs[Seq[Row]](0) :+ input
+    // Rows outside the current top K can never re-enter it, so dropping them early bounds the
+    // buffer size without changing the final result.
+    val overflowing = appended.size > 2L * maxNumOfEdges
+    buffer.update(0, if (overflowing) takeTopK(appended, maxNumOfEdges) else appended)
+    buffer.update(1, false)
+  }
+
+  private def takeTopK(ls: Seq[Row], k: Int): Seq[Row] = ls.sorted.take(k)
+
+  /** Combines two partial buffers into one sorted, trimmed list. */
+  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
+    buffer1.update(0, takeTopK(buffer1.getAs[Seq[Row]](0) ++ buffer2.getAs[Seq[Row]](0), maxNumOfEdges))
+    buffer1.update(1, true)
+  }
+
+  /** Returns the buffered rows, sorting and trimming them unless `merge` already did. */
+  override def evaluate(buffer: Row): Any = {
+    val ls = buffer.getAs[Seq[Row]](0)
+    if (buffer.getAs[Boolean](1)) ls else takeTopK(ls, maxNumOfEdges)
+  }
+}
+
+/**
+ * UDAF merging already aggregated `edges` arrays into a single array of at most
+ * `maxNumOfEdges` rows. Each input array is expected to be sorted by column 0 descending,
+ * which is the order [[GroupByAgg]] emits.
+ *
+ * @param maxNumOfEdges maximum number of rows kept per group
+ */
+class GroupByArrayAgg(maxNumOfEdges: Int = 1000) extends UserDefinedAggregateFunction {
+
+  import S2EdgeDataAggregate._
+
+  // mergeTwoSeq takes the larger head first, so this ascending ordering gives descending output.
+  implicit val ord: Ordering[Row] = rowOrdering
+
+  private def edgesSchema = StructType(Seq(StructField(name = "edges", dataType = arrayType)))
+
+  override def inputSchema: StructType = edgesSchema
+
+  override def bufferSchema: StructType = edgesSchema
+
+  override def dataType: DataType = arrayType
+
+  override def deterministic: Boolean = true
+
+  override def initialize(buffer: MutableAggregationBuffer): Unit = buffer.update(0, Seq.empty[Row])
+
+  /** Reads column 0 as a row list; a null array counts as empty instead of failing the merge. */
+  private def edgesOf(row: Row): Seq[Row] =
+    if (row.isNullAt(0)) Seq.empty[Row] else row.getAs[Seq[Row]](0)
+
+  /** Folds one input array into the buffer. */
+  override def update(buffer: MutableAggregationBuffer, input: Row): Unit =
+    buffer.update(0, mergeTwoSeq(edgesOf(input), edgesOf(buffer), maxNumOfEdges))
+
+  /** Folds another partial buffer into `buffer1`. */
+  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit =
+    buffer1.update(0, mergeTwoSeq(edgesOf(buffer2), edgesOf(buffer1), maxNumOfEdges))
+
+  override def evaluate(buffer: Row): Any = edgesOf(buffer)
+}
diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/process/S2EdgeDataAggregateProcessTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/process/S2EdgeDataAggregateProcessTest.scala
new file mode 100644
index 0000000..72c8cf5
--- /dev/null
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/process/S2EdgeDataAggregateProcessTest.scala
@@ -0,0 +1,87 @@
+package org.apache.s2graph.s2jobs.wal.process
+
+import com.holdenkarau.spark.testing.DataFrameSuiteBase
+import org.apache.s2graph.s2jobs.task.TaskConf
+import org.apache.s2graph.s2jobs.wal.WalLog
+import org.apache.s2graph.s2jobs.wal.udafs._
+import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
+
+/** Checks both aggregation processes and the `mergeTwoSeq` helper against small in-memory inputs. */
+class S2EdgeDataAggregateProcessTest extends FunSuite with Matchers with BeforeAndAfterAll with DataFrameSuiteBase {
+  val walLogsLs = Seq(
+    WalLog(1L, "insert", "edge", "a", "b", "s2graph", "friends", """{"name": 1}"""),
+    WalLog(2L, "insert", "edge", "a", "c", "s2graph", "friends", """{"name": 2}"""),
+    WalLog(3L, "insert", "edge", "a", "d", "s2graph", "friends", """{"name": 3}"""),
+    WalLog(4L, "insert", "edge", "a", "b", "s2graph", "friends", """{"name": 4}""")
+  )
+  val walLogsLs2 = Seq(
+    WalLog(5L, "insert", "edge", "a", "b", "s2graph", "friends", """{"name": 1}"""),
+    WalLog(6L, "insert", "edge", "a", "c", "s2graph", "friends", """{"name": 2}"""),
+    WalLog(7L, "insert", "edge", "a", "d", "s2graph", "friends", """{"name": 3}"""),
+    WalLog(8L, "insert", "edge", "a", "b", "s2graph", "friends", """{"name": 4}""")
+  )
+
+  /** Values of column 0 (timestamp) of the `edges` array in a result row, in stored order. */
+  private def edgeTimestamps(row: org.apache.spark.sql.Row): Seq[Long] =
+    row.getAs[Seq[org.apache.spark.sql.Row]]("edges").map(_.getLong(0))
+
+  test("test S2EdgeDataAggregateProcess") {
+    import spark.sqlContext.implicits._
+
+    val edges = spark.createDataset((0 until 10).flatMap(_ => walLogsLs)).toDF()
+    val taskConf = new TaskConf(name = "test", `type` = "agg", inputs = Seq("edges"),
+      options = Map("maxNumOfEdges" -> "10",
+        "groupByAggClassName" -> "GroupByAgg"))
+
+    val job = new S2EdgeDataAggregateProcess(taskConf = taskConf)
+    val rows = job.execute(spark, Map("edges" -> edges)).collect()
+
+    // All 40 input rows share from = "a", so exactly one group comes back.
+    rows.length shouldBe 1
+    rows.head.getAs[Long]("max_ts") shouldBe 4L
+    rows.head.getAs[Long]("min_ts") shouldBe 1L
+    // The ten largest timestamps are the ten copies of the row with timestamp 4.
+    edgeTimestamps(rows.head) shouldBe Seq.fill(10)(4L)
+  }
+
+  test("test S2EdgeDataArrayAggregateProcess") {
+    import spark.sqlContext.implicits._
+
+    val edges = spark.createDataset(walLogsLs).toDF()
+    val edges2 = spark.createDataset(walLogsLs2).toDF()
+
+    val firstConf = new TaskConf(name = "test", `type` = "agg", inputs = Seq("edges"),
+      options = Map("maxNumOfEdges" -> "10"))
+
+    // Two independent first-stage aggregations whose outputs feed the array aggregation.
+    val first = new S2EdgeDataAggregateProcess(firstConf).execute(spark, Map("edges" -> edges))
+    val first2 = new S2EdgeDataAggregateProcess(firstConf).execute(spark, Map("edges" -> edges2))
+
+    val secondInputMap = Map("aggregated" -> first.union(first2))
+
+    val secondConf = new TaskConf(name = "testarray", `type` = "agg",
+      inputs = Seq("aggregated"),
+      options = Map("maxNumOfEdges" -> "10"))
+
+    val secondJob = new S2EdgeDataArrayAggregateProcess(secondConf)
+    val rows = secondJob.execute(spark, secondInputMap).collect()
+
+    rows.length shouldBe 1
+    rows.head.getAs[Long]("max_ts") shouldBe 8L
+    rows.head.getAs[Long]("min_ts") shouldBe 1L
+    // Eight distinct timestamps fit within the limit of 10 and are merged largest first.
+    edgeTimestamps(rows.head) shouldBe Seq(8L, 7L, 6L, 5L, 4L, 3L, 2L, 1L)
+  }
+
+  test("mergeTwoSeq") {
+    val prev: Array[Int] = Array(3, 2, 1)
+    val cur: Array[Int] = Array(4, 2, 2)
+
+    // Six elements in total, so a bound of 10 keeps all of them, merged largest first.
+    val merged = S2EdgeDataAggregate.mergeTwoSeq(prev, cur, 10)
+    merged shouldBe Seq(4, 3, 2, 2, 2, 1)
+    // A bound below the combined length truncates the merged result.
+    val top3 = S2EdgeDataAggregate.mergeTwoSeq(prev, cur, 3)
+    top3 shouldBe Seq(4, 3, 2)
+  }
+}
\ No newline at end of file