blob: 00962127d792743f4936a32df1963b71845897d9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.s2graph.core.step
import org.apache.s2graph.core.Integrate.IntegrateCommon
import org.apache.s2graph.core._
import org.apache.s2graph.core.parsers.Where
import org.apache.s2graph.core.rest.RequestParser
import play.api.libs.json.Json
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}
class GraphStepTest extends IntegrateCommon {
import TestUtil._
import RxStep._
val insert = "insert"
val e = "e"
val weight = "weight"
val is_hidden = "is_hidden"
override def initTestData(): Unit = {
super.initTestData()
insertEdgesSync(
toEdge(1000, insert, e, 1, 10, testLabelName),
toEdge(1000, insert, e, 2, 20, testLabelName),
toEdge(1000, insert, e, 3, 30, testLabelName),
toEdge(1000, insert, e, 100, 1, testLabelName, Json.obj(weight -> 30))
)
}
test("basic compose") {
val vertices = Seq(
graph.toVertex(testServiceName, testColumnName, 1),
graph.toVertex(testServiceName, testColumnName, 2),
graph.toVertex(testServiceName, testColumnName, 3),
graph.toVertex(testServiceName, testColumnName, 10)
)
val v1 = VertexFetchStep(graph)
val qpIn = QueryParam(labelName = testLabelName, direction = "in")
val qpOut = QueryParam(labelName = testLabelName, direction = "out")
val e1 = EdgeFetchStep(graph, qpIn)
val e2 = EdgeFetchStep(graph, qpOut)
val where = Where("_to = 20").get
val q =
v1.apply(vertices) // vertices: 4 - (1, 2, 3, 10)
.flatMap(e1) // edges: 4 - (srcId = 1, 2, 3 and tgtId = 10)
.filter(where.filter) // filterOut (only _to == 20)
.map(_.tgtForVertex) // vertices: (20)
.flatMap(v => e1.apply(v) ++ e2.apply(v)) // edges: (tgtId = 20)
val res = q.toBlocking.toList
}
test("Query to RxSteps") {
def q(id: Int) = Json.parse(
s"""
{
"srcVertices": [
{ "serviceName": "$testServiceName",
"columnName": "$testColumnName",
"id": $id
}],
"steps": [
[{
"label": "$testLabelName",
"direction": "out",
"offset": 0,
"limit": 10
},
{
"label": "$testLabelName",
"direction": "in",
"offset": 0,
"limit": 10
}],
[{
"label": "$testLabelName",
"direction": "out",
"offset": 0,
"limit": 10,
"where": "weight > 10"
},
{
"label": "$testLabelName",
"direction": "in",
"offset": 0,
"limit": 10
}]
]
}""")
val queryJs = q(1)
val requestParser = new RequestParser(graph)
val query = requestParser.toQuery(queryJs, None)
val actual = RxStep.toObservable(query)(graph).toBlocking.toList.sortBy(_.srcVertex.innerIdVal.toString)
val expected = Await.result(graph.getEdges(query), Duration("30 sec")).edgeWithScores.map(_.edge).sortBy(_.srcVertex.innerIdVal.toString)
actual shouldBe expected
}
}