// blob: 04ccc79eccdb6957e56c7639867652f511c21754 [file] [log] [blame]
package controllers
import java.util.concurrent.TimeUnit
import com.kakao.s2graph.core.utils.logger
import play.api.libs.json._
import play.api.test.Helpers._
import play.api.test.{FakeApplication, FakeRequest}
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}
import scala.util.Random
class StrongLabelDeleteSpec extends SpecCommon {
init()
// implicit val timeout = Timeout(Duration(20, TimeUnit.MINUTES))
/**
 * Builds the bulk payload used to seed the graph for these tests.
 *
 * Every edge becomes one tab-separated line
 * (timestamp, "insert", "e", source id, target id, label, properties JSON);
 * the lines are joined with newlines. Timestamps are consecutive and start
 * at `startTs + 1`.
 *
 * @param startTs offset added to every timestamp, so the same edges can be
 *                inserted again at a later point in time
 */
def bulkEdges(startTs: Int = 0) = {
  // (source id, target id, properties JSON), listed in timestamp order.
  val fixtures = Seq(
    ("0", "1", """{"time": 10}"""),
    ("0", "1", """{"time": 11}"""),
    ("0", "1", """{"time": 12}"""),
    ("0", "2", """{"time": 10}"""),
    ("10", "20", """{"time": 10}"""),
    ("10", "21", """{"time": 11}"""),
    ("11", "20", """{"time": 12}"""),
    ("12", "20", """{"time": 13}""")
  )
  val lines = fixtures.zipWithIndex.map { case ((src, tgt, props), idx) =>
    Seq(startTs + idx + 1, "insert", "e", src, tgt, testLabelName2, props).mkString("\t")
  }
  lines.mkString("\n")
}
/**
 * Builds a single-step getEdges query document starting from one source vertex.
 *
 * @param id          id of the source vertex
 * @param serviceName service name of the source vertex
 * @param columnName  column name of the source vertex
 * @param labelName   label to traverse in the step
 * @param direction   direction value placed in the step
 * @return the parsed JSON query
 */
def query(id: Long, serviceName: String = testServiceName, columnName: String = testColumnName,
labelName: String = testLabelName2, direction: String = "out") = {
  // The template is kept verbatim; only the interpolated values vary between calls.
  val requestBody =
    s"""
{ "srcVertices": [
{ "serviceName": "$serviceName",
"columnName": "$columnName",
"id": $id
}],
"steps": [
[ {
"label": "$labelName",
"direction": "$direction",
"offset": 0,
"limit": -1,
"duplicate": "raw"
}
]]
}"""
  Json.parse(requestBody)
}
/**
 * Posts `queryJson` to `/graphs/getEdges` through the test router and returns
 * the response body parsed as JSON.
 *
 * @param queryJson query document sent as the JSON request body
 * @return the JSON body of the response
 */
def getEdges(queryJson: JsValue): JsValue = {
  // implicit val timeout = Timeout(Duration(20, TimeUnit.MINUTES))
  val request = FakeRequest(POST, "/graphs/getEdges").withJsonBody(queryJson)
  // `route` returns an Option; fail with a descriptive message instead of the
  // bare "None.get" that calling `.get` on an empty Option would produce.
  val response = route(request).getOrElse(
    sys.error("No route found for POST /graphs/getEdges"))
  contentAsJson(response)
}
/**
 * Reads the first `_degree` value found under the `degrees` field of a
 * getEdges response.
 *
 * @param jsValue response returned by getEdges
 * @return the first degree found, or 0 when there is none
 */
def getDegree(jsValue: JsValue): Long = {
  val degreeValues = (jsValue \ "degrees") \\ "_degree"
  degreeValues.headOption.fold(0L)(_.as[Long])
}
"strong label delete test" should {
running(FakeApplication()) {
  // Seed the graph with the shared fixture edges before the examples below are defined.
  val seedEdges = bulkEdges()
  println(seedEdges)
  // The parsed body is not needed; reading it waits for the mutation response.
  contentAsJson(EdgeController.mutateAndPublish(seedEdges, withWait = true))
}
"test strong consistency select" in {
running(FakeApplication()) {
var result = getEdges(query(0))
println(result)
(result \ "results").as[List[JsValue]].size must equalTo(2)
result = getEdges(query(10))
println(result)
(result \ "results").as[List[JsValue]].size must equalTo(2)
true
}
}
"test strong consistency duration. insert -> delete -> insert" in {
running(FakeApplication()) {
val ts0 = 1
val ts1 = 2
val ts2 = 3
val edges = Seq(
Seq(5, "insert", "edge", "-10", "-20", testLabelName2).mkString("\t"),
Seq(10, "delete", "edge", "-10", "-20", testLabelName2).mkString("\t"),
Seq(20, "insert", "edge", "-10", "-20", testLabelName2).mkString("\t")
).mkString("\n")
val jsResult = contentAsJson(EdgeController.mutateAndPublish(edges, withWait = true))
val result = getEdges(query(-10))
println(result)
true
}
}
"test strong consistency deleteAll" in {
running(FakeApplication()) {
val deletedAt = 100
var result = getEdges(query(20, direction = "in", columnName = testTgtColumnName))
println(result)
(result \ "results").as[List[JsValue]].size must equalTo(3)
val json = Json.arr(Json.obj("label" -> testLabelName2,
"direction" -> "in", "ids" -> Json.arr("20"), "timestamp" -> deletedAt))
println(json)
contentAsString(EdgeController.deleteAllInner(json, withWait = true))
result = getEdges(query(11, direction = "out"))
println(result)
(result \ "results").as[List[JsValue]].size must equalTo(0)
result = getEdges(query(12, direction = "out"))
println(result)
(result \ "results").as[List[JsValue]].size must equalTo(0)
result = getEdges(query(10, direction = "out"))
println(result)
// 10 -> out -> 20 should not be in result.
(result \ "results").as[List[JsValue]].size must equalTo(1)
(result \\ "to").size must equalTo(1)
(result \\ "to").head.as[String] must equalTo("21")
result = getEdges(query(20, direction = "in", columnName = testTgtColumnName))
println(result)
(result \ "results").as[List[JsValue]].size must equalTo(0)
val jsResult = contentAsJson(EdgeController.mutateAndPublish(bulkEdges(startTs = deletedAt + 1), withWait = true))
result = getEdges(query(20, direction = "in", columnName = testTgtColumnName))
println(result)
(result \ "results").as[List[JsValue]].size must equalTo(3)
true
}
}
}
"labelargeSet of contention" should {
// Label used by the deleteAll requests in "update delete 2".
val labelName = testLabelName2
// Number of distinct target offsets; testInner draws targets from [0, maxTgtId).
val maxTgtId = 10
// Inner-loop count in testInner and the size of each shuffled bulk request it sends.
val batchSize = 10
// Number of rounds run by the "update delete" examples.
val testNum = 3
// Outer-loop count in testInner; numOfBatch * batchSize requests are generated per round.
val numOfBatch = 10
/**
 * Runs one contention round against source vertex `src`.
 *
 * Generates `numOfBatch * batchSize` random "delete"/"update" requests on random
 * targets with strictly increasing timestamps, sends them shuffled in bulks of
 * `batchSize`, waits for all of them, and then compares the queried degree and
 * result size with the number of targets whose latest operation was not a delete.
 *
 * @param startTs timestamp preceding the first generated request
 * @param src     id of the source vertex
 * @return (whether degree and size matched the expectation, last timestamp used)
 */
def testInner(startTs: Long, src: Long) = {
  val label = testLabelName2
  // Operation carrying the highest timestamp per target offset; "none" if never touched.
  val finalOps = Array.fill(maxTgtId)("none")
  val requestCount = numOfBatch * batchSize
  // One timestamp tick per request, so generation order equals timestamp order.
  val allRequests = (1 to requestCount).map { nth =>
    val ts = startTs + nth
    val tgt = Random.nextInt(maxTgtId)
    val op = if (Random.nextDouble() < 0.5) "delete" else "update"
    finalOps(tgt) = op
    Seq(ts, op, "e", src, src + tgt, label, "{}").mkString("\t")
  }
  val lastTs = startTs + requestCount
  allRequests.foreach(println(_))

  // Send the requests out of order, one bulk call per group.
  val futures = Random.shuffle(allRequests).grouped(batchSize).map { bulkRequest =>
    EdgeController.mutateAndPublish(bulkRequest.mkString("\n"), withWait = true)
  }
  Await.result(Future.sequence(futures), Duration(20, TimeUnit.MINUTES))

  val expectedDegree = finalOps.count(op => !(op == "delete" || op == "none"))
  val response = getEdges(query(id = src))
  val resultSize = (response \ "size").as[Long]
  val resultDegree = getDegree(response)
  println(finalOps.toList)
  println(response)

  val matched = resultDegree == expectedDegree && resultSize == resultDegree
  if (!matched) System.err.println(s"[Contention Failed]: $resultDegree, $expectedDegree")
  (matched, lastTs)
}
"update delete" in {
running(FakeApplication()) {
val ret = for {
i <- (0 until testNum)
} yield {
val src = System.currentTimeMillis()
val (ret, last) = testInner(i, src)
ret must beEqualTo(true)
ret
}
ret.forall(identity)
}
}
"update delete 2" in {
running(FakeApplication()) {
val src = System.currentTimeMillis()
var ts = 0L
val ret = for {
i <- (0 until testNum)
} yield {
val (ret, lastTs) = testInner(ts, src)
val deletedAt = lastTs + 1
val deletedAt2 = lastTs + 2
ts = deletedAt2 + 1 // nex start ts
ret must beEqualTo(true)
val deleteAllRequest = Json.arr(Json.obj("label" -> labelName, "ids" -> Json.arr(src), "timestamp" -> deletedAt))
val deleteAllRequest2 = Json.arr(Json.obj("label" -> labelName, "ids" -> Json.arr(src), "timestamp" -> deletedAt2))
val deleteRet = EdgeController.deleteAllInner(deleteAllRequest, withWait = true)
val deleteRet2 = EdgeController.deleteAllInner(deleteAllRequest2, withWait = true)
Await.result(deleteRet, Duration(20, TimeUnit.MINUTES))
Await.result(deleteRet2, Duration(20, TimeUnit.MINUTES))
val result = getEdges(query(id = src))
println(result)
val resultEdges = (result \ "results").as[Seq[JsValue]]
resultEdges.isEmpty must beEqualTo(true)
val degreeAfterDeleteAll = getDegree(result)
degreeAfterDeleteAll must beEqualTo(0)
true
}
ret.forall(identity)
}
}
/**
 * Stress test for degree bookkeeping when contention is low but the number of
 * adjacent edges is large: inserts maxSize edges from one source, deletes
 * deleteSize of them, and checks the resulting size and degree.
 */
"large degrees" in {
  running(FakeApplication()) {
    val labelName = testLabelName2
    val dir = "out"
    val maxSize = 100
    val deleteSize = 10
    val numOfConcurrentBatch = 100
    val src = System.currentTimeMillis()

    // One tab-separated bulk line for an edge from src to tgt.
    def edgeLine(ts: Long, op: String, tgt: Long): String =
      Seq(ts, op, "e", src, tgt, labelName, "{}", dir).mkString("\t")

    val tgts = (0 until maxSize).map(src + _)
    val deleteTgts = Random.shuffle(tgts).take(deleteSize)
    // Inserts are stamped with the target id; deletes are stamped 1000 later.
    val insertRequests = tgts.map(tgt => edgeLine(tgt, "insert", tgt))
    val deleteRequests = deleteTgts.map(tgt => edgeLine(tgt + 1000, "delete", tgt))

    val shuffledRequests = Random.shuffle(insertRequests ++ deleteRequests)
    val futures = shuffledRequests.grouped(numOfConcurrentBatch).map { group =>
      EdgeController.mutateAndPublish(group.mkString("\n"), withWait = true)
    }
    Await.result(Future.sequence(futures), Duration(20, TimeUnit.MINUTES))

    val expectedDegree = insertRequests.size - deleteRequests.size
    val response = getEdges(query(id = src))
    val resultSize = (response \ "size").as[Long]
    val resultDegree = getDegree(response)

    val matched = resultSize == expectedDegree && resultDegree == resultSize
    println(s"[MaxSize]: $maxSize")
    println(s"[DeleteSize]: $deleteSize")
    println(s"[ResultDegree]: $resultDegree")
    println(s"[ExpectedDegree]: $expectedDegree")
    println(s"[ResultSize]: $resultSize")
    matched must beEqualTo(true)
  }
}
"deleteAll" in {
running(FakeApplication()) {
val labelName = testLabelName2
val dir = "out"
val maxSize = 100
val deleteSize = 10
val numOfConcurrentBatch = 100
val src = System.currentTimeMillis()
val tgts = (0 until maxSize).map { ith => src + ith }
val deleteTgts = Random.shuffle(tgts).take(deleteSize)
val insertRequests = tgts.map { tgt =>
Seq(tgt, "insert", "e", src, tgt, labelName, "{}", dir).mkString("\t")
}
val deleteRequests = deleteTgts.take(deleteSize).map { tgt =>
Seq(tgt + 1000, "delete", "e", src, tgt, labelName, "{}", dir).mkString("\t")
}
val allRequests = Random.shuffle(insertRequests ++ deleteRequests)
// val allRequests = insertRequests ++ deleteRequests
val futures = allRequests.grouped(numOfConcurrentBatch).map { requests =>
EdgeController.mutateAndPublish(requests.mkString("\n"), withWait = true)
}
Await.result(Future.sequence(futures), Duration(10, TimeUnit.MINUTES))
val deletedAt = System.currentTimeMillis()
val deleteAllRequest = Json.arr(Json.obj("label" -> labelName, "ids" -> Json.arr(src), "timestamp" -> deletedAt))
Await.result(EdgeController.deleteAllInner(deleteAllRequest, withWait = true), Duration(10, TimeUnit.MINUTES))
val result = getEdges(query(id = src))
println(result)
val resultEdges = (result \ "results").as[Seq[JsValue]]
resultEdges.isEmpty must beEqualTo(true)
val degreeAfterDeleteAll = getDegree(result)
degreeAfterDeleteAll must beEqualTo(0)
}
}
}
}