blob: 72c8cf55b7fbf1e7474c803a9b9eddfbb63662ed [file] [log] [blame]
package org.apache.s2graph.s2jobs.wal.process
import com.holdenkarau.spark.testing.DataFrameSuiteBase
import org.apache.s2graph.s2jobs.task.TaskConf
import org.apache.s2graph.s2jobs.wal.WalLog
import org.apache.s2graph.s2jobs.wal.udafs._
import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
class S2EdgeDataAggregateProcessTest extends FunSuite with Matchers with BeforeAndAfterAll with DataFrameSuiteBase {
val walLogsLs = Seq(
WalLog(1L, "insert", "edge", "a", "b", "s2graph", "friends", """{"name": 1}"""),
WalLog(2L, "insert", "edge", "a", "c", "s2graph", "friends", """{"name": 2}"""),
WalLog(3L, "insert", "edge", "a", "d", "s2graph", "friends", """{"name": 3}"""),
WalLog(4L, "insert", "edge", "a", "b", "s2graph", "friends", """{"name": 4}""")
val walLogsLs2 = Seq(
WalLog(5L, "insert", "edge", "a", "b", "s2graph", "friends", """{"name": 1}"""),
WalLog(6L, "insert", "edge", "a", "c", "s2graph", "friends", """{"name": 2}"""),
WalLog(7L, "insert", "edge", "a", "d", "s2graph", "friends", """{"name": 3}"""),
WalLog(8L, "insert", "edge", "a", "b", "s2graph", "friends", """{"name": 4}""")
test("test S2EdgeDataAggregateProcess") {
import spark.sqlContext.implicits._
val edges = spark.createDataset((0 until 10).flatMap(ith => walLogsLs)).toDF()
val inputMap = Map("edges" -> edges)
val taskConf = new TaskConf(name = "test", `type` = "agg", inputs = Seq("edges"),
options = Map("maxNumOfEdges" -> "10",
"groupByAggClassName" -> "GroupByAgg"))
val job = new S2EdgeDataAggregateProcess(taskConf = taskConf)
val processed = job.execute(spark, inputMap)
processed.collect().foreach { row =>
test("test S2EdgeDataArrayAggregateProcess") {
import spark.sqlContext.implicits._
val edges = spark.createDataset(walLogsLs).toDF()
val edges2 = spark.createDataset(walLogsLs2).toDF()
val firstConf = new TaskConf(name = "test", `type` = "agg", inputs = Seq("edges"),
options = Map("maxNumOfEdges" -> "10"))
val firstJob = new S2EdgeDataAggregateProcess(firstConf)
val firstJob2 = new S2EdgeDataAggregateProcess(firstConf)
val first = firstJob.execute(spark, Map("edges" -> edges))
val first2 = firstJob2.execute(spark, Map("edges" -> edges2))
val secondInputMap = Map(
"aggregated" -> first.union(first2)
val secondConf = new TaskConf(name = "testarray", `type` = "agg",
inputs = Seq("aggregated"),
options = Map("maxNumOfEdges" -> "10"))
val secondJob = new S2EdgeDataArrayAggregateProcess(secondConf)
val processed = secondJob.execute(spark, secondInputMap)
processed.collect().foreach { row =>
test("mergeTwoSeq") {
val prev: Array[Int] = Array(3, 2, 1)
val cur: Array[Int] = Array(4, 2, 2)
val ls = S2EdgeDataAggregate.mergeTwoSeq(prev, cur, 10)
ls.foreach { x =>