blob: f4f916ba8941b636d535e25b86f88248949441f2 [file] [log] [blame]
package controllers
import com.daumkakao.s2graph.core._
import com.daumkakao.s2graph.rest.config.Config
import com.daumkakao.s2graph.rest.actors._
import com.twitter.io.exp.VarSource.Ok
import org.omg.CosNaming.NamingContextPackage.NotFound
import scala.concurrent.{ Await, Future }
import scala.concurrent.duration._
import org.specs2.execute.{ AsResult, Result }
import org.specs2.matcher.Matchers
import org.specs2.mutable.Specification
import play.api.libs.json.{ JsObject, JsValue, Json }
import play.api.test.Helpers._
import play.api.test.{ FakeApplication, FakeRequest, Helpers, WithApplication }
import play.api.libs.json.JsArray
import scala.concurrent.ExecutionContext
/**
* Data Integrity Test specification
* @author June.k( Junki Kim, june.k@kakao.com )
* @since 15. 1. 5.
*/
class IntegritySpec extends IntegritySpecificationBase with Matchers {
def spec =
new Specification {
sequential
/**
* Each TC init/logic/after step waiting time to complete
*/
// lazy val TC_WAITING_TIME = if (Config.CLIENT_AGGREGATE_BUFFER_FLUSH_TIME >= 500) 1500 else Config.CLIENT_AGGREGATE_BUFFER_FLUSH_TIME * 3 // in milliseconds
lazy val TC_WAITING_TIME = 1200
/**
* Each http request waiting time to complete
*/
lazy val HTTP_REQ_WAITING_TIME = Duration(1000, MILLISECONDS)
val asyncFlushInterval = 1500 // in millis
val curTime = System.currentTimeMillis
val t1 = curTime + 0
val t2 = curTime + 1
val t3 = curTime + 2
val t4 = curTime + 3
val t5 = curTime + 4
/**
* Response result common check logic
* @param rslt
* @return
*/
def commonCheck(rslt: Future[play.api.mvc.Result]): JsValue = {
status(rslt) must equalTo(OK)
contentType(rslt) must beSome.which(_ == "application/json")
val jsRslt = contentAsJson(rslt)
println("======")
println(jsRslt)
println("======")
jsRslt.as[JsObject].keys.contains("size") must equalTo(true)
(jsRslt \ "size").as[Int] must greaterThan(0)
jsRslt.as[JsObject].keys.contains("results") must equalTo(true)
val jsRsltsObj = jsRslt \ "results"
jsRsltsObj.as[JsArray].value(0).as[JsObject].keys.contains("from") must equalTo(true)
jsRsltsObj.as[JsArray].value(0).as[JsObject].keys.contains("to") must equalTo(true)
jsRsltsObj.as[JsArray].value(0).as[JsObject].keys.contains("_timestamp") must equalTo(true)
jsRsltsObj.as[JsArray].value(0).as[JsObject].keys.contains("props") must equalTo(true)
jsRsltsObj
}
def justHttpCheck(rslt: Future[play.api.mvc.Result]) = {
status(rslt) must equalTo(OK)
contentType(rslt) must beSome.which(_ == "text/plain")
Await.result(rslt, HTTP_REQ_WAITING_TIME)
}
def runTC(tcNum: Int, tcString: String, opWithProps: List[(Long, String, String)], expected: Map[String, String]) = {
val srcId = tcNum.toString
val tgtId = (srcId + 1000).toString
val maxTs = opWithProps.map(t => t._1).max
def init = {
println(s"---- TC${tcNum}_init ----")
for ((ts, op, props) <- opWithProps) {
val bulkEdge = List(ts, op, "e", srcId, tgtId, testLabelName, props).mkString("\t")
val req = FakeRequest(POST, "/graphs/edges/bulk").withBody(bulkEdge)
println(s">> $req, $bulkEdge")
val res = Await.result(route(FakeRequest(POST, "/graphs/edges/bulk").withBody(bulkEdge)).get, HTTP_REQ_WAITING_TIME)
/**
* since asynchbase hbase client flush interval is 1000 millis, we wait.
*/
Thread.sleep(asyncFlushInterval)
res.header.status must equalTo(200)
}
println(s"---- TC${tcNum}_init ----")
}
def clean = {
val cleanTs = maxTs + 1
val bulkQuery = List(cleanTs, "delete", "e", srcId, tgtId, testLabelName).mkString("\t")
println(s"---- TC${tcNum}_cleanup ----")
println(s"Cleanup Query : $bulkQuery")
println(s"---- TC${tcNum}_cleanup ----")
val rslt = route(FakeRequest(POST, "/graphs/edges/bulk").withBody(bulkQuery)).get
justHttpCheck(rslt)
Thread.sleep(1000)
}
tcString in new WithTestApplication(init = init, after = clean, stepWaiting = TC_WAITING_TIME) {
val simpleQuery =
s"""
{
"srcVertices": [
{
"serviceName": "$testServiceName",
"columnName": "$testColumnName",
"id": $srcId
}
],
"steps": [
[
{
"label": "$testLabelName",
"direction": "out",
"offset": 0,
"limit": 10
}
]
]
}
""".stripMargin
val rslt = route(FakeRequest(POST, "/graphs/getEdges").withJsonBody(Json.parse(simpleQuery))).get
val jsRsltsObj = commonCheck(rslt)
(jsRsltsObj \\ "from").seq(0).toString must equalTo(srcId)
(jsRsltsObj \\ "to").seq(0).toString must equalTo(tgtId)
(jsRsltsObj \\ "_timestamp").seq(0).as[Long] must equalTo(maxTs)
for ((key, expectedVal) <- expected) {
(jsRsltsObj \\ "props").seq(0).as[JsObject].keys.contains(key) must equalTo(true)
((jsRsltsObj \\ "props").seq(0) \ key).toString must equalTo(expectedVal)
}
Await.result(rslt, HTTP_REQ_WAITING_TIME)
}
}
"Increment/Delete Application" should {
"[TC1]" in {
val tcNum = 1
val tcString = "[t1 -> t3 -> t2 test case] incr(t0) incr(t2) delete(t1) test"
val bulkQueries = List(
(t1, "increment", "{\"weight\": 10}"),
(t3, "increment", "{\"time\": 10, \"weight\": 20}"),
(t2, "delete", ""))
val expected = Map("time" -> "10", "weight" -> "0")
runTC(tcNum, tcString, bulkQueries, expected)
}
"[TC2]" in {
val tcNum = 2
val tcString = "[t1 -> t2 -> t3 test case] incr(t1) delete(t2) incr(t3) test"
val bulkQueries = List(
(t1, "increment", "{\"weight\": 10}"),
(t2, "delete", ""),
(t3, "increment", "{\"time\": 10, \"weight\": 20}"))
val expected = Map("time" -> "10", "weight" -> "20")
runTC(tcNum, tcString, bulkQueries, expected)
}
"[TC3]" in {
val tcNum = 3
val tcString = "[t2 -> t1 -> t3 test case] delete(t2) incr(t1) incr(t3) test "
val bulkQueries = List(
(t2, "delete", ""),
(t1, "increment", "{\"weight\": 10}"),
(t3, "increment", "{\"time\": 10, \"weight\": 20}"))
val expected = Map("time" -> "10", "weight" -> "20")
runTC(tcNum, tcString, bulkQueries, expected)
}
"[TC4]" in {
val tcNum = 4
val tcString = "[t2 -> t3 -> t1 test case] delete(t2) incr(t3) incr(t1) test "
val bulkQueries = List(
(t2, "delete", ""),
(t3, "increment", "{\"weight\": 10}"),
(t1, "increment", "{\"time\": 10, \"weight\": 20}"))
val expected = Map("time" -> "0", "weight" -> "10")
runTC(tcNum, tcString, bulkQueries, expected)
}
"[TC5]" in {
val tcNum = 5
val tcString = "[t3 -> t1 -> t2 test case] incr(t3) incr(t1) delete(t2) test "
val bulkQueries = List(
(t3, "increment", "{\"time\": 10, \"weight\": 20}"),
(t1, "increment", "{\"time\": 10}"),
(t2, "delete", ""))
val expected = Map("time" -> "10", "weight" -> "20")
runTC(tcNum, tcString, bulkQueries, expected)
}
"[TC6]" in {
val tcNum = 6
val tcString = "[t3 -> t2 -> t1 test case] incr(t3) delete(t2) incr(t1) test "
val bulkQueries = List(
(t3, "increment", "{\"time\": 10, \"weight\": 20}"),
(t2, "delete", ""),
(t1, "increment", "{\"time\": 10}"))
val expected = Map("time" -> "10", "weight" -> "20")
runTC(tcNum, tcString, bulkQueries, expected)
}
}
// -- Increment / Delete Application
"Insert/Delete Application" should {
"[TC7]" in {
val tcNum = 7
val tcString = "[t1 -> t2 -> t3 test case] insert(t1) delete(t2) insert(t3) test "
val bulkQueries = List(
(t1, "insert", "{\"time\": 10}"),
(t2, "delete", ""),
(t3, "insert", "{\"time\": 10, \"weight\": 20}"))
val expected = Map("time" -> "10", "weight" -> "20")
runTC(tcNum, tcString, bulkQueries, expected)
}
"[TC8]" in {
val tcNum = 8
val tcString = "[t1 -> t2 -> t3 test case] insert(t1) delete(t2) insert(t3) test "
val bulkQueries = List(
(t1, "insert", "{\"time\": 10}"),
(t3, "insert", "{\"time\": 10, \"weight\": 20}"),
(t2, "delete", ""))
val expected = Map("time" -> "10", "weight" -> "20")
runTC(tcNum, tcString, bulkQueries, expected)
}
"[TC9]" in {
val tcNum = 9
val tcString = "[t3 -> t2 -> t1 test case] insert(t3) delete(t2) insert(t1) test "
val bulkQueries = List(
(t3, "insert", "{\"time\": 10, \"weight\": 20}"),
(t2, "delete", ""),
(t1, "insert", "{\"time\": 10}"))
val expected = Map("time" -> "10", "weight" -> "20")
runTC(tcNum, tcString, bulkQueries, expected)
}
"[TC10]" in {
val tcNum = 10
val tcString = "[t3 -> t1 -> t2 test case] insert(t3) insert(t1) delete(t2) test "
val bulkQueries = List(
(t3, "insert", "{\"time\": 10, \"weight\": 20}"),
(t1, "insert", "{\"time\": 10}"),
(t2, "delete", ""))
val expected = Map("time" -> "10", "weight" -> "20")
runTC(tcNum, tcString, bulkQueries, expected)
}
"[TC11]" in {
val tcNum = 11
val tcString = "[t2 -> t1 -> t3 test case] delete(t2) insert(t1) insert(t3) test"
val bulkQueries = List(
(t2, "delete", ""),
(t1, "insert", "{\"time\": 10}"),
(t3, "insert", "{\"time\": 10, \"weight\": 20}"))
val expected = Map("time" -> "10", "weight" -> "20")
runTC(tcNum, tcString, bulkQueries, expected)
}
"[TC12]" in {
val tcNum = 12
val tcString = "[t2 -> t3 -> t1 test case] delete(t2) insert(t3) insert(t1) test "
val bulkQueries = List(
(t2, "delete", ""),
(t3, "insert", "{\"time\": 10, \"weight\": 20}"),
(t1, "insert", "{\"time\": 10}"))
val expected = Map("time" -> "10", "weight" -> "20")
runTC(tcNum, tcString, bulkQueries, expected)
}
} // -- Insert / Delete Application
"Update / Delete Application" should {
"[TC13]" in {
val tcNum = 13
val tcString = "[t1 -> t2 -> t3 test case] update(t1) delete(t2) update(t3) test "
val bulkQueries = List(
(t1, "update", "{\"time\": 10}"),
(t2, "delete", ""),
(t3, "update", "{\"time\": 10, \"weight\": 20}"))
val expected = Map("time" -> "10", "weight" -> "20")
runTC(tcNum, tcString, bulkQueries, expected)
}
"[TC14]" in {
val tcNum = 14
val tcString = "[t1 -> t3 -> t2 test case] update(t1) update(t3) delete(t2) test "
val bulkQueries = List(
(t1, "update", "{\"time\": 10}"),
(t3, "update", "{\"time\": 10, \"weight\": 20}"),
(t2, "delete", ""))
val expected = Map("time" -> "10", "weight" -> "20")
runTC(tcNum, tcString, bulkQueries, expected)
}
"[TC15]" in {
val tcNum = 15
val tcString = "[t2 -> t1 -> t3 test case] delete(t2) update(t1) update(t3) test "
val bulkQueries = List(
(t2, "delete", ""),
(t1, "update", "{\"time\": 10}"),
(t3, "update", "{\"time\": 10, \"weight\": 20}"))
val expected = Map("time" -> "10", "weight" -> "20")
runTC(tcNum, tcString, bulkQueries, expected)
}
"[TC16]" in {
val tcNum = 16
val tcString = "[t2 -> t3 -> t1 test case] delete(t2) update(t3) update(t1) test"
val bulkQueries = List(
(t2, "delete", ""),
(t3, "update", "{\"time\": 10, \"weight\": 20}"),
(t1, "update", "{\"time\": 10}"))
val expected = Map("time" -> "10", "weight" -> "20")
runTC(tcNum, tcString, bulkQueries, expected)
}
"[TC17]" in {
val tcNum = 17
val tcString = "[t3 -> t2 -> t1 test case] update(t3) delete(t2) update(t1) test "
val bulkQueries = List(
(t3, "update", "{\"time\": 10, \"weight\": 20}"),
(t2, "delete", ""),
(t1, "update", "{\"time\": 10}"))
val expected = Map("time" -> "10", "weight" -> "20")
runTC(tcNum, tcString, bulkQueries, expected)
}
"[TC18]" in {
val tcNum = 18
val tcString = "[t3 -> t1 -> t2 test case] update(t3) update(t1) delete(t2) test "
val bulkQueries = List(
(t3, "update", "{\"time\": 10, \"weight\": 20}"),
(t1, "update", "{\"time\": 10}"),
(t2, "delete", ""))
val expected = Map("time" -> "10", "weight" -> "20")
runTC(tcNum, tcString, bulkQueries, expected)
}
} // -- Update / Delete Application
"Composite Application" should {
"[TC19]" in {
val tcNum = 19
val tcString = "[t5 -> t1 -> t3 -> t2 -> t4 test case] update(t5) insert(t1) insert(t3) delete(t2) update(t4) test "
val bulkQueries = List(
(t5, "update", "{\"is_blocked\": true}"),
(t1, "insert", "{\"is_hidden\": false}"),
(t3, "insert", "{\"is_hidden\": false, \"weight\": 10}"),
(t2, "delete", ""),
(t4, "update", "{\"time\": 1, \"weight\": -10}"))
val expected = Map("time" -> "1", "weight" -> "-10", "is_hidden" -> "false", "is_blocked" -> "true")
runTC(tcNum, tcString, bulkQueries, expected)
}
} // -- Composite Application
}
}
/**
*
* @param app
* @param init This function run without play application context
* @param after This function run without play application context
*/
class WithTestApplication(override val app: FakeApplication = FakeApplication(), init: => Unit, after: => Unit, stepWaiting: Int = 1000) extends WithApplication {
override def around[T: AsResult](t: => T): Result = {
Helpers.running(app) {
init
println(">> [init] Start waiting....")
Thread.sleep(stepWaiting)
println("<< [init] end waiting.")
val rslt = AsResult.effectively(t)
println(">> [TC] Start waiting....")
Thread.sleep(stepWaiting)
println("<< [TC] end waiting.")
after
println(">> [after] Start waiting....")
Thread.sleep(stepWaiting)
println("<< [after] end waiting.")
rslt
}
}
}
abstract class IntegritySpecificationBase extends Specification {
protected val testServiceName = "s2graph"
protected val testLabelName = "s2graph_label_test"
protected val testColumnName = "user_id"
val createService =
s"""
|{
| "serviceName" : "$testServiceName"
|}
""".stripMargin
val createLabel =
s"""
|{
| "label": "$testLabelName",
| "srcServiceName": "$testServiceName",
| "srcColumnName": "$testColumnName",
| "srcColumnType": "long",
| "tgtServiceName": "$testServiceName",
| "tgtColumnName": "$testColumnName",
| "tgtColumnType": "long",
| "indexProps": {
| "time": 0,
| "weight":0,
| "is_hidden" : false,
| "is_blocked" : false
| },
| "consistencyLevel": "strong"
|}
""".stripMargin
def initialize = {
running(FakeApplication()) {
GraphAggregatorActor.init()
KafkaAggregatorActor.init()
Graph(Config.conf)(ExecutionContext.Implicits.global)
// 1. createService
var result = AdminController.createServiceInner(Json.parse(createService))
println(s">> Service created : $createService, $result")
// 2. createLabel
Label.findByName(testLabelName) match {
case None =>
result = AdminController.createLabelInner(Json.parse(createLabel))
println(s">> Label created : $createLabel, $result")
case Some(label) =>
println(s">> Label already exist: $createLabel, $label")
}
}
}
def cleanup = {
running(FakeApplication()) {
// Graph(Config.conf)(ExecutionContext.Implicits.global)
// // 1. delete label ( currently NOT supported )
// var result = AdminController.deleteLabelInner(testLabelName)
// println(s">> Label deleted : $testLabelName, $result")
// 2. delete service ( currently NOT supported )
GraphAggregatorActor.shutdown()
KafkaAggregatorActor.shutdown()
}
}
//Do setup work here
step {
println("==== [IntegritySpecificationBase] Doing setup work... ====")
initialize
success
}
//Include the real spec from the derived class
include(spec)
//Do shutdown work here
step {
println("===== [IntegritySpecificationBase] Doing shutdown work... =====")
cleanup
success
}
/**
* To be implemented in the derived class. Returns the real specification
* @return Specification
*/
def spec: Specification
}