make step works
diff --git a/.gitignore b/.gitignore
index ad5a5e9..0066e18 100644
--- a/.gitignore
+++ b/.gitignore
@@ -9,6 +9,7 @@
.cache
.history
.lib/
+*/lib/
var/*
dist/*
target/
diff --git a/s2core/build.sbt b/s2core/build.sbt
index 229bf41..0368715 100644
--- a/s2core/build.sbt
+++ b/s2core/build.sbt
@@ -57,7 +57,8 @@
"com.sksamuel.elastic4s" %% "elastic4s-embedded" % elastic4sVersion excludeLogging(),
"org.scala-lang.modules" %% "scala-pickling" % "0.10.1",
"net.pishen" %% "annoy4s" % annoy4sVersion,
- "org.tensorflow" % "tensorflow" % tensorflowVersion
+ "org.tensorflow" % "tensorflow" % tensorflowVersion,
+ "io.reactivex" %% "rxscala" % "0.26.5"
)
libraryDependencies := {
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/step/IStep.scala b/s2core/src/main/scala/org/apache/s2graph/core/step/IStep.scala
new file mode 100644
index 0000000..e0e23f6
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/step/IStep.scala
@@ -0,0 +1,58 @@
+package org.apache.s2graph.core.step
+
+import org.apache.s2graph.core._
+import rx.lang.scala.Observable
+
+import scala.language.higherKinds
+import scala.language.existentials
+
+trait RxStep[-A, +B] extends (A => Observable[B])
+
+object RxStep {
+
+ case class VertexFetchStep(g: S2GraphLike) extends RxStep[Seq[S2VertexLike], S2VertexLike] {
+ override def apply(vertices: Seq[S2VertexLike]): Observable[S2VertexLike] = {
+ Observable.from(vertices)
+ }
+ }
+
+ case class EdgeFetchStep(g: S2GraphLike, qp: QueryParam) extends RxStep[S2VertexLike, S2EdgeLike] {
+ override def apply(v: S2VertexLike): Observable[S2EdgeLike] = {
+ implicit val ec = g.ec
+
+ val step = org.apache.s2graph.core.Step(Seq(qp))
+ val q = Query(Seq(v), steps = Vector(step))
+
+ val f = g.getEdges(q).map { stepResult =>
+ val edges = stepResult.edgeWithScores.map(_.edge)
+ Observable.from(edges)
+ }
+
+ Observable.from(f).flatten
+ }
+ }
+
+ private def merge[A, B](steps: RxStep[A, B]*): RxStep[A, B] = new RxStep[A, B] {
+ override def apply(in: A): Observable[B] =
+ steps.map(_.apply(in)).toObservable.flatten
+ }
+
+ def toObservable(q: Query)(implicit graph: S2GraphLike): Observable[S2EdgeLike] = {
+ val v1: Observable[S2VertexLike] = VertexFetchStep(graph).apply(q.vertices)
+
+ val serialSteps = q.steps.map { step =>
+ val parallelSteps = step.queryParams.map(qp => EdgeFetchStep(graph, qp))
+ merge(parallelSteps: _*)
+ }
+
+ v1.flatMap { v =>
+ val initOpt = serialSteps.headOption.map(_.apply(v))
+
+ initOpt.map { init =>
+ serialSteps.tail.foldLeft(init) { case (prev, next) =>
+ prev.map(_.tgtForVertex).flatMap(next)
+ }
+ }.getOrElse(Observable.empty)
+ }
+ }
+}
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/step/GraphStepTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/step/GraphStepTest.scala
new file mode 100644
index 0000000..96e49a0
--- /dev/null
+++ b/s2core/src/test/scala/org/apache/s2graph/core/step/GraphStepTest.scala
@@ -0,0 +1,110 @@
+package org.apache.s2graph.core.step
+
+import org.apache.s2graph.core.Integrate.IntegrateCommon
+import org.apache.s2graph.core._
+import org.apache.s2graph.core.parsers.Where
+import org.apache.s2graph.core.rest.RequestParser
+import play.api.libs.json.Json
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, ExecutionContext, Future}
+
+class GraphStepTest extends IntegrateCommon {
+
+ import TestUtil._
+ import RxStep._
+
+ val insert = "insert"
+ val e = "e"
+ val weight = "weight"
+ val is_hidden = "is_hidden"
+
+ override def initTestData(): Unit = {
+ super.initTestData()
+
+ insertEdgesSync(
+ toEdge(1000, insert, e, 1, 10, testLabelName),
+ toEdge(1000, insert, e, 2, 20, testLabelName),
+ toEdge(1000, insert, e, 3, 30, testLabelName),
+
+ toEdge(1000, insert, e, 100, 1, testLabelName, Json.obj(weight -> 30))
+ )
+ }
+
+ test("basic compose") {
+ val vertices = Seq(
+ graph.toVertex(testServiceName, testColumnName, 1),
+ graph.toVertex(testServiceName, testColumnName, 2),
+ graph.toVertex(testServiceName, testColumnName, 3),
+
+ graph.toVertex(testServiceName, testColumnName, 10)
+ )
+
+ val v1 = VertexFetchStep(graph)
+
+ val qpIn = QueryParam(labelName = testLabelName, direction = "in")
+ val qpOut = QueryParam(labelName = testLabelName, direction = "out")
+
+ val e1 = EdgeFetchStep(graph, qpIn)
+ val e2 = EdgeFetchStep(graph, qpOut)
+
+ val where = Where("_to = 20").get
+
+ val q =
+ v1.apply(vertices) // vertices: 4 - (1, 2, 3, 10)
+ .flatMap(e1) // edges: 4 - (srcId = 1, 2, 3 and tgtId = 10)
+ .filter(where.filter) // filterOut (only _to == 20)
+ .map(_.tgtForVertex) // vertices: (20)
+ .flatMap(v => e1.apply(v) ++ e2.apply(v)) // edges: (tgtId = 20)
+
+ val res = q.toBlocking.toList
+ }
+
+ test("Query to RxSteps") {
+ def q(id: Int) = Json.parse(
+ s"""
+ {
+ "srcVertices": [
+ { "serviceName": "$testServiceName",
+ "columnName": "$testColumnName",
+ "id": $id
+ }],
+ "steps": [
+ [{
+ "label": "$testLabelName",
+ "direction": "out",
+ "offset": 0,
+ "limit": 10
+ },
+ {
+ "label": "$testLabelName",
+ "direction": "in",
+ "offset": 0,
+ "limit": 10
+ }],
+
+ [{
+ "label": "$testLabelName",
+ "direction": "out",
+ "offset": 0,
+ "limit": 10,
+ "where": "weight > 10"
+ },
+ {
+ "label": "$testLabelName",
+ "direction": "in",
+ "offset": 0,
+ "limit": 10
+ }]
+ ]
+ }""")
+
+ val queryJs = q(1)
+ val requestParser = new RequestParser(graph)
+ val query = requestParser.toQuery(queryJs, None)
+
+ val actual = RxStep.toObservable(query)(graph).toBlocking.toList.sortBy(_.srcVertex.innerIdVal.toString)
+ val expected = Await.result(graph.getEdges(query), Duration("30 sec")).edgeWithScores.map(_.edge).sortBy(_.srcVertex.innerIdVal.toString)
+
+ actual shouldBe expected
+ }
+}
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/step/StepTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/step/StepTest.scala
new file mode 100644
index 0000000..f29a346
--- /dev/null
+++ b/s2core/src/test/scala/org/apache/s2graph/core/step/StepTest.scala
@@ -0,0 +1,78 @@
+package org.apache.s2graph.core.step
+
+import org.scalatest.{BeforeAndAfterEach, FunSuite, Matchers}
+import rx.lang.scala.{Observable, Subscription}
+
+class StepTest extends FunSuite with Matchers {
+
+ trait GraphE {
+ def id: String
+ }
+
+ case class V(id: String) extends GraphE
+
+ case class E(id: String, src: V, tgt: V) extends GraphE
+
+ object GraphModels {
+ /**
+ * vertices: [A, B]
+ * edges: [E(A, B), E(B, A)]
+ */
+ val va = V("V_A")
+ val vb = V("V_B")
+
+ val e1 = E("E1", va, vb)
+ val e2 = E("E2", vb, va)
+
+ val allVertices = List(va, vb)
+ val allEdges = List(e1, e2)
+ }
+
+ case class VertexStep(vid: String) extends RxStep[Unit, V] {
+ override def apply(in: Unit): Observable[V] = {
+ val vertices = GraphModels.allVertices.filter(v => vid == v.id)
+ Observable.from(vertices)
+ }
+ }
+
+ case class EdgeStep(dir: String) extends RxStep[V, E] {
+ override def apply(in: V): Observable[E] = {
+ val edges = if (dir == "OUT") {
+ GraphModels.allEdges.filter(e => in == e.src)
+ } else {
+ GraphModels.allEdges.filter(e => in == e.tgt)
+ }
+
+ Observable.from(edges)
+ }
+ }
+
+ case class EdgeToVertexStep() extends RxStep[E, V] {
+ override def apply(in: E): Observable[V] = {
+ Observable.just(in.tgt)
+ }
+ }
+
+ test("basic step") {
+ val v1: RxStep[Unit, V] = VertexStep("V_A")
+
+ val e1: RxStep[V, E] = EdgeStep("OUT")
+ val e2 = EdgeStep("IN")
+
+ val g = v1(())
+ .flatMap(v => e1(v) ++ e2(v))
+ .flatMap(EdgeToVertexStep())
+ .flatMap(v => e1(v) ++ e2(v))
+ .distinct
+
+ val expected = List(
+ E("E1", V("V_A"), V("V_B")),
+ E("E2", V("V_B"), V("V_A"))
+ ).sortBy(_.id)
+
+ val actual = g.toBlocking.toList.sortBy(_.id)
+
+ println(actual)
+ actual shouldBe expected
+ }
+}