Merging https://github.com/daumkakao/s2graph/pull/14 into master
diff --git a/app/com/daumkakao/s2graph/rest/config/Config.scala b/app/com/daumkakao/s2graph/rest/config/Config.scala
index 3be7b28..4bef00a 100644
--- a/app/com/daumkakao/s2graph/rest/config/Config.scala
+++ b/app/com/daumkakao/s2graph/rest/config/Config.scala
@@ -1,7 +1,8 @@
package com.daumkakao.s2graph.rest.config
import java.util.concurrent.TimeUnit
-import com.codahale.metrics.{Metric, ConsoleReporter, MetricRegistry}
+import com.codahale.metrics.{Slf4jReporter, Metric, ConsoleReporter, MetricRegistry}
+import org.slf4j.LoggerFactory
import play.api.{Play, Logger}
object Config {
@@ -32,10 +33,15 @@
trait Instrumented extends nl.grons.metrics.scala.InstrumentedBuilder {
val metricRegistry: MetricRegistry = Config.metricRegistry
- val consoleReporter = ConsoleReporter.forRegistry(metricRegistry)
+ val reporter = Slf4jReporter.forRegistry(metricRegistry)
+ .outputTo(LoggerFactory.getLogger(classOf[Instrumented]))
.convertRatesTo(TimeUnit.SECONDS)
.convertDurationsTo(TimeUnit.MILLISECONDS)
.build()
+// val consoleReporter = ConsoleReporter.forRegistry(metricRegistry)
+// .convertRatesTo(TimeUnit.SECONDS)
+// .convertDurationsTo(TimeUnit.MILLISECONDS)
+// .build()
val stats = new collection.mutable.HashMap[String, Metric]
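
The hunk above swaps the stdout ConsoleReporter for an Slf4jReporter so metric output lands in the application log. A minimal, self-contained sketch of the same Dropwizard-metrics pattern (the logger name and the `requests` meter are illustrative, not part of this patch):

import java.util.concurrent.TimeUnit
import com.codahale.metrics.{MetricRegistry, Slf4jReporter}
import org.slf4j.LoggerFactory

object MetricsReporterSketch {
  val metricRegistry = new MetricRegistry()

  // Periodically report all registered metrics through SLF4J instead of stdout.
  val reporter = Slf4jReporter.forRegistry(metricRegistry)
    .outputTo(LoggerFactory.getLogger("application.metrics"))
    .convertRatesTo(TimeUnit.SECONDS)
    .convertDurationsTo(TimeUnit.MILLISECONDS)
    .build()

  def main(args: Array[String]): Unit = {
    reporter.start(10, TimeUnit.SECONDS) // same 10-second period ApplicationController uses
    metricRegistry.meter("requests").mark()
    Thread.sleep(11000) // wait long enough for one report to be logged
  }
}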
diff --git a/app/controllers/ApplicationController.scala b/app/controllers/ApplicationController.scala
index 8384fd9..ab08044 100644
--- a/app/controllers/ApplicationController.scala
+++ b/app/controllers/ApplicationController.scala
@@ -17,7 +17,7 @@
var connectionOption = CONNECTION -> "Keep-Alive"
var keepAliveOption = "Keep-Alive" -> "timeout=5, max=100"
- consoleReporter.start(10, TimeUnit.SECONDS)
+ reporter.start(10, TimeUnit.SECONDS)
def updateHealthCheck(isHealthy: Boolean) = Action { request =>
this.isHealthy = isHealthy
diff --git a/app/controllers/RequestParser.scala b/app/controllers/RequestParser.scala
index 945368b..c5af045 100644
--- a/app/controllers/RequestParser.scala
+++ b/app/controllers/RequestParser.scala
@@ -84,59 +84,6 @@
WhereParser(label).parse(where)
}
}
- case class WhereParser(label: Label) extends JavaTokenParsers with JSONParser {
-
- val metaProps = label.metaPropsInvMap ++ Map(LabelMeta.from.name -> LabelMeta.from, LabelMeta.to.name -> LabelMeta.to)
-
- def where: Parser[Where] = rep(clause) ^^ (Where(_))
-
- def clause: Parser[Clause] = (predicate | parens) * (
- "and" ^^^ { (a: Clause, b: Clause) => And(a, b) } |
- "or" ^^^ { (a: Clause, b: Clause) => Or(a, b) })
-
- def parens: Parser[Clause] = "(" ~> clause <~ ")"
-
- def boolean = ("true" ^^^ (true) | "false" ^^^ (false))
-
- /** floating point is not supported yet **/
- def predicate = (
- (ident ~ "=" ~ ident | ident ~ "=" ~ decimalNumber | ident ~ "=" ~ stringLiteral) ^^ {
- case f ~ "=" ~ s =>
- metaProps.get(f) match {
- case None => throw new RuntimeException(s"where clause contains not existing property name: $f")
- case Some(metaProp) =>
- Equal(metaProp.seq, toInnerVal(s, metaProp.dataType))
- }
-
- }
- | (ident ~ "between" ~ ident ~ "and" ~ ident | ident ~ "between" ~ decimalNumber ~ "and" ~ decimalNumber
- | ident ~ "between" ~ stringLiteral ~ "and" ~ stringLiteral) ^^ {
- case f ~ "between" ~ minV ~ "and" ~ maxV =>
- metaProps.get(f) match {
- case None => throw new RuntimeException(s"where clause contains not existing property name: $f")
- case Some(metaProp) =>
- Between(metaProp.seq, toInnerVal(minV, metaProp.dataType), toInnerVal(maxV, metaProp.dataType))
- }
- }
- | (ident ~ "in" ~ "(" ~ rep(ident | decimalNumber | stringLiteral | "true" | "false" | ",") ~ ")") ^^ {
- case f ~ "in" ~ "(" ~ vals ~ ")" =>
- metaProps.get(f) match {
- case None => throw new RuntimeException(s"where clause contains not existing property name: $f")
- case Some(metaProp) =>
- val values = vals.filter(v => v != ",").map { v =>
- toInnerVal(v, metaProp.dataType)
- }
- IN(metaProp.seq, values.toSet)
- }
- })
-
- def parse(sql: String): Option[Where] = {
- parseAll(where, sql) match {
- case Success(r, q) => Some(r)
- case x => println(x); None
- }
- }
- }
def toQuery(jsValue: JsValue, isEdgeQuery: Boolean = true): Query = {
try {
diff --git a/loader/src/main/scala/subscriber/GraphSubscriber.scala b/loader/src/main/scala/subscriber/GraphSubscriber.scala
index b1d6051..aec5c4f 100644
--- a/loader/src/main/scala/subscriber/GraphSubscriber.scala
+++ b/loader/src/main/scala/subscriber/GraphSubscriber.scala
@@ -4,11 +4,11 @@
import com.daumkakao.s2graph.core._
import com.daumkakao.s2graph.core.Graph
-import com.typesafe.config.ConfigFactory
+import com.typesafe.config.{Config, ConfigFactory}
import kafka.javaapi.producer.Producer
import kafka.producer.KeyedMessage
import org.apache.hadoop.hbase.HBaseConfiguration
-import org.apache.hadoop.hbase.client.{HConnection, HConnectionManager}
+import org.apache.hadoop.hbase.client.{HTableInterface, HConnection, HConnectionManager}
import org.apache.spark.{Accumulable, SparkContext}
import s2.spark.{HashMapParam, SparkApp, WithKafka}
import subscriber.parser.{GraphParser, GraphParsers}
@@ -26,67 +26,39 @@
zkQuorum = zkAddr.getOrElse("localhost")
kafkaBrokers = kafkaBrokerList.getOrElse("localhost:9092")
val s = s"""
-db.default.driver=com.mysql.jdbc.Driver
-db.default.url="$database"
-db.default.user=graph
-db.default.password=graph
+ |logger.root=ERROR
+ |
+ |# Logger used by the framework:
+ |logger.play=INFO
+ |
+ |# Logger provided to your application:
+ |logger.application=DEBUG
+ |
+ |# APP PHASE
+ |phase=dev
+ |
+ |# DB
+ |db.default.driver=com.mysql.jdbc.Driver
+ |db.default.url="$database"
+ |db.default.user=graph
+ |db.default.password=graph
+ |
+ |# Query server
+ |is.query.server=true
+ |is.write.server=true
+ |
+ |# Local Cache
+ |cache.ttl.seconds=60
+ |cache.max.size=100000
+ |
+ |# HBASE
+ |hbase.client.operation.timeout=60000
+ |
+ |# Kafka
+ |kafka.metadata.broker.list="$kafkaBrokers"
+ |kafka.producer.pool.size=0
+ | """.stripMargin
-is.query.server=true
-is.analyzer=false
-is.test.query.server=false
-test.sample.prob=0.1
-
-cache.ttl.seconds=60000
-cache.max.size=100000
-
-hbase.connection.pool.size=1
-hbase.table.pool.size=10
-hbase.client.ipc.pool.size=1
-zookeeper.recovery.retry=10
-zookeeper.session.timeout=180000
-hbase.zookeeper.quorum="$zkQuorum"
-hbase.table.name="s2graph-alpha"
-hbase.client.operation.timeout=10000
-hbase.client.retries.number=10
-hbase.client.write.operation.timeout=10000
-hbase.client.write.retries.number=10
-
-kafka.metadata.broker.list="$kafkaBrokers"
-kafka.request.required.acks=1
-kafka.producer.type="sync"
-kafka.producer.buffer.flush.time=1000
-kafka.producer.buffer.size=1000
-kafka.producer.pool.size=1
-kafka.aggregate.flush.timeout=1000
-
-# Aggregator
-client.aggregate.buffer.size=100
-client.aggregate.buffer.flush.time=10000
-client.aggregate.pool.size=1
-
-
-# blocking execution context
-contexts {
- query {
- fork-join-executor {
- parallelism-min = 1
- parallelism-max = 1
- }
- }
- blocking {
- fork-join-executor {
- parallelism-min = 1
- parallelism-max = 1
- }
- }
- scheduler {
- fork-join-executor {
- parallelism-min = 1
- parallelism-max = 1
- }
- }
-}
- """
println(s)
ConfigFactory.parseString(s)
}
@@ -98,17 +70,11 @@
lazy val producer = new Producer[String, String](kafkaConf(GraphConfig.kafkaBrokers))
-
+ var config: Config = _
private val writeBufferSize = 1024 * 1024 * 8
private val sleepPeriod = 10000
private val maxTryNum = 10
-// lazy val graph = {
-// println(System.getProperty("phase"))
-// Graph.apply(GraphConfig.apply(System.getProperty("phase"), None, None))(ExecutionContext.Implicits.global)
-// println(Graph.config)
-// println(Graph.hbaseConfig)
-// Graph
-// }
+
def toOption(s: String) = {
s match {
case "" | "none" => None
@@ -116,16 +82,14 @@
}
}
def apply(phase: String, dbUrl: String, zkQuorum: String, kafkaBrokerList: String) : Unit = {
- apply(phase, toOption(dbUrl), toOption(zkQuorum), toOption(kafkaBrokerList))
+ config = GraphConfig(phase, toOption(dbUrl), toOption(zkQuorum), toOption(kafkaBrokerList))
+ Graph.apply(config)(ExecutionContext.Implicits.global)
}
- def apply(phase: String, dbUrl: Option[String], zkQuorum: Option[String], kafkaBrokerList: Option[String]): Unit = {
- Graph.apply(GraphConfig(phase, dbUrl, zkQuorum, kafkaBrokerList))(ExecutionContext.Implicits.global)
- }
+// def apply(phase: String, dbUrl: Option[String], zkQuorum: Option[String], kafkaBrokerList: Option[String]): Unit = {
+// Graph.apply(GraphConfig(phase, dbUrl, zkQuorum, kafkaBrokerList))(ExecutionContext.Implicits.global)
+// }
def report(key: String, value: Option[String], topic: String = "report") = {
- // val ts = System.currentTimeMillis().toString
val msg = Seq(Some(key), value).flatten.mkString("\t")
-
- // val producer = new Producer[String, String](kafkaConf(Config.KAFKA_METADATA_BROKER_LIST))
val kafkaMsg = new KeyedMessage[String, String](topic, msg)
producer.send(kafkaMsg)
}
@@ -133,15 +97,15 @@
/**
* bulkMutates read connection and table info from database.
*/
- def store(msgs: Seq[String])(mapAccOpt: Option[HashMapAccumulable]): Iterable[(String, Long)] = {
- // assert(msgs.size >= maxSize)
+ def store(msgs: Seq[String], labelToReplace: Option[String] = None)(mapAccOpt: Option[HashMapAccumulable]): Iterable[(String, Long)] = {
val counts = HashMap[String, Long]()
val statFunc = storeStat(counts)(mapAccOpt)_
val elements = (for (msg <- msgs) yield {
statFunc("total", 1)
- val element = Graph.toGraphElement(msg)
+ val element = Graph.toGraphElement(msg, labelToReplace)
+ println(element)
element match {
case Some(e) =>
statFunc("parseOk", 1)
@@ -167,6 +131,64 @@
}
counts
}
+ def storeBulk(conn: HConnection, tableName: String)(msgs: Seq[String], labelToReplace: Option[String] = None)(mapAccOpt: Option[HashMapAccumulable]): Iterable[(String, Long)] = {
+ val counts = HashMap[String, Long]()
+ val statFunc = storeStat(counts)(mapAccOpt) _
+ val edges = (for (msg <- msgs) yield {
+ statFunc("total", 1)
+ try {
+ Graph.toGraphElement(msg, labelToReplace) match {
+ case Some(e) if e.isInstanceOf[Edge] =>
+ statFunc("parseOk", 1)
+ Some(e.asInstanceOf[Edge])
+ case None =>
+ statFunc("errorParsing", 1)
+ None
+ }
+ } catch {
+ case e: Throwable =>
+ System.err.println(s"$msg $e", e)
+ None
+ }
+ }).flatten.toList
+
+ println(edges.take(10))
+
+ /**
+ * do not use the database to look up the connection and table here.
+ */
+ // throw all exceptions to the caller.
+
+ def storeRec(edges: List[Edge], tryNum: Int = maxTryNum): Unit = {
+ if (tryNum <= 0) {
+ statFunc("errorStore", edges.size)
+ throw new RuntimeException(s"retry failed after $maxTryNum")
+ }
+ val table = conn.getTable(tableName)
+ table.setAutoFlush(false, false)
+ table.setWriteBufferSize(writeBufferSize)
+ try {
+ // we only care about insertBulk
+ val puts = edges.flatMap { edge =>
+ edge.relatedEdges.flatMap { relatedEdge =>
+ relatedEdge.insertBulk() ++ relatedEdge.buildVertexPuts()
+ }
+ }
+ println(puts.take(10))
+ table.put(puts)
+ statFunc("storeOk", msgs.size)
+ } catch {
+ case e: Throwable =>
+ e.printStackTrace()
+ Thread.sleep(sleepPeriod)
+ storeRec(edges, tryNum - 1)
+ } finally {
+ table.close()
+ }
+ }
+ storeRec(edges)
+ counts
+ }
def storeStat(counts: HashMap[String, Long])(mapAccOpt: Option[HashMapAccumulable])(key: String, value: Int) = {
counts.put(key, counts.getOrElse(key, 0L) + value)
mapAccOpt match {
@@ -174,108 +196,62 @@
case Some(mapAcc) => mapAcc += (key -> value)
}
}
-//
-// /**
-// * caller of this method should split msgs into reasonable size.
-// */
-// def storeBulk(conn: HConnection, msgs: Seq[String])(mapAccOpt: Option[HashMapAccumulable], zkQuorum: String, tableName: String): Iterable[(String, Long)] = {
-// val counts = HashMap[String, Long]()
-// val statFunc = storeStat(counts)(mapAccOpt)_
-// val edges = (for (msg <- msgs) yield {
-// statFunc("total", 1)
-//
-// val edge = Graph.toEdge(msg)
-// edge match {
-// case Some(e) =>
-// statFunc("parseOk", 1)
-// edge
-// case None =>
-// statFunc("errorParsing", 1)
-// None
-// }
-// }).flatten.toList
-//
-// storeRec(edges)
-// /**
-// * don't use database for connection and table.
-// */
-// // throw all exception to caller.
-//
-// def storeRec(edges: List[Edge], tryNum: Int = maxTryNum): Unit = {
-// if (tryNum <= 0) {
-// statFunc("errorStore", edges.size)
-// throw new RuntimeException(s"retry failed after $maxTryNum")
-// }
-// try {
-//
-// }
-// try {
-//
-// Graph.bulkMutates(edges)
-// // on bulk mode, we don`t actually care about WAL log on hbase. so skip this process to make things faster.
-// val puts = edges.flatMap(e => e.buildPutsAll ++ e.buildVertexPuts).map { p =>
-// // p.setDurability(Durability.SKIP_WAL)
-// p
-// }
-// table.put(puts)
-// statFunc("storeOk", msgs.size)
-// } catch {
-// case e: Throwable =>
-// e.printStackTrace()
-// Thread.sleep(sleepPeriod)
-// storeRec(edges, tryNum - 1)
-// // statFunc("errorStore", msgs.size)
-// // throw e
-// } finally {
-// table.close()
-// }
-// }
-//
-// counts
-// }
+
}
-/**
- * do not use Graph.bulkMutates since it automatically read zkQuorum and hbase TableName from database.
- * batch insert should reference database only for parsing not getting connection and table!
- */
object GraphSubscriber extends SparkApp with WithKafka {
val sleepPeriod = 5000
-
+ val usages =
+ s"""
+ |/**
+ | * This job reads edges in TSV format from HDFS and bulk loads them into s2graph. It assumes newLabelName has already been created via the API.
+ | * params:
+ | * 1. hdfsPath: location of the input data in HDFS. Requires a full path with the hdfs:// prefix.
+ | * 2. dbUrl: JDBC connection string for the database holding the meta data.
+ | * 3. originalLabelName: the label to copy from.
+ | * 4. newLabelName: the label field of each edge will be replaced with this value.
+ | * 5. zkQuorum: ZooKeeper quorum of the target HBase cluster this job publishes data to.
+ | * 6. hTableName: physical HBase table name this job publishes data to.
+ | * 7. batchSize: how many edges are batched into a single Put request to the target HBase.
+ | * 8. kafkaBrokerList: Kafka brokers used as a fallback queue. When a batch fails, data that needs to be replayed is stored in Kafka.
+ | * 9. kafkaTopic: fallback queue topic.
+ | * After this job finishes, s2graph will contain the data under newLabelName.
+ | * Rename newLabelName to originalLabelName if you want an online replacement of the label.
+ | *
+ | */
+ """.stripMargin
override def run() = {
/**
* Main function
*/
- if (args.length < 3) {
- System.err.println("Usage: GraphSubscriber <hdfsPath> <batchSize> <dbUrl> <zkQuorum> <tableName> <kafkaBrokerList> <kafkaTopic> <preSplitSize>")
+ println(args.toList)
+ if (args.length != 9) {
+ System.err.println(usages)
System.exit(1)
}
val hdfsPath = args(0)
- val batchSize = args(1).toInt
- val dbUrl = args(2)
- val zkQuorum = args(3)
- val tableName = args(4)
- val kafkaBrokerList = args(5)
- val kafkaTopic = args(6)
- val preSplitSize = if (args.length > 7) args(7).toInt else 20
+ val dbUrl = args(1)
+ val oldLabelName = args(2)
+ val newLabelName = args(3)
+ val zkQuorum = args(4)
+ val hTableName = args(5)
+ val batchSize = args(6).toInt
+ val kafkaBrokerList = args(7)
+ val kafkaTopic = args(8)
val conf = sparkConf(s"$hdfsPath: GraphSubscriber")
val sc = new SparkContext(conf)
-
val mapAcc = sc.accumulable(HashMap.empty[String, Long], "counter")(HashMapParam[String, Long](_ + _))
- val fallbackTopic = s"${tableName}_batch_failed"
+
+
try {
- tableName match {
- case "s2graph" | "s2graph-alpha" | "s2graph-sandbox" => System.err.println("try to create master table!")
- case _ => Management.createTable(zkQuorum, tableName, List("e", "v"), preSplitSize, None)
- }
- // not sure how fast htable get table can recognize new table so sleep.
- Thread.sleep(sleepPeriod)
-
+ import GraphSubscriberHelper._
// set local driver setting.
val phase = System.getProperty("phase")
GraphSubscriberHelper.apply(phase, dbUrl, zkQuorum, kafkaBrokerList)
+ /** copy only when oldLabel exists and newLabel does not yet exist; otherwise ignore. */
+ Management.copyLabel(oldLabelName, newLabelName, toOption(hTableName))
val msgs = sc.textFile(hdfsPath)
msgs.foreachPartition(partition => {
@@ -290,21 +266,22 @@
partition.grouped(batchSize).foreach { msgs =>
try {
val start = System.currentTimeMillis()
+// val counts =
+// GraphSubscriberHelper.store(msgs, GraphSubscriberHelper.toOption(newLabelName))(Some(mapAcc))
val counts =
-// if (isBulk) GraphSubscriberHelper.storeBulk(conn, msgs)(Some(mapAcc), zkQuorum, tableName)
- GraphSubscriberHelper.store(msgs)(Some(mapAcc))
+ GraphSubscriberHelper.storeBulk(conn, hTableName)(msgs, GraphSubscriberHelper.toOption(newLabelName))(Some(mapAcc))
for ((k, v) <- counts) {
mapAcc += (k, v)
}
val duration = System.currentTimeMillis() - start
- println(s"[Success]: store, $mapAcc, $duration, $zkQuorum, $tableName")
+ println(s"[Success]: store, $mapAcc, $duration, $zkQuorum, $hTableName")
} catch {
case e: Throwable =>
println(s"[Failed]: store $e")
msgs.foreach { msg =>
- GraphSubscriberHelper.report(msg, Some(e.getMessage()), topic = fallbackTopic)
+ GraphSubscriberHelper.report(msg, Some(e.getMessage()), topic = kafkaTopic)
}
}
}
@@ -314,9 +291,8 @@
logInfo(s"counter: $mapAcc")
println(s"Stats: ${mapAcc}")
- // if (shouldUpdate) Label.prependHBaseTableName(labelName, tableName)
} catch {
- case e: IOException =>
+ case e: Throwable =>
println(s"job failed with exception: $e")
throw e
}
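
The new storeBulk/storeRec path above retries a whole batch a bounded number of times, sleeping between attempts and surfacing the failure once retries are exhausted. A stripped-down sketch of that retry shape, with a generic write function standing in for the HBase table.put call:

import scala.annotation.tailrec

object RetrySketch {
  val maxTryNum = 10
  val sleepPeriod = 10000L

  // Retry `write` on the full batch up to `tryNum` times, sleeping between attempts;
  // once retries are exhausted, propagate the failure to the caller.
  @tailrec
  def storeRec[A](batch: Seq[A], write: Seq[A] => Unit, tryNum: Int = maxTryNum): Unit = {
    if (tryNum <= 0) throw new RuntimeException(s"retry failed after $maxTryNum")
    val succeeded =
      try { write(batch); true }
      catch {
        case e: Throwable =>
          e.printStackTrace()
          Thread.sleep(sleepPeriod)
          false
      }
    if (!succeeded) storeRec(batch, write, tryNum - 1)
  }
}

In the job itself the write step is table.put(puts) on an HTableInterface obtained from the shared HConnection, with auto-flush disabled and an 8 MB write buffer.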
diff --git a/loader/src/test/scala/subscriber/GraphSubscriberTest.scala b/loader/src/test/scala/subscriber/GraphSubscriberTest.scala
index 13a3084..4e5b60a 100644
--- a/loader/src/test/scala/subscriber/GraphSubscriberTest.scala
+++ b/loader/src/test/scala/subscriber/GraphSubscriberTest.scala
@@ -1,20 +1,44 @@
package subscriber
+import com.daumkakao.s2graph.core.{Label, Service, Management}
import org.scalatest.{ FunSuite, Matchers }
+import play.api.libs.json.{JsBoolean, JsNumber}
import s2.spark.WithKafka
-import kafka.javaapi.producer.Producer
-import config.Config
class GraphSubscriberTest extends FunSuite with Matchers with WithKafka {
-
- test("kafka producer") {
- val brokerList = Config.KAFKA_METADATA_BROKER_LIST
- println(brokerList)
- val producer = new Producer[String, String](kafkaConf(brokerList))
- println(producer)
- }
-
+ val phase = "dev"
+ val dbUrl = "jdbc:mysql://localhost:3306/graph_dev"
+ val zkQuorum = "localhost"
+ val kafkaBrokerList = "localhost:9099"
+ val currentTs = System.currentTimeMillis()
+ val op = "insertBulk"
+ val testLabelName = "s2graph_label_test"
+ val labelToReplace = "s2graph_label_test_new"
+ val serviceName = "s2graph"
+ val columnName = "user_id"
+ val columnType = "long"
+ val indexProps = Seq("time" -> JsNumber(0), "weight" -> JsNumber(0))
+ val props = Seq("is_hidden" -> JsBoolean(false), "is_blocked" -> JsBoolean(false))
+ val hTableName = "s2graph-dev_new"
+ val ttl = 86000
+ val testStrings = List("1431788400000\tinsertBulk\te\t147229417\t99240432\ts2graph_label_test\t{\"is_hidden\": true}")
+
+ GraphSubscriberHelper.apply(phase, dbUrl, zkQuorum, kafkaBrokerList)
+
test("GraphSubscriberHelper.store") {
-
+ // actually we need to delete labelToReplace first for each test.
+ Management.copyLabel(testLabelName, labelToReplace, Some(hTableName))
+
+//
+// val msgs = (for {
+// i <- (1 until 10)
+// j <- (100 until 110)
+// } yield {
+// s"$currentTs\t$op\tedge\t$i\t$j\t$testLabelName"
+// }).toSeq
+ val msgs = testStrings
+
+ val stat = GraphSubscriberHelper.store(msgs, Some(labelToReplace))(None)
+ println(stat)
}
}
\ No newline at end of file
diff --git a/s2core/build.sbt b/s2core/build.sbt
index c27a05c..716fbdf 100644
--- a/s2core/build.sbt
+++ b/s2core/build.sbt
@@ -27,6 +27,7 @@
"commons-pool" % "commons-pool" % "1.6",
"org.scalikejdbc" %% "scalikejdbc" % "2.1.+",
"com.twitter" %% "util-collection" % "6.12.1",
- "org.hbase" % "asynchbase" % "1.7.0-SNAPSHOT"
+ "org.hbase" % "asynchbase" % "1.7.0-SNAPSHOT",
+ "org.scalatest" %% "scalatest" % "2.2.1" % "test"
)
diff --git a/s2core/src/main/scala/com/daumkakao/s2graph/core/Edge.scala b/s2core/src/main/scala/com/daumkakao/s2graph/core/Edge.scala
index 6bb9081..dd73fd0 100644
--- a/s2core/src/main/scala/com/daumkakao/s2graph/core/Edge.scala
+++ b/s2core/src/main/scala/com/daumkakao/s2graph/core/Edge.scala
@@ -261,7 +261,9 @@
}
edgePuts
}
-
+ def insertBulk() = {
+ edgesWithInvertedIndex.buildPut :: edgesWithIndex.flatMap(e => e.buildPuts())
+ }
def insert() = {
edgesWithInvertedIndex.buildPutAsync :: edgesWithIndex.flatMap(e => e.buildPutsAsync)
}
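
insertBulk above returns plain HBase Put objects rather than asynchbase PutRequests, so storeBulk can push them through a buffered HTableInterface. A sketch of that write path using the HConnection/HTableInterface client API the loader imports (quorum, table, row, and qualifier values are illustrative):

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{HConnectionManager, Put}
import org.apache.hadoop.hbase.util.Bytes

object BufferedPutSketch {
  def main(args: Array[String]): Unit = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "localhost") // stand-in quorum
    val conn = HConnectionManager.createConnection(conf)
    val table = conn.getTable("s2graph-dev_new")
    // Same buffering setup as storeRec: disable auto-flush and enlarge the client-side buffer.
    table.setAutoFlush(false, false)
    table.setWriteBufferSize(1024 * 1024 * 8)
    try {
      val put = new Put(Bytes.toBytes("row"))
      put.add(Bytes.toBytes("e"), Bytes.toBytes("qualifier"), Bytes.toBytes("value"))
      table.put(java.util.Arrays.asList(put))
      table.flushCommits() // flush anything still sitting in the write buffer
    } finally {
      table.close()
      conn.close()
    }
  }
}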
diff --git a/s2core/src/main/scala/com/daumkakao/s2graph/core/Graph.scala b/s2core/src/main/scala/com/daumkakao/s2graph/core/Graph.scala
index 8a8240e..b528834 100644
--- a/s2core/src/main/scala/com/daumkakao/s2graph/core/Graph.scala
+++ b/s2core/src/main/scala/com/daumkakao/s2graph/core/Graph.scala
@@ -765,11 +765,17 @@
*
*/
- def toGraphElement(s: String): Option[GraphElement] = {
+ def toGraphElement(s: String, labelNameToReplace: Option[String] = None): Option[GraphElement] = {
val parts = GraphUtil.split(s)
try {
val logType = parts(2)
val element = if (logType == "edge" | logType == "e") {
+ /** currently only edges are considered for bulk loading */
+ labelNameToReplace match {
+ case None =>
+ case Some(toReplace) =>
+ parts(5) = toReplace
+ }
toEdge(parts)
} else if (logType == "vertex" | logType == "v") {
toVertex(parts)
@@ -779,7 +785,6 @@
element
} catch {
case e: Throwable =>
- logger.error(s"toGraphElement: $s => $e", e)
None
}
}
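
The bulk-load TSV layout is timestamp, operation, log type, from, to, label, props; the new labelNameToReplace argument overwrites field 5 in place before the edge is parsed. A standalone sketch of just that rewrite, using a plain tab split where the real code uses GraphUtil.split (the sample line comes from GraphSubscriberTest):

object LabelReplaceSketch {
  // Replace the label column (index 5) of a bulk-load TSV edge line, when a replacement is given.
  def replaceLabel(line: String, labelNameToReplace: Option[String]): String = {
    val parts = line.split("\t", -1)
    labelNameToReplace match {
      case Some(toReplace) if parts.length > 5 => parts.updated(5, toReplace).mkString("\t")
      case _ => line
    }
  }

  def main(args: Array[String]): Unit = {
    val line = "1431788400000\tinsertBulk\te\t147229417\t99240432\ts2graph_label_test\t{\"is_hidden\": true}"
    println(replaceLabel(line, Some("s2graph_label_test_new")))
  }
}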
diff --git a/s2core/src/main/scala/com/daumkakao/s2graph/core/JSONParser.scala b/s2core/src/main/scala/com/daumkakao/s2graph/core/JSONParser.scala
index 8b1b085..4c81e35 100644
--- a/s2core/src/main/scala/com/daumkakao/s2graph/core/JSONParser.scala
+++ b/s2core/src/main/scala/com/daumkakao/s2graph/core/JSONParser.scala
@@ -3,6 +3,8 @@
import play.api.libs.json._
import HBaseElement.InnerVal
+import scala.util.parsing.combinator.JavaTokenParsers
+
trait JSONParser {
def innerValToJsValue(innerVal: InnerVal): JsValue = {
@@ -76,4 +78,76 @@
case _ => value.toString
}
}
+ case class WhereParser(label: Label) extends JavaTokenParsers with JSONParser {
+
+ val metaProps = label.metaPropsInvMap ++ Map(LabelMeta.from.name -> LabelMeta.from, LabelMeta.to.name -> LabelMeta.to)
+
+ def where: Parser[Where] = rep(clause) ^^ (Where(_))
+
+ def clause: Parser[Clause] = (predicate | parens) * (
+ "and" ^^^ { (a: Clause, b: Clause) => And(a, b) } |
+ "or" ^^^ { (a: Clause, b: Clause) => Or(a, b) })
+
+ def parens: Parser[Clause] = "(" ~> clause <~ ")"
+
+ def boolean = ("true" ^^^ (true) | "false" ^^^ (false))
+
+ /** floating point is not supported yet **/
+ def predicate = (
+ (ident ~ "=" ~ ident | ident ~ "=" ~ decimalNumber | ident ~ "=" ~ stringLiteral) ^^ {
+ case f ~ "=" ~ s =>
+ metaProps.get(f) match {
+ case None => throw new RuntimeException(s"where clause contains not existing property name: $f")
+ case Some(metaProp) =>
+ Equal(metaProp.seq, toInnerVal(s, metaProp.dataType))
+ }
+ }
+ | (ident ~ "between" ~ ident ~ "and" ~ ident | ident ~ "between" ~ decimalNumber ~ "and" ~ decimalNumber
+ | ident ~ "between" ~ stringLiteral ~ "and" ~ stringLiteral) ^^ {
+ case f ~ "between" ~ minV ~ "and" ~ maxV =>
+ metaProps.get(f) match {
+ case None => throw new RuntimeException(s"where clause contains not existing property name: $f")
+ case Some(metaProp) =>
+ Between(metaProp.seq, toInnerVal(minV, metaProp.dataType), toInnerVal(maxV, metaProp.dataType))
+ }
+ }
+ | (ident ~ "in" ~ "(" ~ rep(ident | decimalNumber | stringLiteral | "true" | "false" | ",") ~ ")") ^^ {
+ case f ~ "in" ~ "(" ~ vals ~ ")" =>
+ metaProps.get(f) match {
+ case None => throw new RuntimeException(s"where clause contains not existing property name: $f")
+ case Some(metaProp) =>
+ val values = vals.filter(v => v != ",").map { v =>
+ toInnerVal(v, metaProp.dataType)
+ }
+ IN(metaProp.seq, values.toSet)
+ }
+ }
+ | (ident ~ "!=" ~ ident | ident ~ "!=" ~ decimalNumber | ident ~ "!=" ~ stringLiteral) ^^ {
+ case f ~ "!=" ~ s =>
+ metaProps.get(f) match {
+ case None => throw new RuntimeException(s"where clause contains not existing property name: $f")
+ case Some(metaProp) =>
+ Not(Equal(metaProp.seq, toInnerVal(s, metaProp.dataType)))
+ }
+ }
+ | (ident ~ "not in" ~ "(" ~ rep(ident | decimalNumber | stringLiteral | "true" | "false" | ",") ~ ")") ^^ {
+ case f ~ "not in" ~ "(" ~ vals ~ ")" =>
+ metaProps.get(f) match {
+ case None => throw new RuntimeException(s"where clause contains not existing property name: $f")
+ case Some(metaProp) =>
+ val values = vals.filter(v => v != ",").map { v =>
+ toInnerVal(v, metaProp.dataType)
+ }
+ Not(IN(metaProp.seq, values.toSet))
+ }
+ }
+ )
+
+ def parse(sql: String): Option[Where] = {
+ parseAll(where, sql) match {
+ case Success(r, q) => Some(r)
+ case x => println(x); None
+ }
+ }
+ }
}
\ No newline at end of file
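
The new != and not in branches above follow the same JavaTokenParsers combinator pattern as the existing predicates. A self-contained toy version of that pattern, stripped of the Label/metaProps lookup and InnerVal conversion, that parses the same surface syntax into plain strings:

import scala.util.parsing.combinator.JavaTokenParsers

object WhereToyParser extends JavaTokenParsers {
  def value: Parser[String] = ident | decimalNumber | stringLiteral

  // `field != value` is parsed into a negated equality, mirroring Not(Equal(...)).
  def notEqual: Parser[String] = ident ~ "!=" ~ value ^^ {
    case f ~ _ ~ v => s"Not(Equal($f, $v))"
  }

  // `field not in (v1, v2, ...)` is parsed into a negated membership test, mirroring Not(IN(...)).
  def notIn: Parser[String] = ident ~ "not in" ~ "(" ~ repsep(value, ",") ~ ")" ^^ {
    case f ~ _ ~ _ ~ vals ~ _ => s"Not(IN($f, ${vals.mkString("[", ", ", "]")}))"
  }

  def predicate: Parser[String] = notEqual | notIn

  def main(args: Array[String]): Unit = {
    println(parseAll(predicate, "is_hidden != false"))
    println(parseAll(predicate, "time not in (1, 2, 4)"))
  }
}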
diff --git a/s2core/src/main/scala/com/daumkakao/s2graph/core/Labels.scala b/s2core/src/main/scala/com/daumkakao/s2graph/core/Labels.scala
index 39f293d..ad326c6 100644
--- a/s2core/src/main/scala/com/daumkakao/s2graph/core/Labels.scala
+++ b/s2core/src/main/scala/com/daumkakao/s2graph/core/Labels.scala
@@ -230,6 +230,8 @@
def metaSeqsToNames = metas.map(x => (x.seq, x.name)) toMap
// lazy val firstHBaseTableName = hbaseTableName.split(",").headOption.getOrElse(Config.HBASE_TABLE_NAME)
+ lazy val srcService = Service.findById(srcServiceId)
+ lazy val tgtService = Service.findById(tgtServiceId)
lazy val service = Service.findById(serviceId)
/**
diff --git a/s2core/src/main/scala/com/daumkakao/s2graph/core/Management.scala b/s2core/src/main/scala/com/daumkakao/s2graph/core/Management.scala
index bc249ec..1f87a1a 100644
--- a/s2core/src/main/scala/com/daumkakao/s2graph/core/Management.scala
+++ b/s2core/src/main/scala/com/daumkakao/s2graph/core/Management.scala
@@ -29,7 +29,24 @@
*
* talk_friend 10 -> story:friend 10 ->
*/
+ def copyLabel(oldLabelName: String, newLabelName: String, hTableName: Option[String]) = {
+ for {
+ old <- Label.findByName(oldLabelName)
+ } {
+ Label.findByName(newLabelName) match {
+ case None =>
+ val (indexProps, metaProps) = old.metaPropsInvMap.partition(nameMeta => nameMeta._2.usedInIndex)
+ createLabel(newLabelName, old.srcService.serviceName, old.srcColumnName, old.srcColumnType,
+ old.tgtService.serviceName, old.tgtColumnName, old.tgtColumnType,
+ old.isDirected, old.serviceName,
+ indexProps.map(t => t._1 -> innerValToJsValue(t._2.defaultInnerVal)).toSeq,
+ metaProps.map(t => t._1 -> innerValToJsValue(t._2.defaultInnerVal)).toSeq,
+ old.consistencyLevel, hTableName, old.hTableTTL)
+ case Some(_) =>
+ }
+ }
+ }
/**
* label
*/
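
A usage sketch of the new helper, mirroring the call made in GraphSubscriberTest below (the label and table names come from that test; an already-initialized Graph/DB connection is assumed):

// Copy the schema of s2graph_label_test into a new label backed by its own HBase table.
Management.copyLabel("s2graph_label_test", "s2graph_label_test_new", Some("s2graph-dev_new"))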
diff --git a/s2core/src/main/scala/com/daumkakao/s2graph/core/QueryParam.scala b/s2core/src/main/scala/com/daumkakao/s2graph/core/QueryParam.scala
index 33d1a39..92489ee 100644
--- a/s2core/src/main/scala/com/daumkakao/s2graph/core/QueryParam.scala
+++ b/s2core/src/main/scala/com/daumkakao/s2graph/core/QueryParam.scala
@@ -127,6 +127,11 @@
}
}
+case class Not(val self: Clause) extends Clause {
+ override def filter(edge: Edge): Boolean = {
+ !self.filter(edge)
+ }
+}
case class And(val left: Clause, val right: Clause) extends Clause {
override def filter(edge: Edge): Boolean = {
left.filter(edge) && right.filter(edge)
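
Not plugs into the same composite pattern as And/Or: every Clause filters an edge, and combinators negate or combine the results. A stripped-down sketch of that pattern with a plain property map standing in for Edge (the real clauses compare InnerVal props by meta sequence):

object ClauseSketch {
  type Props = Map[String, Any] // stand-in for an edge's property map

  sealed trait Clause { def filter(props: Props): Boolean }
  case class Equal(key: String, value: Any) extends Clause {
    def filter(props: Props) = props.get(key).exists(_ == value)
  }
  case class Not(self: Clause) extends Clause {
    def filter(props: Props) = !self.filter(props)
  }
  case class And(left: Clause, right: Clause) extends Clause {
    def filter(props: Props) = left.filter(props) && right.filter(props)
  }

  def main(args: Array[String]): Unit = {
    val edgeProps: Props = Map("is_hidden" -> true, "time" -> 3)
    val clause = And(Not(Equal("is_hidden", false)), Equal("time", 3))
    println(clause.filter(edgeProps)) // true: is_hidden != false and time = 3
  }
}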
diff --git a/s2core/src/test/scala/com/daumkakao/s2graph/core/ManagementTest.scala b/s2core/src/test/scala/com/daumkakao/s2graph/core/ManagementTest.scala
new file mode 100644
index 0000000..26a33dc
--- /dev/null
+++ b/s2core/src/test/scala/com/daumkakao/s2graph/core/ManagementTest.scala
@@ -0,0 +1,29 @@
+package com.daumkakao.s2graph.core
+
+import com.typesafe.config.ConfigFactory
+import org.scalatest.{Matchers, FunSuite}
+
+import scala.concurrent.ExecutionContext
+
+/**
+ * Created by shon on 5/18/15.
+ */
+class ManagementTest extends FunSuite with Matchers {
+ val zkQuorum = "tokyo062.kr2.iwilab.com"
+ val database = "jdbc:mysql://nuk151.kr2.iwilab.com:13306/graph_alpha"
+ val config = ConfigFactory.parseString(
+ s"""
+ | db.default.url="$database"
+ | cache.ttl.second=60
+ | cache.max.size=1000
+ | hbase.client.operation.timeout=1000
+ | hbase.zookeeper.quorum="$zkQuorum"
+ """.stripMargin)
+ val labelToCopy = "graph_test"
+ val newLabelName = "graph_test_tc"
+ val hTableName = "graph_test_tc"
+ Graph.apply(config)(ExecutionContext.Implicits.global)
+ test("test copy label") {
+ Management.copyLabel(labelToCopy, newLabelName, Some(hTableName))
+ }
+}
diff --git a/test/controllers/FutureHelpersSpec.scala b/test/controllers/FutureHelpersSpec.scala
index 227257f..03c49e0 100644
--- a/test/controllers/FutureHelpersSpec.scala
+++ b/test/controllers/FutureHelpersSpec.scala
@@ -1,4 +1,6 @@
-package contollers;
+package test.controllers
+
+
//package kgraph
//import scala.concurrent.Await
//import scala.concurrent._
diff --git a/test/controllers/GraphSpec.scala b/test/controllers/GraphSpec.scala
deleted file mode 100644
index 30d7b5b..0000000
--- a/test/controllers/GraphSpec.scala
+++ /dev/null
@@ -1,101 +0,0 @@
-package controllers
-
-import com.daumkakao.s2graph.core.Management._
-import com.daumkakao.s2graph.core._
-import org.specs2.mutable.Specification
-import play.api.libs.json.Json
-import scala.Option.option2Iterable
-/**
- * test-only kgraph.GraphSpec
- */
-class GraphSpec extends Specification {
- sequential
-
- private val phase = "alpha"
- // Graph.tableName = tableName
- System.setProperty("phase", phase)
-// Config.phase = phase
- private val serviceName = "s2graph"
- private val labelName = "graph_test"
-
- val msgs = """
- ["1417079166891\tu\te\t156598730\t158524147\tgraph_test\t{\"is_blocked\":true}","1417079166000\td\te\t4079848\t54739565\tgraph_test","1417079166000\ti\te\t102735518\t49629673\tgraph_test","1417079166000\ti\te\t15120422\t160902634\tgraph_test","1417079166000\ti\te\t160251573\t7960026\tgraph_test","1417079167000\td\te\t816808\t47823955\tgraph_test","1417079166000\ti\te\t160251573\t43063061\tgraph_test","1417079167000\ti\te\t141391508\t151985238\tgraph_test","1417079167360\tu\te\t43237903\t49005293\tgraph_test\t{\"is_blocked\":true}","1417079167000\ti\te\t9788808\t167206573\tgraph_test","1417079167000\ti\te\t106040854\t166946583\tgraph_test","1417079166000\ti\te\t15120422\t35674614\tgraph_test","1417079167519\tu\te\t29460631\t28640554\tgraph_test\t{\"is_blocked\":false}","1417079166000\ti\te\t15120422\t6366975\tgraph_test","1417079167000\tu\te\t4610363\t146176454\tgraph_test\t{\"is_hidden\":true}","1417079167000\td\te\t4935659\t1430321\tgraph_test","1417079166000\ti\te\t15120422\t136919643\tgraph_test","1417079167000\td\te\t84547561\t17923231\tgraph_test","1417079167000\td\te\t108876864\t19695266\tgraph_test","1417079166000\td\te\t14644128\t11853672\tgraph_test","1417079167000\ti\te\t142522237\t167208738\tgraph_test"]"""
-
- val createService = """{"serviceName": "s2graph"}"""
-
- val createLabel = """
- {"label": "graph_test", "srcServiceName": "s2graph", "srcColumnName": "account_id", "srcColumnType": "long", "tgtServiceName": "s2graph", "tgtColumnName": "account_id", "tgtColumnType": "long", "indexProps": {"time": 0, "weight":0}}
- """
-
- // 0.
- AdminController.createServiceInner(Json.parse(createService))
- // 1.
- AdminController.deleteLabel(labelName)
- // 2.
- AdminController.createLabelInner(Json.parse(createLabel))
-
- val label = Label.findByName("graph_test").get
-
- val insertEdges = """
-[
- {"from":1,"to":101,"label":"graph_test","props":{"time":-1, "weight":10},"timestamp":193829192},
- {"from":1,"to":102,"label":"graph_test","props":{"time":0, "weight":11},"timestamp":193829193},
- {"from":1,"to":103,"label":"graph_test","props":{"time":1, "weight":12},"timestamp":193829194},
- {"from":1,"to":104,"label":"graph_test","props":{"time":-2, "weight":1},"timestamp":193829195}
-]
- """
-// val insertResults = List(
-// Edge(Vertex(CompositeId(label.srcColumn.id.get, InnerVal.withLong(1), true)), Vertex(CompositeId(label.tgtColumn.id.get, InnerVal.withLong(103), true)), LabelWithDirection(label.id.get, 0), 0, 193829192, )
-// )
- val insertEdgesTsv = """["193829192\ti\te\t1\t101\tgraph_test\t{\"time\":1, \"weight\": 10, \"is_hidden\": false}","193829192\ti\te\t1\t102\tgraph_test\t{\"time\":1, \"weight\": 19, \"is_hidden\": false}","193829192\ti\te\t1\t103\tgraph_test\t{\"time\":1, \"weight\": 10, \"is_hidden\": false}"]"""
-
- val updateEdges = """
-[
- {"from":1,"to":102,"label":"graph_test","timestamp":193829199, "props": {"is_hidden":true}},
- {"from":1,"to":102,"label":"graph_test","timestamp":193829299, "props": {"weight":100}}
-]
- """
- val incrementEdges = """
-[
- {"from":1,"to":102,"label":"graph_test","timestamp":193829399, "props": {"weight":100}},
- {"from":1,"to":102,"label":"graph_test","timestamp":193829499, "props": {"weight":100}},
- {"from":1,"to":103,"label":"graph_test","timestamp":193829599, "props": {"weight":1000}},
- {"from":1,"to":102,"label":"graph_test","timestamp":193829699, "props": {"weight":-200}},
- {"from":1,"to":102,"label":"graph_test","timestamp":193829699, "props": {"weight":400}}
-]
- """
-
- val queryEdges = """
-{
- "srcVertices": [{"columnName": "account_id", "id":1}],
- "steps": [
- [{"label": "graph_test", "direction": "out", "limit": 100, "scoring":{"time": 0, "weight": 1}}]
- ]
-}
- """
- val query = toQuery(Json.parse(queryEdges))
-
- "RequestParsor" should {
- "Json parsed edge should match Graph.toEdge" in {
- val jsons = Json.parse(insertEdges)
- val jsonEdges = toEdges(jsons, "insert")
- val tsvs = GraphUtil.parseString(insertEdgesTsv)
- val tsvEdges = tsvs.map(tsv => Graph.toEdge(tsv)).flatten
- jsonEdges must containTheSameElementsAs(tsvEdges)
- }
- }
-// "Graph" should {
-// "test insert" in {
-// EdgeController.tryMutates(Json.parse(insertEdges), "insert")
-// val edgesWithRank = Graph.getEdgesSync(query)
-// edgesWithRank must have size(1)
-// val head = edgesWithRank.head.toArray
-// head must have size(4)
-// val (firstEdge, firstRAnk) = head(0)
-// firstEdge.tgtVertex.innerId must beEqualTo(InnerVal.withLong(103))
-// firstEdge.ts must beEqualTo (193829194)
-//
-// true
-// }
-// }
-}
\ No newline at end of file
diff --git a/test/controllers/IntegritySpec.scala b/test/controllers/IntegritySpec.scala
index f4f916b..9a7b8ae 100644
--- a/test/controllers/IntegritySpec.scala
+++ b/test/controllers/IntegritySpec.scala
@@ -1,8 +1,10 @@
-package controllers
+package test.controllers
+
import com.daumkakao.s2graph.core._
import com.daumkakao.s2graph.rest.config.Config
import com.daumkakao.s2graph.rest.actors._
import com.twitter.io.exp.VarSource.Ok
+import controllers.AdminController
import org.omg.CosNaming.NamingContextPackage.NotFound
import scala.concurrent.{ Await, Future }
import scala.concurrent.duration._
@@ -450,9 +452,8 @@
def initialize = {
running(FakeApplication()) {
- GraphAggregatorActor.init()
- KafkaAggregatorActor.init()
- Graph(Config.conf)(ExecutionContext.Implicits.global)
+// KafkaAggregatorActor.init()
+ Graph(Config.conf.underlying)(ExecutionContext.Implicits.global)
// 1. createService
var result = AdminController.createServiceInner(Json.parse(createService))
@@ -479,8 +480,7 @@
// println(s">> Label deleted : $testLabelName, $result")
// 2. delete service ( currently NOT supported )
- GraphAggregatorActor.shutdown()
- KafkaAggregatorActor.shutdown()
+// KafkaAggregatorActor.shutdown()
}
}
diff --git a/test/controllers/QuerySpec.scala b/test/controllers/QuerySpec.scala
index 8bc4975..3f0a01a 100644
--- a/test/controllers/QuerySpec.scala
+++ b/test/controllers/QuerySpec.scala
@@ -1,15 +1,14 @@
-package controllers
+package test.controllers
-import com.daumkakao.s2graph.core.Management._
import com.daumkakao.s2graph.core._
import com.daumkakao.s2graph.rest.actors._
import com.daumkakao.s2graph.rest.config.Config
+import controllers.{AdminController, RequestParser}
import org.specs2.matcher.Matchers
import org.specs2.mutable.Specification
import play.api.libs.json.{ JsArray, JsObject, Json }
import play.api.test.Helpers._
import play.api.test._
-import com.wordnik.swagger.annotations.Api
import scala.Array.canBuildFrom
import scala.concurrent.ExecutionContext
@@ -53,7 +52,7 @@
// logger.debug(s">> Social Controller test Acts result : ${contentAsJson(acts)}")
status(acts) must equalTo(OK)
- contentType(acts) must beSome.which(_ == "text/plain")
+ contentType(acts) must beSome.which(_ == "application/json")
val jsRslt = contentAsJson(acts)
println("======")
println(jsRslt)
@@ -101,7 +100,7 @@
// logger.debug(s">> Social Controller test Acts result : ${contentAsJson(acts)}")
status(acts) must equalTo(OK)
- contentType(acts) must beSome.which(_ == "text/plain")
+ contentType(acts) must beSome.which(_ == "application/json")
val jsRslt = contentAsJson(acts)
println("======")
println(jsRslt)
@@ -143,7 +142,7 @@
// logger.debug(s">> Social Controller test Acts result : ${contentAsJson(acts)}")
status(acts) must equalTo(OK)
- contentType(acts) must beSome.which(_ == "text/plain")
+ contentType(acts) must beSome.which(_ == "application/json")
val jsRslt = contentAsJson(acts)
println("======")
println(jsRslt)
@@ -161,10 +160,11 @@
}
-abstract class QuerySpecificationBase extends Specification {
- protected val testServiceName = "s2graph_test"
+abstract class QuerySpecificationBase extends Specification with RequestParser {
+ protected val testServiceName = "s2graph"
protected val testLabelName = "s2graph_label_test"
protected val testColumnName = "user_id"
+ val asyncFlushInterval = 1500 // in millis
val createService =
s"""
@@ -248,28 +248,33 @@
def initialize = {
running(FakeApplication()) {
- GraphAggregatorActor.init()
- KafkaAggregatorActor.init()
- Graph(Config.conf)(ExecutionContext.Implicits.global)
+ Graph(Config.conf.underlying)(ExecutionContext.Implicits.global)
// 1. createService
var result = AdminController.createServiceInner(Json.parse(createService))
println(s">> Service created : $createService, $result")
// 2. createLabel
- result = AdminController.createLabelInner(Json.parse(createLabel))
- println(s">> Label created : $createLabel, $result")
-
+ try {
+ AdminController.createLabelInner(Json.parse(createLabel))
+ println(s">> Label created : $createLabel, $result")
+ } catch {
+ case e: Throwable =>
+ }
+
// 3. insert edges
val jsArrStr = s"[${edgesInsertInitial.mkString(",")}]"
println(s">> 1st step Inserts : $jsArrStr")
val inserts = toEdges(Json.parse(jsArrStr), "insert")
Graph.mutateEdges(inserts)
+ Thread.sleep(asyncFlushInterval)
+
println(s"<< 1st step Inserted : $jsArrStr")
val jsArrStr2nd = s"[${edgesInsert2ndDepth.mkString(",")}]"
println(s">> 2nd step Inserts : $jsArrStr2nd")
val inserts2nd = toEdges(Json.parse(jsArrStr2nd), "insert")
Graph.mutateEdges(inserts2nd)
+ Thread.sleep(asyncFlushInterval)
println(s"<< 2nd step Inserted : $inserts2nd")
}
}
@@ -277,26 +282,26 @@
def cleanup = {
running(FakeApplication()) {
- Graph(Config.conf)(ExecutionContext.Implicits.global)
+ Graph(Config.conf.underlying)(ExecutionContext.Implicits.global)
// 1. delete edges
val jsArrStr = s"[${edgesInsertInitial.mkString(",")}]"
println(s"Deletes : $jsArrStr")
val deletes = toEdges(Json.parse(jsArrStr), "delete")
Graph.mutateEdges(deletes)
+ Thread.sleep(asyncFlushInterval)
val jsArrStr2nd = s"[${edgesInsert2ndDepth.mkString(",")}]"
println(s">> 2nd step Deletes : $jsArrStr2nd")
val deletes2nd = toEdges(Json.parse(jsArrStr2nd), "delete")
Graph.mutateEdges(deletes2nd)
+ Thread.sleep(asyncFlushInterval)
println(s"<< 2nd step Deleted : $deletes2nd")
// 2. delete label ( currently NOT supported )
// var result = AdminController.deleteLabelInner(testLabelName)
// println(s">> Label deleted : $testLabelName, $result")
// 3. delete service ( currently NOT supported )
- GraphAggregatorActor.shutdown()
- KafkaAggregatorActor.shutdown()
}
}
diff --git a/test/controllers/RequestParserSpec.scala b/test/controllers/RequestParserSpec.scala
index 9e51414..acf0f82 100644
--- a/test/controllers/RequestParserSpec.scala
+++ b/test/controllers/RequestParserSpec.scala
@@ -1,4 +1,4 @@
-package controllers
+package test.controllers
import models._
import com.daumkakao.s2graph.core._
@@ -14,6 +14,7 @@
class RequestParserSpec extends Specification {
// dummy data for dummy edge
+ val testLabelName = "s2graph_label_test"
val ts = System.currentTimeMillis()
val srcId = CompositeId(0, InnerVal.withLong(1), isEdge = true, useHash = true)
val tgtId = CompositeId(0, InnerVal.withLong(2), isEdge = true, useHash = true)
@@ -30,10 +31,9 @@
val checkedOpt = for (label <- Label.findByName(labelName)) yield {
val labelMetas = LabelMeta.findAllByLabelId(label.id.get, useCache = false)
val metaMap = labelMetas.map { m => m.name -> m.seq } toMap
-
val whereOpt = WhereParser(label).parse(sql)
whereOpt must beSome
- play.api.Logger.error(whereOpt.toString)
+ play.api.Logger.debug(whereOpt.toString)
// val props = Json.obj("is_hidden" -> true, "is_blocked" -> false)
// val propsInner = Management.toProps(label, props).map { case (k, v) => k -> InnerValWithTs.withInnerVal(v, ts) }.toMap
@@ -49,18 +49,20 @@
"RequestParser WhereParser" should {
"check where clause not nested" in {
running(FakeApplication()) {
- val labelOpt = Label.findByName("graph_test")
+ val labelOpt = Label.findByName(testLabelName)
labelOpt must beSome[Label]
val label = labelOpt.get
val labelWithDir = LabelWithDirection(label.id.get, 0)
+
val js = Json.obj("is_hidden" -> true, "is_blocked" -> false, "weight" -> 10, "time" -> 3, "name" -> "abc")
val propsInner = Management.toProps(label, js).map { case (k, v) => k -> InnerValWithTs.withInnerVal(v, ts) }.toMap
val edge = Edge(srcVertex, tgtVertex, labelWithDir, 0.toByte, ts, 0, propsInner)
play.api.Logger.debug(s"$edge")
- val f = validate("graph_test")(edge)_
+ val f = validate(testLabelName)(edge)_
f("is_hidden = false")(false)
+ f("is_hidden != false")(true)
f("is_hidden = true and is_blocked = true")(false)
f("is_hidden = true and is_blocked = false")(true)
f("time in (1, 2, 3) and is_blocked = true")(false)
@@ -68,15 +70,14 @@
f("time in (1, 2, 3) and is_blocked = false")(true)
f("time in (1, 2, 4) and is_blocked = false")(false)
f("time in (1, 2, 4) or is_blocked = false")(true)
+ f("time not in (1, 2, 4)")(true)
f("time in (1, 2, 3) and weight between 10 and 20 and is_blocked = false")(true)
f("time in (1, 2, 4) or weight between 10 and 20 or is_blocked = true")(true)
- f("name = abc")(true)
- f("name = xxx")(false)
}
}
"check where clause nested" in {
running(FakeApplication()) {
- val labelOpt = Label.findByName("graph_test")
+ val labelOpt = Label.findByName(testLabelName)
labelOpt must beSome[Label]
val label = labelOpt.get
val labelWithDir = LabelWithDirection(label.id.get, 0)
@@ -84,7 +85,7 @@
val propsInner = Management.toProps(label, js).map { case (k, v) => k -> InnerValWithTs.withInnerVal(v, ts) }.toMap
val edge = Edge(srcVertex, tgtVertex, labelWithDir, 0.toByte, ts, 0, propsInner)
- val f = validate("graph_test")(edge)_
+ val f = validate(testLabelName)(edge)_
f("(time in (1, 2, 3) and is_blocked = true) or is_hidden = false")(false)
f("(time in (1, 2, 3) or is_blocked = true) or is_hidden = false")(true)
@@ -96,15 +97,15 @@
f("(time in (1, 2, 4) or weight between 1 and 9) or is_hidden = true")(true)
f("(time in (1, 2, 3) or weight between 1 and 10) and is_hidden = false")(false)
- f("(name in (a, abc, c) and weight between 1 and 10) or is_hidden = false")(true)
- f("name between a and b or is_hidden = false")(true)
- f("name = abc and is_hidden = true")(true)
+// f("(name in (a, abc, c) and weight between 1 and 10) or is_hidden = false")(true)
+// f("name between a and b or is_hidden = false")(true)
+// f("name = abc and is_hidden = true")(true)
}
}
"check where clause with from/to long" in {
running(FakeApplication()) {
- val labelOpt = Label.findByName("graph_test")
+ val labelOpt = Label.findByName(testLabelName)
labelOpt must beSome[Label]
val label = labelOpt.get
val labelWithDir = LabelWithDirection(label.id.get, 0)
@@ -112,7 +113,7 @@
val propsInner = Management.toProps(label, js).map { case (k, v) => k -> InnerValWithTs.withInnerVal(v, ts) }.toMap
val edge = Edge(srcVertex, tgtVertex, labelWithDir, 0.toByte, ts, 0, propsInner)
- val f = validate("graph_test")(edge)_
+ val f = validate(testLabelName)(edge)_
// f("from = abc")(false)
f("_from = 2 or _to = 2")(true)