// blob: 1baa89dd4712e124792fbeef59928be9a348cb90 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.s2graph.core.Integrate
import org.apache.s2graph.core.schema.{Label, LabelMeta}
import org.apache.s2graph.core.utils.logger
import play.api.libs.json.{JsObject, Json}
/**
 * Integration tests that submit insert/update/delete operations for a single edge
 * in different arrival orders and check the edge that is read back.
 *
 * Every numbered case lists `(timestamp, operation, propsJson)` triples in the order
 * they are submitted. `t1 < t2 < t3 < t4 < t5` are consecutive millisecond timestamps,
 * so the bracketed part of each test name spells out the arrival order.
 */
class CrudTest extends IntegrateCommon {
  import CrudHelper._
  import TestUtil._

  // Scratch state: each numbered test reassigns these before calling tcRunner.run.
  var tcString = ""
  var bulkQueries = List.empty[(Long, String, String)]
  var expected = Map.empty[String, String]

  // Five strictly increasing timestamps, one millisecond apart.
  val curTime = System.currentTimeMillis
  val t1 = curTime + 0
  val t2 = curTime + 1
  val t3 = curTime + 2
  val t4 = curTime + 3
  val t5 = curTime + 4

  val tcRunner = new CrudTestRunner()

  test("1: [t1 -> t2 -> t3 test case] insert(t1) delete(t2) insert(t3) test") {
    val tcNum = 1
    tcString = "[t1 -> t2 -> t3 test case] insert(t1) delete(t2) insert(t3) test "
    bulkQueries = List(
      (t1, "insert", "{\"time\": 10}"),
      (t2, "delete", ""),
      (t3, "insert", "{\"time\": 10, \"weight\": 20}"))
    expected = Map("time" -> "10", "weight" -> "20")
    tcRunner.run(tcNum, tcString, bulkQueries, expected)
  }

  // The operations below arrive as t1, t3, t2; the label now says so
  // (it used to repeat the description of test 1).
  test("2: [t1 -> t3 -> t2 test case] insert(t1) insert(t3) delete(t2) test") {
    val tcNum = 2
    tcString = "[t1 -> t3 -> t2 test case] insert(t1) insert(t3) delete(t2) test "
    bulkQueries = List(
      (t1, "insert", "{\"time\": 10}"),
      (t3, "insert", "{\"time\": 10, \"weight\": 20}"),
      (t2, "delete", ""))
    expected = Map("time" -> "10", "weight" -> "20")
    tcRunner.run(tcNum, tcString, bulkQueries, expected)
  }

  test("3: [t3 -> t2 -> t1 test case] insert(t3) delete(t2) insert(t1) test") {
    val tcNum = 3
    tcString = "[t3 -> t2 -> t1 test case] insert(t3) delete(t2) insert(t1) test "
    bulkQueries = List(
      (t3, "insert", "{\"time\": 10, \"weight\": 20}"),
      (t2, "delete", ""),
      (t1, "insert", "{\"time\": 10}"))
    expected = Map("time" -> "10", "weight" -> "20")
    tcRunner.run(tcNum, tcString, bulkQueries, expected)
  }

  test("4: [t3 -> t1 -> t2 test case] insert(t3) insert(t1) delete(t2) test") {
    val tcNum = 4
    tcString = "[t3 -> t1 -> t2 test case] insert(t3) insert(t1) delete(t2) test "
    bulkQueries = List(
      (t3, "insert", "{\"time\": 10, \"weight\": 20}"),
      (t1, "insert", "{\"time\": 10}"),
      (t2, "delete", ""))
    expected = Map("time" -> "10", "weight" -> "20")
    tcRunner.run(tcNum, tcString, bulkQueries, expected)
  }

  test("5: [t2 -> t1 -> t3 test case] delete(t2) insert(t1) insert(t3) test") {
    val tcNum = 5
    tcString = "[t2 -> t1 -> t3 test case] delete(t2) insert(t1) insert(t3) test"
    bulkQueries = List(
      (t2, "delete", ""),
      (t1, "insert", "{\"time\": 10}"),
      (t3, "insert", "{\"time\": 10, \"weight\": 20}"))
    expected = Map("time" -> "10", "weight" -> "20")
    tcRunner.run(tcNum, tcString, bulkQueries, expected)
  }

  test("6: [t2 -> t3 -> t1 test case] delete(t2) insert(t3) insert(t1) test") {
    val tcNum = 6
    tcString = "[t2 -> t3 -> t1 test case] delete(t2) insert(t3) insert(t1) test "
    bulkQueries = List(
      (t2, "delete", ""),
      (t3, "insert", "{\"time\": 10, \"weight\": 20}"),
      (t1, "insert", "{\"time\": 10}"))
    expected = Map("time" -> "10", "weight" -> "20")
    tcRunner.run(tcNum, tcString, bulkQueries, expected)
  }

  test("7: [t1 -> t2 -> t3 test case] update(t1) delete(t2) update(t3) test ") {
    val tcNum = 7
    tcString = "[t1 -> t2 -> t3 test case] update(t1) delete(t2) update(t3) test "
    bulkQueries = List(
      (t1, "update", "{\"time\": 10}"),
      (t2, "delete", ""),
      (t3, "update", "{\"time\": 10, \"weight\": 20}"))
    expected = Map("time" -> "10", "weight" -> "20")
    tcRunner.run(tcNum, tcString, bulkQueries, expected)
  }

  test("8: [t1 -> t3 -> t2 test case] update(t1) update(t3) delete(t2) test ") {
    val tcNum = 8
    tcString = "[t1 -> t3 -> t2 test case] update(t1) update(t3) delete(t2) test "
    bulkQueries = List(
      (t1, "update", "{\"time\": 10}"),
      (t3, "update", "{\"time\": 10, \"weight\": 20}"),
      (t2, "delete", ""))
    expected = Map("time" -> "10", "weight" -> "20")
    tcRunner.run(tcNum, tcString, bulkQueries, expected)
  }

  test("9: [t2 -> t1 -> t3 test case] delete(t2) update(t1) update(t3) test") {
    val tcNum = 9
    tcString = "[t2 -> t1 -> t3 test case] delete(t2) update(t1) update(t3) test "
    bulkQueries = List(
      (t2, "delete", ""),
      (t1, "update", "{\"time\": 10}"),
      (t3, "update", "{\"time\": 10, \"weight\": 20}"))
    expected = Map("time" -> "10", "weight" -> "20")
    tcRunner.run(tcNum, tcString, bulkQueries, expected)
  }

  test("10: [t2 -> t3 -> t1 test case] delete(t2) update(t3) update(t1) test") {
    val tcNum = 10
    tcString = "[t2 -> t3 -> t1 test case] delete(t2) update(t3) update(t1) test"
    bulkQueries = List(
      (t2, "delete", ""),
      (t3, "update", "{\"time\": 10, \"weight\": 20}"),
      (t1, "update", "{\"time\": 10}"))
    expected = Map("time" -> "10", "weight" -> "20")
    tcRunner.run(tcNum, tcString, bulkQueries, expected)
  }

  test("11: [t3 -> t2 -> t1 test case] update(t3) delete(t2) update(t1) test") {
    val tcNum = 11
    tcString = "[t3 -> t2 -> t1 test case] update(t3) delete(t2) update(t1) test "
    bulkQueries = List(
      (t3, "update", "{\"time\": 10, \"weight\": 20}"),
      (t2, "delete", ""),
      (t1, "update", "{\"time\": 10}"))
    expected = Map("time" -> "10", "weight" -> "20")
    tcRunner.run(tcNum, tcString, bulkQueries, expected)
  }

  test("12: [t3 -> t1 -> t2 test case] update(t3) update(t1) delete(t2) test") {
    val tcNum = 12
    tcString = "[t3 -> t1 -> t2 test case] update(t3) update(t1) delete(t2) test "
    bulkQueries = List(
      (t3, "update", "{\"time\": 10, \"weight\": 20}"),
      (t1, "update", "{\"time\": 10}"),
      (t2, "delete", ""))
    expected = Map("time" -> "10", "weight" -> "20")
    tcRunner.run(tcNum, tcString, bulkQueries, expected)
  }

  test("13: [t5 -> t1 -> t3 -> t2 -> t4 test case] update(t5) insert(t1) insert(t3) delete(t2) update(t4) test ") {
    val tcNum = 13
    tcString = "[t5 -> t1 -> t3 -> t2 -> t4 test case] update(t5) insert(t1) insert(t3) delete(t2) update(t4) test "
    bulkQueries = List(
      (t5, "update", "{\"is_blocked\": true}"),
      (t1, "insert", "{\"is_hidden\": false}"),
      (t3, "insert", "{\"is_hidden\": false, \"weight\": 10}"),
      (t2, "delete", ""),
      (t4, "update", "{\"time\": 1, \"weight\": -10}"))
    expected = Map("time" -> "1", "weight" -> "-10", "is_hidden" -> "false", "is_blocked" -> "true")
    tcRunner.run(tcNum, tcString, bulkQueries, expected)
  }

  test("14 - test lock expire") {
    List(testLabelName, testLabelName2).foreach { labelName =>
      tcRunner.expireTC(labelName, id = 0)
    }
  }

  object CrudHelper {

    /** Submits edge operations through TestUtil and checks what the query API returns. */
    class CrudTestRunner {
      // Incremented once per generated edge so every run works on a fresh vertex id.
      var seed = System.currentTimeMillis()

      /**
       * Submits `opWithProps` for a fresh self-edge and verifies the stored result,
       * once per label in (testLabelName, testLabelName2) and NumOfEachTest times each.
       *
       * For both directions ("out" and "in") the query result must report degree 1,
       * the expected from/to ids, a `_timestamp` equal to the largest submitted
       * timestamp, and every key/value of `expected` in the last returned props object.
       *
       * @param tcNum       test case number, used only in the progress output
       * @param tcString    human-readable description of the case (not used by the runner)
       * @param opWithProps `(timestamp, operation, propsJson)` triples in submission order
       * @param expected    property name -> expected JSON text of its value
       */
      def run(tcNum: Int, tcString: String, opWithProps: List[(Long, String, String)], expected: Map[String, String]): Unit = {
        // Independent of label and repetition, so computed once up front.
        val maxTs = opWithProps.map(_._1).max

        for {
          labelName <- List(testLabelName, testLabelName2)
          _ <- 0 until NumOfEachTest
        } {
          seed += 1
          val srcId = seed.toString
          val tgtId = srcId

          /** insert edges */
          println(s"---- TC${tcNum}_init ----")
          val bulkEdges = opWithProps.map { case (ts, op, props) =>
            TestUtil.toEdge(ts, op, "e", srcId, tgtId, labelName, props)
          }
          println(s"${bulkEdges.mkString("\n")}")
          TestUtil.insertEdgesSync(bulkEdges: _*)

          for {
            label <- Label.findByName(labelName)
            direction <- List("out", "in")
            cacheTTL <- List(-1L)
          } {
            val (serviceName, columnName, id, otherId) = direction match {
              case "out" => (label.srcService.serviceName, label.srcColumn.columnName, srcId, tgtId)
              case "in" => (label.tgtService.serviceName, label.tgtColumn.columnName, tgtId, srcId)
            }

            // Ids are sent as a bare JSON number for testLabelName and as a quoted
            // JSON string for any other label.
            val qId = if (labelName == testLabelName) id else "\"" + id + "\""
            val query = queryJson(serviceName, columnName, labelName, qId, direction, cacheTTL)

            val jsResult = TestUtil.getEdgesSync(query)
            val results = jsResult \ "results"
            val degrees = (jsResult \ "degrees").as[List[JsObject]]
            val propsLs = (results \\ "props").seq

            (degrees.head \ LabelMeta.degree.name).as[Int] should be(1)

            val from = (results \\ "from").seq.last.toString.replaceAll("\"", "")
            val to = (results \\ "to").seq.last.toString.replaceAll("\"", "")
            from should be(id.toString)
            to should be(otherId.toString)

            (results \\ "_timestamp").seq.last.as[Long] should be(maxTs)

            for ((key, expectedVal) <- expected) {
              propsLs.last.as[JsObject].keys.contains(key) should be(true)
              (propsLs.last \ key).get.toString should be(expectedVal)
            }
          }
        }
      }

      /**
       * Exercises the behaviour after a lock has expired.
       *
       * Two phases run one after the other. In each, updates with `time = 10` are
       * written at timestamps 1, 2, ... (at most 999 attempts) until one of them is
       * reported as unsuccessful. The runner then sleeps for LockExpireDuration + 100 ms
       * and sends a follow-up update with `time = 20`:
       *  - phase 1 uses timestamp `ts - 1` and expects it to be ignored (`time` stays 10);
       *  - phase 2 uses timestamp `ts + 1` and expects it to be applied (`time` becomes 20).
       *
       * If no write ever fails within the attempt budget, nothing is asserted.
       *
       * @param labelName label whose service/column are used for the snapshot query
       * @param id        vertex id used as both source and target of the edge
       */
      def expireTC(labelName: String, id: Int): Unit = {
        val label = Label.findByName(labelName).getOrElse(
          throw new NoSuchElementException(s"label not found: $labelName"))
        val serviceName = label.serviceName
        val columnName = label.srcColumnName

        // Same query for every attempt, so it is built once.
        val snapshotQuery = querySnapshotEdgeJson(serviceName, columnName, labelName, id)

        // NOTE(review): edges are always written to testLabelName, while the snapshot
        // query above targets labelName -- confirm this is intended for testLabelName2.
        def updateEdge(ts: Int, time: Int) =
          Seq(TestUtil.toEdge(ts, "u", "e", id, id, testLabelName, Json.obj("time" -> time).toString()))

        // Runs one phase: keep writing until a write fails, then send the follow-up
        // at `ts + followUpTsDelta` and check the stored `time` value.
        def verifyAfterFailedWrite(followUpTsDelta: Int, expectedTime: Int): Unit = {
          var ts = 1
          var verified = false
          while (!verified && ts < 1000) {
            val rets = TestUtil.insertEdgesSync(updateEdge(ts, 10): _*)
            if (!rets.forall(_.isSuccess)) {
              Thread.sleep(graph.LockExpireDuration + 100)
              val followUpRets = TestUtil.insertEdgesSync(updateEdge(ts + followUpTsDelta, 20): _*)
              if (followUpRets.forall(_.isSuccess)) {
                // check
                val jsResult = TestUtil.getEdgesSync(snapshotQuery)
                (jsResult \\ "time").head.as[Int] should be(expectedTime)
                logger.debug(jsResult)
                verified = true
              }
            }
            ts += 1
          }
        }

        /** expect current request would be ignored */
        verifyAfterFailedWrite(followUpTsDelta = -1, expectedTime = 10)

        /** expect current request would be applied */
        verifyAfterFailedWrite(followUpTsDelta = 1, expectedTime = 20)
      }

      /**
       * Builds a one-step edge query starting from a single source vertex.
       *
       * @param id source vertex id, already rendered as JSON (callers quote string ids)
       * @param dir edge direction, e.g. "out" or "in"
       */
      def queryJson(serviceName: String, columnName: String, labelName: String, id: String, dir: String, cacheTTL: Long = -1L) = Json.parse(
        s""" { "srcVertices": [
             { "serviceName": "$serviceName",
               "columnName": "$columnName",
               "id": $id } ],
             "steps": [ [ {
               "label": "$labelName",
               "direction": "$dir",
               "offset": 0,
               "limit": 10,
               "cacheTTL": $cacheTTL }]]}""")

      /** Builds a one-step query that uses the same `id` for the source vertex and `_to`. */
      def querySnapshotEdgeJson(serviceName: String, columnName: String, labelName: String, id: Int) = Json.parse(
        s""" { "srcVertices": [
             { "serviceName": "$serviceName",
               "columnName": "$columnName",
               "id": $id } ],
             "steps": [ [ {
               "label": "$labelName",
               "_to": $id }]]}""")
    }
  }
}