merging https://github.com/daumkakao/s2graph/pull/14 into master
diff --git a/app/com/daumkakao/s2graph/rest/config/Config.scala b/app/com/daumkakao/s2graph/rest/config/Config.scala
index 3be7b28..4bef00a 100644
--- a/app/com/daumkakao/s2graph/rest/config/Config.scala
+++ b/app/com/daumkakao/s2graph/rest/config/Config.scala
@@ -1,7 +1,8 @@
 package com.daumkakao.s2graph.rest.config
 
 import java.util.concurrent.TimeUnit
-import com.codahale.metrics.{Metric, ConsoleReporter, MetricRegistry}
+import com.codahale.metrics.{Slf4jReporter, Metric, ConsoleReporter, MetricRegistry}
+import org.slf4j.LoggerFactory
 import play.api.{Play, Logger}
 
 object Config {
@@ -32,10 +33,15 @@
 
 trait Instrumented extends nl.grons.metrics.scala.InstrumentedBuilder  {
   val metricRegistry: MetricRegistry = Config.metricRegistry
-  val consoleReporter = ConsoleReporter.forRegistry(metricRegistry)
+  val reporter = Slf4jReporter.forRegistry(metricRegistry)
+    .outputTo(LoggerFactory.getLogger(classOf[Instrumented]))
     .convertRatesTo(TimeUnit.SECONDS)
     .convertDurationsTo(TimeUnit.MILLISECONDS)
     .build()
+//  val consoleReporter = ConsoleReporter.forRegistry(metricRegistry)
+//    .convertRatesTo(TimeUnit.SECONDS)
+//    .convertDurationsTo(TimeUnit.MILLISECONDS)
+//    .build()
 
   val stats = new collection.mutable.HashMap[String, Metric]
 
diff --git a/app/controllers/ApplicationController.scala b/app/controllers/ApplicationController.scala
index 8384fd9..ab08044 100644
--- a/app/controllers/ApplicationController.scala
+++ b/app/controllers/ApplicationController.scala
@@ -17,7 +17,7 @@
   var connectionOption = CONNECTION -> "Keep-Alive"
   var keepAliveOption = "Keep-Alive" -> "timeout=5, max=100"
 
-  consoleReporter.start(10, TimeUnit.SECONDS)
+  reporter.start(10, TimeUnit.SECONDS)
 
   def updateHealthCheck(isHealthy: Boolean) = Action { request =>
     this.isHealthy = isHealthy
diff --git a/app/controllers/RequestParser.scala b/app/controllers/RequestParser.scala
index 945368b..c5af045 100644
--- a/app/controllers/RequestParser.scala
+++ b/app/controllers/RequestParser.scala
@@ -84,59 +84,6 @@
       WhereParser(label).parse(where)
     }
   }
-  case class WhereParser(label: Label) extends JavaTokenParsers with JSONParser {
-
-    val metaProps = label.metaPropsInvMap ++ Map(LabelMeta.from.name -> LabelMeta.from, LabelMeta.to.name -> LabelMeta.to)
-
-    def where: Parser[Where] = rep(clause) ^^ (Where(_))
-
-    def clause: Parser[Clause] = (predicate | parens) * (
-      "and" ^^^ { (a: Clause, b: Clause) => And(a, b) } |
-      "or" ^^^ { (a: Clause, b: Clause) => Or(a, b) })
-
-    def parens: Parser[Clause] = "(" ~> clause <~ ")"
-
-    def boolean = ("true" ^^^ (true) | "false" ^^^ (false))
-
-    /** floating point is not supported yet **/
-    def predicate = (
-      (ident ~ "=" ~ ident | ident ~ "=" ~ decimalNumber | ident ~ "=" ~ stringLiteral) ^^ {
-        case f ~ "=" ~ s =>
-          metaProps.get(f) match {
-            case None => throw new RuntimeException(s"where clause contains not existing property name: $f")
-            case Some(metaProp) =>
-              Equal(metaProp.seq, toInnerVal(s, metaProp.dataType))
-          }
-
-      }
-      | (ident ~ "between" ~ ident ~ "and" ~ ident | ident ~ "between" ~ decimalNumber ~ "and" ~ decimalNumber
-        | ident ~ "between" ~ stringLiteral ~ "and" ~ stringLiteral) ^^ {
-          case f ~ "between" ~ minV ~ "and" ~ maxV =>
-            metaProps.get(f) match {
-              case None => throw new RuntimeException(s"where clause contains not existing property name: $f")
-              case Some(metaProp) =>
-                Between(metaProp.seq, toInnerVal(minV, metaProp.dataType), toInnerVal(maxV, metaProp.dataType))
-            }
-        }
-        | (ident ~ "in" ~ "(" ~ rep(ident | decimalNumber | stringLiteral | "true" | "false" | ",") ~ ")") ^^ {
-          case f ~ "in" ~ "(" ~ vals ~ ")" =>
-            metaProps.get(f) match {
-              case None => throw new RuntimeException(s"where clause contains not existing property name: $f")
-              case Some(metaProp) =>
-                val values = vals.filter(v => v != ",").map { v =>
-                  toInnerVal(v, metaProp.dataType)
-                }
-                IN(metaProp.seq, values.toSet)
-            }
-        })
-
-    def parse(sql: String): Option[Where] = {
-      parseAll(where, sql) match {
-        case Success(r, q) => Some(r)
-        case x => println(x); None
-      }
-    }
-  }
 
   def toQuery(jsValue: JsValue, isEdgeQuery: Boolean = true): Query = {
     try {
diff --git a/loader/src/main/scala/subscriber/GraphSubscriber.scala b/loader/src/main/scala/subscriber/GraphSubscriber.scala
index b1d6051..aec5c4f 100644
--- a/loader/src/main/scala/subscriber/GraphSubscriber.scala
+++ b/loader/src/main/scala/subscriber/GraphSubscriber.scala
@@ -4,11 +4,11 @@
 
 import com.daumkakao.s2graph.core._
 import com.daumkakao.s2graph.core.Graph
-import com.typesafe.config.ConfigFactory
+import com.typesafe.config.{Config, ConfigFactory}
 import kafka.javaapi.producer.Producer
 import kafka.producer.KeyedMessage
 import org.apache.hadoop.hbase.HBaseConfiguration
-import org.apache.hadoop.hbase.client.{HConnection, HConnectionManager}
+import org.apache.hadoop.hbase.client.{HTableInterface, HConnection, HConnectionManager}
 import org.apache.spark.{Accumulable, SparkContext}
 import s2.spark.{HashMapParam, SparkApp, WithKafka}
 import subscriber.parser.{GraphParser, GraphParsers}
@@ -26,67 +26,39 @@
     zkQuorum = zkAddr.getOrElse("localhost")
     kafkaBrokers = kafkaBrokerList.getOrElse("localhost:9092")
     val s = s"""
-db.default.driver=com.mysql.jdbc.Driver
-db.default.url="$database"
-db.default.user=graph
-db.default.password=graph
+               |logger.root=ERROR
+               |
+               |# Logger used by the framework:
+               |logger.play=INFO
+               |
+               |# Logger provided to your application:
+               |logger.application=DEBUG
+               |
+               |# APP PHASE
+               |phase=dev
+               |
+               |# DB
+               |db.default.driver=com.mysql.jdbc.Driver
+               |db.default.url="$database"
+               |db.default.user=graph
+               |db.default.password=graph
+               |
+               |# Query server
+               |is.query.server=true
+               |is.write.server=true
+               |
+               |# Local Cache
+               |cache.ttl.seconds=60
+               |cache.max.size=100000
+               |
+               |# HBASE
+               |hbase.client.operation.timeout=60000
+               |
+               |# Kafka
+               |kafka.metadata.broker.list="$kafkaBrokers"
+               |kafka.producer.pool.size=0
+               |    """.stripMargin
 
-is.query.server=true
-is.analyzer=false
-is.test.query.server=false
-test.sample.prob=0.1
-    
-cache.ttl.seconds=60000
-cache.max.size=100000
-
-hbase.connection.pool.size=1
-hbase.table.pool.size=10
-hbase.client.ipc.pool.size=1
-zookeeper.recovery.retry=10
-zookeeper.session.timeout=180000
-hbase.zookeeper.quorum="$zkQuorum"
-hbase.table.name="s2graph-alpha"
-hbase.client.operation.timeout=10000
-hbase.client.retries.number=10
-hbase.client.write.operation.timeout=10000
-hbase.client.write.retries.number=10
-    
-kafka.metadata.broker.list="$kafkaBrokers"
-kafka.request.required.acks=1
-kafka.producer.type="sync"
-kafka.producer.buffer.flush.time=1000
-kafka.producer.buffer.size=1000
-kafka.producer.pool.size=1
-kafka.aggregate.flush.timeout=1000
-    
-# Aggregator
-client.aggregate.buffer.size=100
-client.aggregate.buffer.flush.time=10000
-client.aggregate.pool.size=1
-
-
-# blocking execution context
-contexts {
-	query {
-		fork-join-executor {
-  		parallelism-min = 1
-    	parallelism-max = 1
-		}
-	}
-	blocking {
-		fork-join-executor {
-  		parallelism-min = 1
-    	parallelism-max = 1
-		}
-	}
-	scheduler {
-	 	fork-join-executor {
-  		parallelism-min = 1
-    	parallelism-max = 1
-		}
-	}
-}
-  """
     println(s)
     ConfigFactory.parseString(s)
   }
@@ -98,17 +70,11 @@
 
 
   lazy val producer = new Producer[String, String](kafkaConf(GraphConfig.kafkaBrokers))
-
+  var config: Config = _
   private val writeBufferSize = 1024 * 1024 * 8
   private val sleepPeriod = 10000
   private val maxTryNum = 10
-//  lazy val graph = {
-//    println(System.getProperty("phase"))
-//    Graph.apply(GraphConfig.apply(System.getProperty("phase"), None, None))(ExecutionContext.Implicits.global)
-//    println(Graph.config)
-//    println(Graph.hbaseConfig)
-//    Graph
-//  }
+
   def toOption(s: String) = {
     s match {
       case "" | "none" => None
@@ -116,16 +82,14 @@
     }
   }
   def apply(phase: String, dbUrl: String, zkQuorum: String, kafkaBrokerList: String) : Unit = {
-    apply(phase, toOption(dbUrl), toOption(zkQuorum), toOption(kafkaBrokerList))
+    config = GraphConfig(phase, toOption(dbUrl), toOption(zkQuorum), toOption(kafkaBrokerList))
+    Graph.apply(config)(ExecutionContext.Implicits.global)
   }
-  def apply(phase: String, dbUrl: Option[String], zkQuorum: Option[String], kafkaBrokerList: Option[String]): Unit  = {
-    Graph.apply(GraphConfig(phase, dbUrl, zkQuorum, kafkaBrokerList))(ExecutionContext.Implicits.global)
-  }
+//  def apply(phase: String, dbUrl: Option[String], zkQuorum: Option[String], kafkaBrokerList: Option[String]): Unit  = {
+//    Graph.apply(GraphConfig(phase, dbUrl, zkQuorum, kafkaBrokerList))(ExecutionContext.Implicits.global)
+//  }
   def report(key: String, value: Option[String], topic: String = "report") = {
-    //    val ts = System.currentTimeMillis().toString
     val msg = Seq(Some(key), value).flatten.mkString("\t")
-
-    //    val producer = new Producer[String, String](kafkaConf(Config.KAFKA_METADATA_BROKER_LIST))
     val kafkaMsg = new KeyedMessage[String, String](topic, msg)
     producer.send(kafkaMsg)
   }
@@ -133,15 +97,15 @@
   /**
    * bulkMutates read connection and table info from database.
    */
-  def store(msgs: Seq[String])(mapAccOpt: Option[HashMapAccumulable]): Iterable[(String, Long)] = {
-    //    assert(msgs.size >= maxSize)
+  def store(msgs: Seq[String], labelToReplace: Option[String] = None)(mapAccOpt: Option[HashMapAccumulable]): Iterable[(String, Long)] = {
     val counts = HashMap[String, Long]()
     val statFunc = storeStat(counts)(mapAccOpt)_
 
     val elements = (for (msg <- msgs) yield {
       statFunc("total", 1)
 
-      val element = Graph.toGraphElement(msg)
+      val element = Graph.toGraphElement(msg, labelToReplace)
+      println(element)
       element match {
         case Some(e) =>
           statFunc("parseOk", 1)
@@ -167,6 +131,64 @@
     }
     counts
   }
+  def storeBulk(conn: HConnection, tableName: String)(msgs: Seq[String], labelToReplace: Option[String] = None)(mapAccOpt: Option[HashMapAccumulable]): Iterable[(String, Long)] = {
+    val counts = HashMap[String, Long]()
+    val statFunc = storeStat(counts)(mapAccOpt) _
+    val edges = (for (msg <- msgs) yield {
+      statFunc("total", 1)
+      try {
+         Graph.toGraphElement(msg, labelToReplace) match {
+            case Some(e) if e.isInstanceOf[Edge] =>
+              statFunc("parseOk", 1)
+              Some(e.asInstanceOf[Edge])
+            case None =>
+              statFunc("errorParsing", 1)
+              None
+         }
+      } catch {
+        case e: Throwable =>
+          System.err.println(s"$msg $e", e)
+          None
+      }
+    }).flatten.toList
+
+    println(edges.take(10))
+
+    /**
+     * don't use database for connection and table.
+     */
+    // throw all exception to caller.
+
+    def storeRec(edges: List[Edge], tryNum: Int = maxTryNum): Unit = {
+      if (tryNum <= 0) {
+        statFunc("errorStore", edges.size)
+        throw new RuntimeException(s"retry failed after $maxTryNum")
+      }
+      val table = conn.getTable(tableName)
+      table.setAutoFlush(false, false)
+      table.setWriteBufferSize(writeBufferSize)
+      try {
+        // we only care about insertBulk
+        val puts = edges.flatMap { edge =>
+          edge.relatedEdges.flatMap { relatedEdge =>
+            relatedEdge.insertBulk() ++ relatedEdge.buildVertexPuts()
+          }
+        }
+        println(puts.take(10))
+        table.put(puts)
+        statFunc("storeOk", msgs.size)
+      } catch {
+        case e: Throwable =>
+          e.printStackTrace()
+          Thread.sleep(sleepPeriod)
+          storeRec(edges, tryNum - 1)
+      } finally {
+        table.close()
+      }
+    }
+    storeRec(edges)
+    counts
+  }
   def storeStat(counts: HashMap[String, Long])(mapAccOpt: Option[HashMapAccumulable])(key: String, value: Int) = {
     counts.put(key, counts.getOrElse(key, 0L) + value)
     mapAccOpt match {
@@ -174,108 +196,62 @@
       case Some(mapAcc) => mapAcc += (key -> value)
     }
   }
-//
-//  /**
-//   * caller of this method should split msgs into reasonable size.
-//   */
-//  def storeBulk(conn: HConnection, msgs: Seq[String])(mapAccOpt: Option[HashMapAccumulable], zkQuorum: String, tableName: String): Iterable[(String, Long)] = {
-//    val counts = HashMap[String, Long]()
-//    val statFunc = storeStat(counts)(mapAccOpt)_
-//    val edges = (for (msg <- msgs) yield {
-//      statFunc("total", 1)
-//
-//      val edge = Graph.toEdge(msg)
-//      edge match {
-//        case Some(e) =>
-//          statFunc("parseOk", 1)
-//          edge
-//        case None =>
-//          statFunc("errorParsing", 1)
-//          None
-//      }
-//    }).flatten.toList
-//
-//    storeRec(edges)
-//    /**
-//     * don't use database for connection and table.
-//     */
-//    // throw all exception to caller.
-//
-//    def storeRec(edges: List[Edge], tryNum: Int = maxTryNum): Unit = {
-//      if (tryNum <= 0) {
-//        statFunc("errorStore", edges.size)
-//        throw new RuntimeException(s"retry failed after $maxTryNum")
-//      }
-//      try {
-//
-//      }
-//      try {
-//
-//        Graph.bulkMutates(edges)
-//        // on bulk mode, we don`t actually care about WAL log on hbase. so skip this process to make things faster.
-//        val puts = edges.flatMap(e => e.buildPutsAll ++ e.buildVertexPuts).map { p =>
-//          //        p.setDurability(Durability.SKIP_WAL)
-//          p
-//        }
-//        table.put(puts)
-//        statFunc("storeOk", msgs.size)
-//      } catch {
-//        case e: Throwable =>
-//          e.printStackTrace()
-//          Thread.sleep(sleepPeriod)
-//          storeRec(edges, tryNum - 1)
-//        //          statFunc("errorStore", msgs.size)
-//        //          throw e
-//      } finally {
-//        table.close()
-//      }
-//    }
-//
-//    counts
-//  }
+
 }
-/**
- * do not use Graph.bulkMutates since it automatically read zkQuorum and hbase TableName from database.
- * batch insert should reference database only for parsing not getting connection and table!
- */
 object GraphSubscriber extends SparkApp with WithKafka {
   val sleepPeriod = 5000
-
+  val usages =
+    s"""
+       |/**
+       | * this job read edge format(TSV) from HDFS file system then bulk load edges into s2graph. assumes that newLabelName is already created by API.
+       | * params:
+       | *  1. hdfsPath: where is your data in hdfs. require full path with hdfs:// predix
+       | *  2. dbUrl: jdbc database connection string to specify database for meta.
+       | *  3. originalLabelName: label to copy with.
+       | *  4. newLabelName: label field will be replaced to this value.
+       | *  5. zkQuorum: target hbase zkQuorum where this job will publish data to.
+       | *  6. hTableName: target hbase physical table name where this job will publish data to.
+       | *  7. batchSize: how many edges will be batched for Put request to target hbase.
+       | *  8. kafkaBrokerList: using kafka as fallback queue. when something goes wrong during batch, data needs to be replay will be stored in kafka.
+       | *  9. kafkaTopic: fallback queue topic.
+       | * after this job finished, s2graph will have data with sequence corresponding newLabelName.
+       | * change this newLabelName to ogirinalName if you want to online replace of label.
+       | *
+       | */
+     """.stripMargin
   override def run() = {
     /**
      * Main function
      */
-    if (args.length < 3) {
-      System.err.println("Usage: GraphSubscriber <hdfsPath> <batchSize> <dbUrl> <zkQuorum> <tableName> <kafkaBrokerList> <kafkaTopic> <preSplitSize>")
+    println(args.toList)
+    if (args.length != 9) {
+      System.err.println(usages)
       System.exit(1)
     }
     val hdfsPath = args(0)
-    val batchSize = args(1).toInt
-    val dbUrl = args(2)
-    val zkQuorum = args(3)
-    val tableName = args(4)
-    val kafkaBrokerList = args(5)
-    val kafkaTopic = args(6)
-    val preSplitSize = if (args.length > 7) args(7).toInt else 20
+    val dbUrl = args(1)
+    val oldLabelName = args(2)
+    val newLabelName = args(3)
+    val zkQuorum = args(4)
+    val hTableName = args(5)
+    val batchSize = args(6).toInt
+    val kafkaBrokerList = args(7)
+    val kafkaTopic = args(8)
 
     val conf = sparkConf(s"$hdfsPath: GraphSubscriber")
     val sc = new SparkContext(conf)
-
     val mapAcc = sc.accumulable(HashMap.empty[String, Long], "counter")(HashMapParam[String, Long](_ + _))
-    val fallbackTopic = s"${tableName}_batch_failed"
+
+
 
     try {
-      tableName match {
-        case "s2graph" | "s2graph-alpha" | "s2graph-sandbox" => System.err.println("try to create master table!")
-        case _ => Management.createTable(zkQuorum, tableName, List("e", "v"), preSplitSize, None)
-      }
 
-      // not sure how fast htable get table can recognize new table so sleep.
-      Thread.sleep(sleepPeriod)
-
+      import GraphSubscriberHelper._
       // set local driver setting.
       val phase = System.getProperty("phase")
       GraphSubscriberHelper.apply(phase, dbUrl, zkQuorum, kafkaBrokerList)
+      /** copy when oldLabel exist and newLabel done exist. otherwise ignore. */
+      Management.copyLabel(oldLabelName, newLabelName, toOption(hTableName))
 
       val msgs = sc.textFile(hdfsPath)
       msgs.foreachPartition(partition => {
@@ -290,21 +266,22 @@
         partition.grouped(batchSize).foreach { msgs =>
           try {
             val start = System.currentTimeMillis()
+//            val counts =
+//              GraphSubscriberHelper.store(msgs, GraphSubscriberHelper.toOption(newLabelName))(Some(mapAcc))
             val counts =
-//              if (isBulk) GraphSubscriberHelper.storeBulk(conn, msgs)(Some(mapAcc), zkQuorum, tableName)
-              GraphSubscriberHelper.store(msgs)(Some(mapAcc))
+                GraphSubscriberHelper.storeBulk(conn, hTableName)(msgs, GraphSubscriberHelper.toOption(newLabelName))(Some(mapAcc))
 
             for ((k, v) <- counts) {
               mapAcc += (k, v)
             }
             val duration = System.currentTimeMillis() - start
-            println(s"[Success]: store, $mapAcc, $duration, $zkQuorum, $tableName")
+            println(s"[Success]: store, $mapAcc, $duration, $zkQuorum, $hTableName")
           } catch {
             case e: Throwable =>
               println(s"[Failed]: store $e")
 
               msgs.foreach { msg =>
-                GraphSubscriberHelper.report(msg, Some(e.getMessage()), topic = fallbackTopic)
+                GraphSubscriberHelper.report(msg, Some(e.getMessage()), topic = kafkaTopic)
               }
           }
         }
@@ -314,9 +291,8 @@
       logInfo(s"counter: $mapAcc")
       println(s"Stats: ${mapAcc}")
 
-      //      if (shouldUpdate) Label.prependHBaseTableName(labelName, tableName)
     } catch {
-      case e: IOException =>
+      case e: Throwable =>
         println(s"job failed with exception: $e")
         throw e
     }
diff --git a/loader/src/test/scala/subscriber/GraphSubscriberTest.scala b/loader/src/test/scala/subscriber/GraphSubscriberTest.scala
index 13a3084..4e5b60a 100644
--- a/loader/src/test/scala/subscriber/GraphSubscriberTest.scala
+++ b/loader/src/test/scala/subscriber/GraphSubscriberTest.scala
@@ -1,20 +1,44 @@
 package subscriber
 
+import com.daumkakao.s2graph.core.{Label, Service, Management}
 import org.scalatest.{ FunSuite, Matchers }
+import play.api.libs.json.{JsBoolean, JsNumber}
 import s2.spark.WithKafka
-import kafka.javaapi.producer.Producer
-import config.Config
 
 class GraphSubscriberTest extends FunSuite with Matchers with WithKafka {
- 
-  test("kafka producer") { 
-    val brokerList = Config.KAFKA_METADATA_BROKER_LIST
-    println(brokerList)
-    val producer = new Producer[String, String](kafkaConf(brokerList))
-    println(producer)
-  }
-  
+  val phase = "dev"
+  val dbUrl = "jdbc:mysql://localhost:3306/graph_dev"
+  val zkQuorum = "localhost"
+  val kafkaBrokerList = "localhost:9099"
+  val currentTs = System.currentTimeMillis()
+  val op = "insertBulk"
+  val testLabelName = "s2graph_label_test"
+  val labelToReplace = "s2graph_label_test_new"
+  val serviceName = "s2graph"
+  val columnName = "user_id"
+  val columnType = "long"
+  val indexProps = Seq("time" -> JsNumber(0), "weight" -> JsNumber(0))
+  val props = Seq("is_hidden" -> JsBoolean(false), "is_blocked" -> JsBoolean(false))
+  val hTableName = "s2graph-dev_new"
+  val ttl = 86000
+  val testStrings = List("1431788400000\tinsertBulk\te\t147229417\t99240432\ts2graph_label_test\t{\"is_hidden\": true}")
+
+  GraphSubscriberHelper.apply(phase, dbUrl, zkQuorum, kafkaBrokerList)
+
   test("GraphSubscriberHelper.store") {
-    
+    // actually we need to delete labelToReplace first for each test.
+    Management.copyLabel(testLabelName, labelToReplace, Some(hTableName))
+
+//
+//    val msgs = (for {
+//      i <- (1 until 10)
+//      j <- (100 until 110)
+//    } yield {
+//        s"$currentTs\t$op\tedge\t$i\t$j\t$testLabelName"
+//      }).toSeq
+    val msgs = testStrings
+
+    val stat = GraphSubscriberHelper.store(msgs, Some(labelToReplace))(None)
+    println(stat)
   }
 }
\ No newline at end of file
diff --git a/s2core/build.sbt b/s2core/build.sbt
index c27a05c..716fbdf 100644
--- a/s2core/build.sbt
+++ b/s2core/build.sbt
@@ -27,6 +27,7 @@
   "commons-pool" % "commons-pool" % "1.6",
   "org.scalikejdbc" %% "scalikejdbc"        % "2.1.+",
   "com.twitter" %% "util-collection" % "6.12.1",
-  "org.hbase" % "asynchbase" % "1.7.0-SNAPSHOT"
+  "org.hbase" % "asynchbase" % "1.7.0-SNAPSHOT",
+  "org.scalatest" %% "scalatest" % "2.2.1" % "test"
    )
    
diff --git a/s2core/src/main/scala/com/daumkakao/s2graph/core/Edge.scala b/s2core/src/main/scala/com/daumkakao/s2graph/core/Edge.scala
index 6bb9081..dd73fd0 100644
--- a/s2core/src/main/scala/com/daumkakao/s2graph/core/Edge.scala
+++ b/s2core/src/main/scala/com/daumkakao/s2graph/core/Edge.scala
@@ -261,7 +261,9 @@
       }
     edgePuts
   }
-
+  def insertBulk() = {
+    edgesWithInvertedIndex.buildPut :: edgesWithIndex.flatMap(e => e.buildPuts())
+  }
   def insert() = {
     edgesWithInvertedIndex.buildPutAsync :: edgesWithIndex.flatMap(e => e.buildPutsAsync)
   }
diff --git a/s2core/src/main/scala/com/daumkakao/s2graph/core/Graph.scala b/s2core/src/main/scala/com/daumkakao/s2graph/core/Graph.scala
index 8a8240e..b528834 100644
--- a/s2core/src/main/scala/com/daumkakao/s2graph/core/Graph.scala
+++ b/s2core/src/main/scala/com/daumkakao/s2graph/core/Graph.scala
@@ -765,11 +765,17 @@
    *
    */
 
-  def toGraphElement(s: String): Option[GraphElement] = {
+  def toGraphElement(s: String, labelNameToReplace: Option[String] = None): Option[GraphElement] = {
     val parts = GraphUtil.split(s)
     try {
       val logType = parts(2)
       val element = if (logType == "edge" | logType == "e") {
+        /** current only edge is considered to be bulk loaded */
+        labelNameToReplace match {
+          case None =>
+          case Some(toReplace) =>
+            parts(5) = toReplace
+        }
         toEdge(parts)
       } else if (logType == "vertex" | logType == "v") {
         toVertex(parts)
@@ -779,7 +785,6 @@
       element
     } catch {
       case e: Throwable =>
-        logger.error(s"toGraphElement: $s => $e", e)
         None
     }
   }
diff --git a/s2core/src/main/scala/com/daumkakao/s2graph/core/JSONParser.scala b/s2core/src/main/scala/com/daumkakao/s2graph/core/JSONParser.scala
index 8b1b085..4c81e35 100644
--- a/s2core/src/main/scala/com/daumkakao/s2graph/core/JSONParser.scala
+++ b/s2core/src/main/scala/com/daumkakao/s2graph/core/JSONParser.scala
@@ -3,6 +3,8 @@
 import play.api.libs.json._
 import HBaseElement.InnerVal
 
+import scala.util.parsing.combinator.JavaTokenParsers
+
 trait JSONParser {
 
   def innerValToJsValue(innerVal: InnerVal): JsValue = {
@@ -76,4 +78,76 @@
       case _ => value.toString
     }
   }
+  case class WhereParser(label: Label) extends JavaTokenParsers with JSONParser {
+
+    val metaProps = label.metaPropsInvMap ++ Map(LabelMeta.from.name -> LabelMeta.from, LabelMeta.to.name -> LabelMeta.to)
+
+    def where: Parser[Where] = rep(clause) ^^ (Where(_))
+
+    def clause: Parser[Clause] = (predicate | parens) * (
+      "and" ^^^ { (a: Clause, b: Clause) => And(a, b) } |
+        "or" ^^^ { (a: Clause, b: Clause) => Or(a, b) })
+
+    def parens: Parser[Clause] = "(" ~> clause <~ ")"
+
+    def boolean = ("true" ^^^ (true) | "false" ^^^ (false))
+
+    /** floating point is not supported yet **/
+    def predicate = (
+      (ident ~ "=" ~ ident | ident ~ "=" ~ decimalNumber | ident ~ "=" ~ stringLiteral) ^^ {
+        case f ~ "=" ~ s =>
+          metaProps.get(f) match {
+            case None => throw new RuntimeException(s"where clause contains not existing property name: $f")
+            case Some(metaProp) =>
+              Equal(metaProp.seq, toInnerVal(s, metaProp.dataType))
+          }
+      }
+        | (ident ~ "between" ~ ident ~ "and" ~ ident | ident ~ "between" ~ decimalNumber ~ "and" ~ decimalNumber
+        | ident ~ "between" ~ stringLiteral ~ "and" ~ stringLiteral) ^^ {
+        case f ~ "between" ~ minV ~ "and" ~ maxV =>
+          metaProps.get(f) match {
+            case None => throw new RuntimeException(s"where clause contains not existing property name: $f")
+            case Some(metaProp) =>
+              Between(metaProp.seq, toInnerVal(minV, metaProp.dataType), toInnerVal(maxV, metaProp.dataType))
+          }
+      }
+        | (ident ~ "in" ~ "(" ~ rep(ident | decimalNumber | stringLiteral | "true" | "false" | ",") ~ ")") ^^ {
+        case f ~ "in" ~ "(" ~ vals ~ ")" =>
+          metaProps.get(f) match {
+            case None => throw new RuntimeException(s"where clause contains not existing property name: $f")
+            case Some(metaProp) =>
+              val values = vals.filter(v => v != ",").map { v =>
+                toInnerVal(v, metaProp.dataType)
+              }
+              IN(metaProp.seq, values.toSet)
+          }
+      }
+        | (ident ~ "!=" ~ ident | ident ~ "!=" ~ decimalNumber | ident ~ "!=" ~ stringLiteral) ^^ {
+        case f ~ "!=" ~ s =>
+          metaProps.get(f) match {
+            case None => throw new RuntimeException(s"where clause contains not existing property name: $f")
+            case Some(metaProp) =>
+              Not(Equal(metaProp.seq, toInnerVal(s, metaProp.dataType)))
+          }
+      }
+        | (ident ~ "not in" ~ "(" ~ rep(ident | decimalNumber | stringLiteral | "true" | "false" | ",") ~ ")") ^^ {
+        case f ~ "not in" ~ "(" ~ vals ~ ")" =>
+          metaProps.get(f) match {
+            case None => throw new RuntimeException(s"where clause contains not existing property name: $f")
+            case Some(metaProp) =>
+              val values = vals.filter(v => v != ",").map { v =>
+                toInnerVal(v, metaProp.dataType)
+              }
+              Not(IN(metaProp.seq, values.toSet))
+          }
+      }
+      )
+
+    def parse(sql: String): Option[Where] = {
+      parseAll(where, sql) match {
+        case Success(r, q) => Some(r)
+        case x => println(x); None
+      }
+    }
+  }
 }
\ No newline at end of file
diff --git a/s2core/src/main/scala/com/daumkakao/s2graph/core/Labels.scala b/s2core/src/main/scala/com/daumkakao/s2graph/core/Labels.scala
index 39f293d..ad326c6 100644
--- a/s2core/src/main/scala/com/daumkakao/s2graph/core/Labels.scala
+++ b/s2core/src/main/scala/com/daumkakao/s2graph/core/Labels.scala
@@ -230,6 +230,8 @@
   def metaSeqsToNames = metas.map(x => (x.seq, x.name)) toMap
 
   //  lazy val firstHBaseTableName = hbaseTableName.split(",").headOption.getOrElse(Config.HBASE_TABLE_NAME)
+  lazy val srcService = Service.findById(srcServiceId)
+  lazy val tgtService = Service.findById(tgtServiceId)
   lazy val service = Service.findById(serviceId)
 
   /**
diff --git a/s2core/src/main/scala/com/daumkakao/s2graph/core/Management.scala b/s2core/src/main/scala/com/daumkakao/s2graph/core/Management.scala
index bc249ec..1f87a1a 100644
--- a/s2core/src/main/scala/com/daumkakao/s2graph/core/Management.scala
+++ b/s2core/src/main/scala/com/daumkakao/s2graph/core/Management.scala
@@ -29,7 +29,24 @@
    *
    * talk_friend 10 -> story:friend 10 ->
    */
+  def copyLabel(oldLabelName: String, newLabelName: String, hTableName: Option[String]) = {
+    for {
+      old <- Label.findByName(oldLabelName)
+    } {
+      Label.findByName(newLabelName) match {
+        case None =>
+          val (indexProps, metaProps) = old.metaPropsInvMap.partition(nameMeta => nameMeta._2.usedInIndex)
 
+          createLabel(newLabelName, old.srcService.serviceName, old.srcColumnName, old.srcColumnType,
+            old.tgtService.serviceName, old.tgtColumnName, old.tgtColumnType,
+            old.isDirected, old.serviceName,
+            indexProps.map(t => t._1 -> innerValToJsValue(t._2.defaultInnerVal)).toSeq,
+            metaProps.map(t => t._1 -> innerValToJsValue(t._2.defaultInnerVal)).toSeq,
+            old.consistencyLevel, hTableName, old.hTableTTL)
+        case Some(_) =>
+      }
+    }
+  }
   /**
    * label
    */
diff --git a/s2core/src/main/scala/com/daumkakao/s2graph/core/QueryParam.scala b/s2core/src/main/scala/com/daumkakao/s2graph/core/QueryParam.scala
index 33d1a39..92489ee 100644
--- a/s2core/src/main/scala/com/daumkakao/s2graph/core/QueryParam.scala
+++ b/s2core/src/main/scala/com/daumkakao/s2graph/core/QueryParam.scala
@@ -127,6 +127,11 @@
 
   }
 }
+case class Not(val self: Clause) extends Clause {
+  override def filter(edge: Edge): Boolean = {
+    !self.filter(edge)
+  }
+}
 case class And(val left: Clause, val right: Clause) extends Clause {
   override def filter(edge: Edge): Boolean = {
     left.filter(edge) && right.filter(edge)
diff --git a/s2core/src/test/scala/com/daumkakao/s2graph/core/ManagementTest.scala b/s2core/src/test/scala/com/daumkakao/s2graph/core/ManagementTest.scala
new file mode 100644
index 0000000..26a33dc
--- /dev/null
+++ b/s2core/src/test/scala/com/daumkakao/s2graph/core/ManagementTest.scala
@@ -0,0 +1,29 @@
+package com.daumkakao.s2graph.core
+
+import com.typesafe.config.ConfigFactory
+import org.scalatest.{Matchers, FunSuite}
+
+import scala.concurrent.ExecutionContext
+
+/**
+ * Created by shon on 5/18/15.
+ */
+class ManagementTest extends FunSuite with Matchers {
+  val zkQuorum = "tokyo062.kr2.iwilab.com"
+  val database = "jdbc:mysql://nuk151.kr2.iwilab.com:13306/graph_alpha"
+  val config = ConfigFactory.parseString(
+    s"""
+       | db.default.url="$database"
+       | cache.ttl.second=60
+       | cache.max.size=1000
+       | hbase.client.operation.timeout=1000
+       | hbase.zookeeper.quorum="$zkQuorum"
+     """.stripMargin)
+  val labelToCopy = "graph_test"
+  val newLabelName = "graph_test_tc"
+  val hTableName = "graph_test_tc"
+  Graph.apply(config)(ExecutionContext.Implicits.global)
+  test("test copy label") {
+    Management.copyLabel(labelToCopy, newLabelName, Some(hTableName))
+  }
+}
diff --git a/test/controllers/FutureHelpersSpec.scala b/test/controllers/FutureHelpersSpec.scala
index 227257f..03c49e0 100644
--- a/test/controllers/FutureHelpersSpec.scala
+++ b/test/controllers/FutureHelpersSpec.scala
@@ -1,4 +1,6 @@
-package contollers;
+package test.controllers
+
+
 //package kgraph
 //import scala.concurrent.Await
 //import scala.concurrent._
diff --git a/test/controllers/GraphSpec.scala b/test/controllers/GraphSpec.scala
deleted file mode 100644
index 30d7b5b..0000000
--- a/test/controllers/GraphSpec.scala
+++ /dev/null
@@ -1,101 +0,0 @@
-package controllers
-
-import com.daumkakao.s2graph.core.Management._
-import com.daumkakao.s2graph.core._
-import org.specs2.mutable.Specification
-import play.api.libs.json.Json
-import scala.Option.option2Iterable
-/**
- * test-only kgraph.GraphSpec
- */
-class GraphSpec extends Specification {
-  sequential
-
-  private val phase = "alpha"
-  //  Graph.tableName = tableName
-  System.setProperty("phase", phase)
-//  Config.phase = phase
-  private val serviceName = "s2graph"
-  private val labelName = "graph_test"
-    
-  val msgs = """
-    ["1417079166891\tu\te\t156598730\t158524147\tgraph_test\t{\"is_blocked\":true}","1417079166000\td\te\t4079848\t54739565\tgraph_test","1417079166000\ti\te\t102735518\t49629673\tgraph_test","1417079166000\ti\te\t15120422\t160902634\tgraph_test","1417079166000\ti\te\t160251573\t7960026\tgraph_test","1417079167000\td\te\t816808\t47823955\tgraph_test","1417079166000\ti\te\t160251573\t43063061\tgraph_test","1417079167000\ti\te\t141391508\t151985238\tgraph_test","1417079167360\tu\te\t43237903\t49005293\tgraph_test\t{\"is_blocked\":true}","1417079167000\ti\te\t9788808\t167206573\tgraph_test","1417079167000\ti\te\t106040854\t166946583\tgraph_test","1417079166000\ti\te\t15120422\t35674614\tgraph_test","1417079167519\tu\te\t29460631\t28640554\tgraph_test\t{\"is_blocked\":false}","1417079166000\ti\te\t15120422\t6366975\tgraph_test","1417079167000\tu\te\t4610363\t146176454\tgraph_test\t{\"is_hidden\":true}","1417079167000\td\te\t4935659\t1430321\tgraph_test","1417079166000\ti\te\t15120422\t136919643\tgraph_test","1417079167000\td\te\t84547561\t17923231\tgraph_test","1417079167000\td\te\t108876864\t19695266\tgraph_test","1417079166000\td\te\t14644128\t11853672\tgraph_test","1417079167000\ti\te\t142522237\t167208738\tgraph_test"]"""
-
-  val createService = """{"serviceName": "s2graph"}"""
-
-  val createLabel = """
-  {"label": "graph_test", "srcServiceName": "s2graph", "srcColumnName": "account_id", "srcColumnType": "long", "tgtServiceName": "s2graph", "tgtColumnName": "account_id", "tgtColumnType": "long", "indexProps": {"time": 0, "weight":0}}
-  """
-    
-  // 0.
-  AdminController.createServiceInner(Json.parse(createService))
-  // 1.   
-  AdminController.deleteLabel(labelName)
-  // 2. 
-  AdminController.createLabelInner(Json.parse(createLabel))
-  
-  val label = Label.findByName("graph_test").get
-  
-  val insertEdges = """
-[
-  {"from":1,"to":101,"label":"graph_test","props":{"time":-1, "weight":10},"timestamp":193829192},
-  {"from":1,"to":102,"label":"graph_test","props":{"time":0, "weight":11},"timestamp":193829193},
-  {"from":1,"to":103,"label":"graph_test","props":{"time":1, "weight":12},"timestamp":193829194},
-  {"from":1,"to":104,"label":"graph_test","props":{"time":-2, "weight":1},"timestamp":193829195}
-]
-  """
-//  val insertResults = List(
-//  Edge(Vertex(CompositeId(label.srcColumn.id.get, InnerVal.withLong(1), true)), Vertex(CompositeId(label.tgtColumn.id.get, InnerVal.withLong(103), true)), LabelWithDirection(label.id.get, 0), 0, 193829192, )
-//  )
-  val insertEdgesTsv = """["193829192\ti\te\t1\t101\tgraph_test\t{\"time\":1, \"weight\": 10, \"is_hidden\": false}","193829192\ti\te\t1\t102\tgraph_test\t{\"time\":1, \"weight\": 19, \"is_hidden\": false}","193829192\ti\te\t1\t103\tgraph_test\t{\"time\":1, \"weight\": 10, \"is_hidden\": false}"]"""
-
-  val updateEdges = """
-[
-  {"from":1,"to":102,"label":"graph_test","timestamp":193829199, "props": {"is_hidden":true}},
-  {"from":1,"to":102,"label":"graph_test","timestamp":193829299, "props": {"weight":100}}
-]
-  """
-  val incrementEdges = """
-[
-  {"from":1,"to":102,"label":"graph_test","timestamp":193829399, "props": {"weight":100}},
-  {"from":1,"to":102,"label":"graph_test","timestamp":193829499, "props": {"weight":100}},
-  {"from":1,"to":103,"label":"graph_test","timestamp":193829599, "props": {"weight":1000}},
-  {"from":1,"to":102,"label":"graph_test","timestamp":193829699, "props": {"weight":-200}},
-  {"from":1,"to":102,"label":"graph_test","timestamp":193829699, "props": {"weight":400}} 
-]   
-  """
-
-  val queryEdges = """
-{
-    "srcVertices": [{"columnName": "account_id", "id":1}],
-    "steps": [
-      [{"label": "graph_test", "direction": "out", "limit": 100, "scoring":{"time": 0, "weight": 1}}]
-    ]
-}
-  """
-  val query = toQuery(Json.parse(queryEdges))
-
-  "RequestParsor" should {
-    "Json parsed edge should match Graph.toEdge" in {
-      val jsons = Json.parse(insertEdges)
-      val jsonEdges = toEdges(jsons, "insert")
-      val tsvs = GraphUtil.parseString(insertEdgesTsv)
-      val tsvEdges = tsvs.map(tsv => Graph.toEdge(tsv)).flatten
-      jsonEdges must containTheSameElementsAs(tsvEdges)
-    }
-  }
-//  "Graph" should {
-//    "test insert" in {
-//      EdgeController.tryMutates(Json.parse(insertEdges), "insert")
-//      val edgesWithRank = Graph.getEdgesSync(query)
-//      edgesWithRank must have size(1)
-//      val head = edgesWithRank.head.toArray
-//      head must have size(4)
-//      val (firstEdge, firstRAnk) = head(0)
-//      firstEdge.tgtVertex.innerId must beEqualTo(InnerVal.withLong(103)) 
-//      firstEdge.ts must beEqualTo (193829194)
-//      
-//      true
-//    }
-//  }
-}
\ No newline at end of file
diff --git a/test/controllers/IntegritySpec.scala b/test/controllers/IntegritySpec.scala
index f4f916b..9a7b8ae 100644
--- a/test/controllers/IntegritySpec.scala
+++ b/test/controllers/IntegritySpec.scala
@@ -1,8 +1,10 @@
-package controllers
+package test.controllers
+
 import com.daumkakao.s2graph.core._
 import com.daumkakao.s2graph.rest.config.Config
 import com.daumkakao.s2graph.rest.actors._
 import com.twitter.io.exp.VarSource.Ok
+import controllers.AdminController
 import org.omg.CosNaming.NamingContextPackage.NotFound
 import scala.concurrent.{ Await, Future }
 import scala.concurrent.duration._
@@ -450,9 +452,8 @@
   def initialize = {
     running(FakeApplication()) {
 
-      GraphAggregatorActor.init()
-      KafkaAggregatorActor.init()
-      Graph(Config.conf)(ExecutionContext.Implicits.global)
+//      KafkaAggregatorActor.init()
+      Graph(Config.conf.underlying)(ExecutionContext.Implicits.global)
 
       // 1. createService
       var result = AdminController.createServiceInner(Json.parse(createService))
@@ -479,8 +480,7 @@
       //      println(s">> Label deleted : $testLabelName, $result")
       // 2. delete service ( currently NOT supported )
 
-      GraphAggregatorActor.shutdown()
-      KafkaAggregatorActor.shutdown()
+//      KafkaAggregatorActor.shutdown()
     }
 
   }
diff --git a/test/controllers/QuerySpec.scala b/test/controllers/QuerySpec.scala
index 8bc4975..3f0a01a 100644
--- a/test/controllers/QuerySpec.scala
+++ b/test/controllers/QuerySpec.scala
@@ -1,15 +1,14 @@
-package controllers
+package test.controllers
 
-import com.daumkakao.s2graph.core.Management._
 import com.daumkakao.s2graph.core._
 import com.daumkakao.s2graph.rest.actors._
 import com.daumkakao.s2graph.rest.config.Config
+import controllers.{AdminController, RequestParser}
 import org.specs2.matcher.Matchers
 import org.specs2.mutable.Specification
 import play.api.libs.json.{ JsArray, JsObject, Json }
 import play.api.test.Helpers._
 import play.api.test._
-import com.wordnik.swagger.annotations.Api
 import scala.Array.canBuildFrom
 import scala.concurrent.ExecutionContext
 
@@ -53,7 +52,7 @@
 
           //    logger.debug(s">> Social Controller test Acts result : ${contentAsJson(acts)}")
           status(acts) must equalTo(OK)
-          contentType(acts) must beSome.which(_ == "text/plain")
+          contentType(acts) must beSome.which(_ == "application/json")
           val jsRslt = contentAsJson(acts)
           println("======")
           println(jsRslt)
@@ -101,7 +100,7 @@
 
           //    logger.debug(s">> Social Controller test Acts result : ${contentAsJson(acts)}")
           status(acts) must equalTo(OK)
-          contentType(acts) must beSome.which(_ == "text/plain")
+          contentType(acts) must beSome.which(_ == "application/json")
           val jsRslt = contentAsJson(acts)
           println("======")
           println(jsRslt)
@@ -143,7 +142,7 @@
 
           //    logger.debug(s">> Social Controller test Acts result : ${contentAsJson(acts)}")
           status(acts) must equalTo(OK)
-          contentType(acts) must beSome.which(_ == "text/plain")
+          contentType(acts) must beSome.which(_ == "application/json")
           val jsRslt = contentAsJson(acts)
           println("======")
           println(jsRslt)
@@ -161,10 +160,11 @@
 
 }
 
-abstract class QuerySpecificationBase extends Specification {
-  protected val testServiceName = "s2graph_test"
+abstract class QuerySpecificationBase extends Specification with RequestParser {
+  protected val testServiceName = "s2graph"
   protected val testLabelName = "s2graph_label_test"
   protected val testColumnName = "user_id"
+  val asyncFlushInterval = 1500 // in mill
 
   val createService =
     s"""
@@ -248,28 +248,33 @@
 
   def initialize = {
     running(FakeApplication()) {
-      GraphAggregatorActor.init()
-      KafkaAggregatorActor.init()
-      Graph(Config.conf)(ExecutionContext.Implicits.global)
+      Graph(Config.conf.underlying)(ExecutionContext.Implicits.global)
 
       // 1. createService
       var result = AdminController.createServiceInner(Json.parse(createService))
       println(s">> Service created : $createService, $result")
       // 2. createLabel
-      result = AdminController.createLabelInner(Json.parse(createLabel))
-      println(s">> Label created : $createLabel, $result")
-      
+      try {
+        AdminController.createLabelInner(Json.parse(createLabel))
+        println(s">> Label created : $createLabel, $result")
+      } catch {
+        case e: Throwable =>
+      }
+
       // 3. insert edges
       val jsArrStr = s"[${edgesInsertInitial.mkString(",")}]"
       println(s">> 1st step Inserts : $jsArrStr")
       val inserts = toEdges(Json.parse(jsArrStr), "insert")
       Graph.mutateEdges(inserts)
+      Thread.sleep(asyncFlushInterval)
+
       println(s"<< 1st step Inserted : $jsArrStr")
 
       val jsArrStr2nd = s"[${edgesInsert2ndDepth.mkString(",")}]"
       println(s">> 2nd step Inserts : $jsArrStr2nd")
       val inserts2nd = toEdges(Json.parse(jsArrStr2nd), "insert")
       Graph.mutateEdges(inserts2nd)
+      Thread.sleep(asyncFlushInterval)
       println(s"<< 2nd step Inserted : $inserts2nd")
     }
   }
@@ -277,26 +282,26 @@
   def cleanup = {
     running(FakeApplication()) {
 
-      Graph(Config.conf)(ExecutionContext.Implicits.global)
+      Graph(Config.conf.underlying)(ExecutionContext.Implicits.global)
 
       // 1. delete edges
       val jsArrStr = s"[${edgesInsertInitial.mkString(",")}]"
       println(s"Deletes : $jsArrStr")
       val deletes = toEdges(Json.parse(jsArrStr), "delete")
       Graph.mutateEdges(deletes)
+      Thread.sleep(asyncFlushInterval)
 
       val jsArrStr2nd = s"[${edgesInsert2ndDepth.mkString(",")}]"
       println(s">> 2nd step Deletes : $jsArrStr2nd")
       val deletes2nd = toEdges(Json.parse(jsArrStr2nd), "delete")
       Graph.mutateEdges(deletes2nd)
+      Thread.sleep(asyncFlushInterval)
       println(s"<< 2nd step Deleted : $deletes2nd")
 
       // 2. delete label ( currently NOT supported )
 //      var result = AdminController.deleteLabelInner(testLabelName)
 //      println(s">> Label deleted : $testLabelName, $result")
       // 3. delete service ( currently NOT supported )
-      GraphAggregatorActor.shutdown()
-      KafkaAggregatorActor.shutdown()
     }
 
   }
diff --git a/test/controllers/RequestParserSpec.scala b/test/controllers/RequestParserSpec.scala
index 9e51414..acf0f82 100644
--- a/test/controllers/RequestParserSpec.scala
+++ b/test/controllers/RequestParserSpec.scala
@@ -1,4 +1,4 @@
-package controllers
+package test.controllers
 
 import models._
 import com.daumkakao.s2graph.core._
@@ -14,6 +14,7 @@
 class RequestParserSpec extends Specification {
 
   // dummy data for dummy edge
+  val testLabelName = "s2graph_label_test"
   val ts = System.currentTimeMillis()
   val srcId = CompositeId(0, InnerVal.withLong(1), isEdge = true, useHash = true)
   val tgtId = CompositeId(0, InnerVal.withLong(2), isEdge = true, useHash = true)
@@ -30,10 +31,9 @@
     val checkedOpt = for (label <- Label.findByName(labelName)) yield {
       val labelMetas = LabelMeta.findAllByLabelId(label.id.get, useCache = false)
       val metaMap = labelMetas.map { m => m.name -> m.seq } toMap
-
       val whereOpt = WhereParser(label).parse(sql)
       whereOpt must beSome
-      play.api.Logger.error(whereOpt.toString)
+      play.api.Logger.debug(whereOpt.toString)
 
       //      val props = Json.obj("is_hidden" -> true, "is_blocked" -> false)
       //      val propsInner = Management.toProps(label, props).map { case (k, v) => k -> InnerValWithTs.withInnerVal(v, ts) }.toMap
@@ -49,18 +49,20 @@
   "RequestParser WhereParser" should {
     "check where clause not nested" in {
       running(FakeApplication()) {
-        val labelOpt = Label.findByName("graph_test")
+        val labelOpt = Label.findByName(testLabelName)
         labelOpt must beSome[Label]
         val label = labelOpt.get
         val labelWithDir = LabelWithDirection(label.id.get, 0)
+
         val js = Json.obj("is_hidden" -> true, "is_blocked" -> false, "weight" -> 10, "time" -> 3, "name" -> "abc")
         val propsInner = Management.toProps(label, js).map { case (k, v) => k -> InnerValWithTs.withInnerVal(v, ts) }.toMap
         val edge = Edge(srcVertex, tgtVertex, labelWithDir, 0.toByte, ts, 0, propsInner)
         play.api.Logger.debug(s"$edge")
         
-        val f = validate("graph_test")(edge)_
+        val f = validate(testLabelName)(edge)_
 
         f("is_hidden = false")(false)
+        f("is_hidden != false")(true)
         f("is_hidden = true and is_blocked = true")(false)
         f("is_hidden = true and is_blocked = false")(true)
         f("time in (1, 2, 3) and is_blocked = true")(false)
@@ -68,15 +70,14 @@
         f("time in (1, 2, 3) and is_blocked = false")(true)
         f("time in (1, 2, 4) and is_blocked = false")(false)
         f("time in (1, 2, 4) or is_blocked = false")(true)
+        f("time not in (1, 2, 4)")(true)
         f("time in (1, 2, 3) and weight between 10 and 20 and is_blocked = false")(true)
         f("time in (1, 2, 4) or weight between 10 and 20 or is_blocked = true")(true)
-        f("name = abc")(true)
-        f("name = xxx")(false)
       }
     }
     "check where clause nested" in {
       running(FakeApplication()) {
-        val labelOpt = Label.findByName("graph_test")
+        val labelOpt = Label.findByName(testLabelName)
         labelOpt must beSome[Label]
         val label = labelOpt.get
         val labelWithDir = LabelWithDirection(label.id.get, 0)
@@ -84,7 +85,7 @@
         val propsInner = Management.toProps(label, js).map { case (k, v) => k -> InnerValWithTs.withInnerVal(v, ts) }.toMap
         val edge = Edge(srcVertex, tgtVertex, labelWithDir, 0.toByte, ts, 0, propsInner)
 
-        val f = validate("graph_test")(edge)_
+        val f = validate(testLabelName)(edge)_
 
         f("(time in (1, 2, 3) and is_blocked = true) or is_hidden = false")(false)
         f("(time in (1, 2, 3) or is_blocked = true) or is_hidden = false")(true)
@@ -96,15 +97,15 @@
         f("(time in (1, 2, 4) or weight between 1 and 9) or is_hidden = true")(true)
         f("(time in (1, 2, 3) or weight between 1 and 10) and is_hidden = false")(false)
 
-        f("(name in (a, abc, c) and weight between 1 and 10) or is_hidden = false")(true)
-        f("name between a and b or is_hidden = false")(true)
-        f("name = abc and is_hidden = true")(true)
+//        f("(name in (a, abc, c) and weight between 1 and 10) or is_hidden = false")(true)
+//        f("name between a and b or is_hidden = false")(true)
+//        f("name = abc and is_hidden = true")(true)
 
       }
     }
     "check where clause with from/to long" in {
       running(FakeApplication()) {
-        val labelOpt = Label.findByName("graph_test")
+        val labelOpt = Label.findByName(testLabelName)
         labelOpt must beSome[Label]
         val label = labelOpt.get
         val labelWithDir = LabelWithDirection(label.id.get, 0)
@@ -112,7 +113,7 @@
         val propsInner = Management.toProps(label, js).map { case (k, v) => k -> InnerValWithTs.withInnerVal(v, ts) }.toMap
         val edge = Edge(srcVertex, tgtVertex, labelWithDir, 0.toByte, ts, 0, propsInner)
 
-        val f = validate("graph_test")(edge)_
+        val f = validate(testLabelName)(edge)_
 
 //        f("from = abc")(false)
         f("_from = 2 or _to = 2")(true)