blob: 017754355ce4a5d14974df466b79f5d789e833d5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.s2graph.core.Integrate
import com.typesafe.config._
import org.apache.s2graph.core.schema.{GlobalIndex, Label, Schema}
import org.apache.s2graph.core.rest.{RequestParser, RestHandler}
import org.apache.s2graph.core.utils.logger
import org.apache.s2graph.core._
import org.scalatest._
import play.api.libs.json.{JsValue, Json}
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}
trait IntegrateCommon extends FunSuite with Matchers with BeforeAndAfterAll {
import TestUtil._
var graph: S2Graph = _
var parser: RequestParser = _
var management: Management = _
var config: Config = _
override def beforeAll = {
config = ConfigFactory.load()
graph = new S2Graph(config)(ExecutionContext.Implicits.global)
management = new Management(graph)
parser = new RequestParser(graph)
initTestData()
}
override def afterAll(): Unit = {
graph.shutdown()
}
/**
* Make Service, Label, Vertex for integrate test
*/
def initTestData() = {
logger.info("[init start]: >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>")
Management.deleteService(testServiceName)
// 1. createService
val jsValue = Json.parse(createService)
val (serviceName, cluster, tableName, preSplitSize, ttl, compressionAlgorithm) =
parser.toServiceElements(jsValue)
val tryRes =
management.createService(serviceName, cluster, tableName, preSplitSize, ttl, compressionAlgorithm)
logger.info(s">> Service created : $createService, $tryRes")
val labelNames = Map(testLabelName -> testLabelNameCreate,
testLabelName2 -> testLabelName2Create,
testLabelNameV1 -> testLabelNameV1Create,
testLabelNameWeak -> testLabelNameWeakCreate,
testLabelNameLabelIndex -> testLabelNameLabelIndexCreate)
for {
(labelName, create) <- labelNames
} {
Management.deleteLabel(labelName)
Label.findByName(labelName, useCache = false) match {
case None =>
val json = Json.parse(create)
logger.info(s">> Create Label")
logger.info(create)
val tryRes = for {
label <- parser.toLabelElements(json)
} yield label
tryRes.get
case Some(label) =>
logger.info(s">> Label already exist: $create, $label")
}
}
val vertexPropsKeys = List(
("age", "integer", "0"),
("im", "string", "-")
)
Seq(testColumnName, testTgtColumnName).foreach { columnName =>
vertexPropsKeys.map { case (key, keyType, defaultValue) =>
Management.addVertexProp(testServiceName, columnName, key, keyType, defaultValue, storeInGlobalIndex = true)
}
}
// vertex type global index.
// val globalVertexIndex = management.buildGlobalIndex(GlobalIndex.VertexType, "test_age_index", Seq("age"))
// edge type global index.
// val globalEdgeIndex = management.buildGlobalIndex(GlobalIndex.EdgeType, "test_weight_time_edge", Seq("weight", "time"))
logger.info("[init end]: >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>")
}
/**
* Test Helpers
*/
object TestUtil {
implicit def ec = scala.concurrent.ExecutionContext.global
// def checkEdgeQueryJson(params: Seq[(String, String, String, String)]) = {
// val arr = for {
// (label, dir, from, to) <- params
// } yield {
// Json.obj("label" -> label, "direction" -> dir, "from" -> from, "to" -> to)
// }
//
// val s = Json.toJson(arr)
// s
// }
def deleteAllSync(jsValue: JsValue) = {
val future = Future.sequence(jsValue.as[Seq[JsValue]] map { json =>
val (labels, direction, ids, ts, vertices) = parser.toDeleteParam(json)
val srcVertices = vertices
val future = graph.deleteAllAdjacentEdges(srcVertices.toList, labels, GraphUtil.directions(direction), ts)
future
})
Await.result(future, HttpRequestWaitingTime)
}
def getEdgesSync(s2Query: Query): JsValue = {
logger.info(s2Query.toString)
val stepResult = Await.result(graph.getEdges(s2Query), HttpRequestWaitingTime)
val result = PostProcess.toJson(Option(s2Query.jsonQuery))(graph, s2Query.queryOption, stepResult)
// val result = Await.result(graph.getEdges(s2Query).(PostProcess.toJson), HttpRequestWaitingTime)
logger.debug(s"${Json.prettyPrint(result)}")
result
}
def getEdgesSync(queryJson: JsValue): JsValue = {
logger.info(Json.prettyPrint(queryJson))
val restHandler = new RestHandler(graph)
val result = Await.result(restHandler.getEdgesAsync(queryJson)(PostProcess.toJson(Option(queryJson))), HttpRequestWaitingTime)
logger.debug(s"${Json.prettyPrint(result)}")
result
}
def insertEdgesSync(bulkEdges: String*) = {
logger.debug(s"${bulkEdges.mkString("\n")}")
val req = graph.mutateElements(parser.toGraphElements(bulkEdges.mkString("\n")), withWait = true)
val jsResult = Await.result(req, HttpRequestWaitingTime)
jsResult
}
def insertEdgesAsync(bulkEdges: String*) = {
val req = graph.mutateElements(parser.toGraphElements(bulkEdges.mkString("\n")), withWait = true)
req
}
def toEdge(elems: Any*): String = elems.mkString("\t")
// common tables
val testServiceName = "s2graph"
val testLabelName = "s2graph_label_test"
val testLabelName2 = "s2graph_label_test_2"
val testLabelNameV1 = "s2graph_label_test_v1"
val testLabelNameWeak = "s2graph_label_test_weak"
val testLabelNameLabelIndex = "s2graph_label_test_index"
val testColumnName = "user_id_test"
val testColumnType = "long"
val testTgtColumnName = "item_id_test"
val testHTableName = "test-htable"
val newHTableName = "new-htable"
val index1 = "idx_1"
val index2 = "idx_2"
val idxStoreInDropDegree = "idx_drop_In"
val idxStoreOutDropDegree = "idx_drop_out"
val idxStoreIn = "idx_store_In"
val idxStoreOut = "idx_store_out"
val idxDropInStoreDegree = "idx_drop_in_store_degree"
val idxDropOutStoreDegree = "idx_drop_out_store_degree"
val NumOfEachTest = 30
val HttpRequestWaitingTime = Duration("60 seconds")
val createService = s"""{"serviceName" : "$testServiceName"}"""
val testLabelNameCreate =
s"""
{
"label": "$testLabelName",
"srcServiceName": "$testServiceName",
"srcColumnName": "$testColumnName",
"srcColumnType": "long",
"tgtServiceName": "$testServiceName",
"tgtColumnName": "$testColumnName",
"tgtColumnType": "long",
"indices": [
{"name": "$index1", "propNames": ["weight", "time", "is_hidden", "is_blocked"]},
{"name": "$index2", "propNames": ["_timestamp"]}
],
"props": [
{
"name": "time",
"dataType": "long",
"defaultValue": 0
},
{
"name": "weight",
"dataType": "long",
"defaultValue": 0
},
{
"name": "is_hidden",
"dataType": "boolean",
"defaultValue": false
},
{
"name": "is_blocked",
"dataType": "boolean",
"defaultValue": false
}
],
"consistencyLevel": "strong",
"schemaVersion": "v4",
"compressionAlgorithm": "gz",
"hTableName": "$testHTableName"
}"""
val testLabelName2Create =
s"""
{
"label": "$testLabelName2",
"srcServiceName": "$testServiceName",
"srcColumnName": "$testColumnName",
"srcColumnType": "long",
"tgtServiceName": "$testServiceName",
"tgtColumnName": "$testTgtColumnName",
"tgtColumnType": "string",
"indices": [{"name": "$index1", "propNames": ["time", "weight", "is_hidden", "is_blocked"]}],
"props": [
{
"name": "time",
"dataType": "long",
"defaultValue": 0
},
{
"name": "weight",
"dataType": "long",
"defaultValue": 0
},
{
"name": "is_hidden",
"dataType": "boolean",
"defaultValue": false
},
{
"name": "is_blocked",
"dataType": "boolean",
"defaultValue": false
}
],
"consistencyLevel": "strong",
"isDirected": false,
"schemaVersion": "v3",
"compressionAlgorithm": "gz"
}"""
val testLabelNameV1Create =
s"""
{
"label": "$testLabelNameV1",
"srcServiceName": "$testServiceName",
"srcColumnName": "$testColumnName",
"srcColumnType": "long",
"tgtServiceName": "$testServiceName",
"tgtColumnName": "${testTgtColumnName}_v1",
"tgtColumnType": "string",
"indices": [{"name": "$index1", "propNames": ["time", "weight", "is_hidden", "is_blocked"]}],
"props": [
{
"name": "time",
"dataType": "long",
"defaultValue": 0
},
{
"name": "weight",
"dataType": "long",
"defaultValue": 0
},
{
"name": "is_hidden",
"dataType": "boolean",
"defaultValue": false
},
{
"name": "is_blocked",
"dataType": "boolean",
"defaultValue": false
}
],
"consistencyLevel": "strong",
"isDirected": true,
"schemaVersion": "v2",
"compressionAlgorithm": "gz"
}"""
val testLabelNameWeakCreate =
s"""
{
"label": "$testLabelNameWeak",
"srcServiceName": "$testServiceName",
"srcColumnName": "$testColumnName",
"srcColumnType": "long",
"tgtServiceName": "$testServiceName",
"tgtColumnName": "$testTgtColumnName",
"tgtColumnType": "string",
"indices": [
{"name": "$index1", "propNames": ["time", "weight", "is_hidden", "is_blocked"]},
{"name": "$index2", "propNames": ["time"]}
],
"props": [
{
"name": "time",
"dataType": "long",
"defaultValue": 0
},
{
"name": "weight",
"dataType": "long",
"defaultValue": 0
},
{
"name": "is_hidden",
"dataType": "boolean",
"defaultValue": false
},
{
"name": "is_blocked",
"dataType": "boolean",
"defaultValue": false
}
],
"consistencyLevel": "weak",
"isDirected": true,
"compressionAlgorithm": "gz"
}"""
val testLabelNameLabelIndexCreate =
s"""
{
"label": "$testLabelNameLabelIndex",
"srcServiceName": "$testServiceName",
"srcColumnName": "$testColumnName",
"srcColumnType": "long",
"tgtServiceName": "$testServiceName",
"tgtColumnName": "$testColumnName",
"tgtColumnType": "long",
"indices": [
{"name": "$index1", "propNames": ["weight", "time", "is_hidden", "is_blocked"]},
{"name": "$idxStoreInDropDegree", "propNames": ["time"], "options": { "in": {"storeDegree": false }, "out": {"method": "drop", "storeDegree": false }}},
{"name": "$idxStoreOutDropDegree", "propNames": ["weight"], "options": { "out": {"storeDegree": false}, "in": { "method": "drop", "storeDegree": false }}},
{"name": "$idxStoreIn", "propNames": ["is_hidden"], "options": { "out": {"method": "drop", "storeDegree": false }}},
{"name": "$idxStoreOut", "propNames": ["weight", "is_blocked"], "options": { "in": {"method": "drop", "storeDegree": false }, "out": {"method": "normal" }}},
{"name": "$idxDropInStoreDegree", "propNames": ["is_blocked"], "options": { "in": {"method": "drop" }, "out": {"method": "drop", "storeDegree": false }}},
{"name": "$idxDropOutStoreDegree", "propNames": ["weight", "is_blocked", "_timestamp"], "options": { "in": {"method": "drop", "storeDegree": false }, "out": {"method": "drop"}}}
],
"props": [
{
"name": "time",
"dataType": "long",
"defaultValue": 0
},
{
"name": "weight",
"dataType": "long",
"defaultValue": 0
},
{
"name": "is_hidden",
"dataType": "boolean",
"defaultValue": false
},
{
"name": "is_blocked",
"dataType": "boolean",
"defaultValue": false
}
],
"consistencyLevel": "strong",
"schemaVersion": "v4",
"compressionAlgorithm": "gz",
"hTableName": "$testHTableName"
}"""
}
}