blob: 575341177f577676166b0a3f1ec84bae51241c3f [file] [log] [blame]
package org.apache.s2graph.s2jobs.wal.process
import com.holdenkarau.spark.testing.DataFrameSuiteBase
import org.apache.s2graph.s2jobs.task.TaskConf
import org.apache.s2graph.s2jobs.wal.WalLog
import org.apache.s2graph.s2jobs.wal.udafs._
import org.apache.s2graph.s2jobs.wal.utils.BoundedPriorityQueue
import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
import scala.collection.mutable
import scala.util.Random
/**
 * Tests for [[WalLogAggregateProcess]] and for the `mergeTwoSeq` / `addToTopK` helpers in [[WalLogUDAF]].
 *
 * `spark` is not defined in this class; it is presumably supplied by the mixed-in
 * `DataFrameSuiteBase`.
 */
class WalLogAggregateProcessTest extends FunSuite with Matchers with BeforeAndAfterAll with DataFrameSuiteBase {
  // Four "insert"/"edge" WAL entries that all originate at vertex "a" in service "s2graph",
  // label "friends". The pair "a" -> "b" appears twice (leading Long values 1 and 4).
  val walLogsLs = Seq(
    WalLog(1L, "insert", "edge", "a", "b", "s2graph", "friends", """{"name": 1}"""),
    WalLog(2L, "insert", "edge", "a", "c", "s2graph", "friends", """{"name": 2}"""),
    WalLog(3L, "insert", "edge", "a", "d", "s2graph", "friends", """{"name": 3}"""),
    WalLog(4L, "insert", "edge", "a", "b", "s2graph", "friends", """{"name": 4}""")
  )

  // Same edges and props as walLogsLs, with leading Long values 5-8.
  // NOTE(review): not referenced by any test in this suite.
  val walLogsLs2 = Seq(
    WalLog(5L, "insert", "edge", "a", "b", "s2graph", "friends", """{"name": 1}"""),
    WalLog(6L, "insert", "edge", "a", "c", "s2graph", "friends", """{"name": 2}"""),
    WalLog(7L, "insert", "edge", "a", "d", "s2graph", "friends", """{"name": 3}"""),
    WalLog(8L, "insert", "edge", "a", "b", "s2graph", "friends", """{"name": 4}""")
  )

  /**
   * Smoke test: runs [[WalLogAggregateProcess]] over `walLogsLs` with `maxNumOfEdges = 10`
   * and prints the resulting schema and rows. It passes as long as execution does not throw;
   * no output values are asserted.
   */
  test("test S2EdgeDataAggregateProcess") {
    import spark.sqlContext.implicits._

    val edges = spark.createDataset(walLogsLs).toDF()
    val inputMap = Map("edges" -> edges)
    val taskConf = new TaskConf(
      name = "test",
      `type` = "agg",
      inputs = Seq("edges"),
      options = Map("maxNumOfEdges" -> "10")
    )

    val job = new WalLogAggregateProcess(taskConf = taskConf)
    val processed = job.execute(spark, inputMap)

    processed.printSchema()
    processed.collect().foreach(println)
  }

  /**
   * Smoke test: calls `WalLogUDAF.mergeTwoSeq` on two small arrays with a limit argument of 10
   * and prints the size and elements of the result. No output values are asserted.
   */
  test("mergeTwoSeq") {
    val prev: Array[Int] = Array(3, 2, 1)
    val cur: Array[Int] = Array(4, 2, 2)
    val ls = WalLogUDAF.mergeTwoSeq(prev, cur, 10)

    println(ls.size)
    ls.foreach(println)
  }

  /**
   * Randomized check: `addToTopK`, applied to each input in turn, keeps the same multiset of
   * elements as a [[BoundedPriorityQueue]] of the same capacity fed the same inputs.
   */
  test("addToTopK test.") {
    import WalLogUDAF._

    val numOfTest = 100
    val numOfNums = 100
    val maxNum = 10

    // Draw one seed per run and report it on failure. Previously the unseeded global Random was
    // used, so a failing case could not be reproduced. To replay, hard-code the reported seed here.
    val seed = Random.nextLong()
    val rnd = new Random(seed)

    (0 until numOfTest).foreach { testNum =>
      val maxSize = 1 + rnd.nextInt(numOfNums)
      val arr = (0 until numOfNums).map(_ => rnd.nextInt(maxNum))

      // Reference implementation.
      val pq = new BoundedPriorityQueue[Int](maxSize)
      arr.foreach(pq += _)

      // Implementation under test, threaded through the inputs without a mutable var.
      val result = arr.foldLeft[mutable.Seq[Int]](mutable.ArrayBuffer.empty[Int]) { (acc, i) =>
        addToTopK(acc, maxSize, i)
      }

      withClue(s"seed=$seed testNum=$testNum maxSize=$maxSize input=$arr: ") {
        result.sorted shouldBe pq.toSeq.sorted
      }
    }
  }
}