fix test errors. https://github.com/daumkakao/s2graph/issues/10
diff --git a/app/com/daumkakao/s2graph/rest/config/Config.scala b/app/com/daumkakao/s2graph/rest/config/Config.scala
index 3be7b28..4bef00a 100644
--- a/app/com/daumkakao/s2graph/rest/config/Config.scala
+++ b/app/com/daumkakao/s2graph/rest/config/Config.scala
@@ -1,7 +1,8 @@
 package com.daumkakao.s2graph.rest.config
 
 import java.util.concurrent.TimeUnit
-import com.codahale.metrics.{Metric, ConsoleReporter, MetricRegistry}
+import com.codahale.metrics.{Slf4jReporter, Metric, ConsoleReporter, MetricRegistry}
+import org.slf4j.LoggerFactory
 import play.api.{Play, Logger}
 
 object Config {
@@ -32,10 +33,15 @@
 
 trait Instrumented extends nl.grons.metrics.scala.InstrumentedBuilder  {
   val metricRegistry: MetricRegistry = Config.metricRegistry
-  val consoleReporter = ConsoleReporter.forRegistry(metricRegistry)
+  val reporter = Slf4jReporter.forRegistry(metricRegistry)
+    .outputTo(LoggerFactory.getLogger(classOf[Instrumented]))
     .convertRatesTo(TimeUnit.SECONDS)
     .convertDurationsTo(TimeUnit.MILLISECONDS)
     .build()
+//  val consoleReporter = ConsoleReporter.forRegistry(metricRegistry)
+//    .convertRatesTo(TimeUnit.SECONDS)
+//    .convertDurationsTo(TimeUnit.MILLISECONDS)
+//    .build()
 
   val stats = new collection.mutable.HashMap[String, Metric]
 
diff --git a/app/controllers/ApplicationController.scala b/app/controllers/ApplicationController.scala
index 8384fd9..ab08044 100644
--- a/app/controllers/ApplicationController.scala
+++ b/app/controllers/ApplicationController.scala
@@ -17,7 +17,7 @@
   var connectionOption = CONNECTION -> "Keep-Alive"
   var keepAliveOption = "Keep-Alive" -> "timeout=5, max=100"
 
-  consoleReporter.start(10, TimeUnit.SECONDS)
+  reporter.start(10, TimeUnit.SECONDS)
 
   def updateHealthCheck(isHealthy: Boolean) = Action { request =>
     this.isHealthy = isHealthy
diff --git a/loader/src/main/scala/subscriber/GraphSubscriber.scala b/loader/src/main/scala/subscriber/GraphSubscriber.scala
index 07300f6..aec5c4f 100644
--- a/loader/src/main/scala/subscriber/GraphSubscriber.scala
+++ b/loader/src/main/scala/subscriber/GraphSubscriber.scala
@@ -26,63 +26,39 @@
     zkQuorum = zkAddr.getOrElse("localhost")
     kafkaBrokers = kafkaBrokerList.getOrElse("localhost:9092")
     val s = s"""
-db.default.driver=com.mysql.jdbc.Driver
-db.default.url="$database"
-db.default.user=graph
-db.default.password=graph
+               |logger.root=ERROR
+               |
+               |# Logger used by the framework:
+               |logger.play=INFO
+               |
+               |# Logger provided to your application:
+               |logger.application=DEBUG
+               |
+               |# APP PHASE
+               |phase=dev
+               |
+               |# DB
+               |db.default.driver=com.mysql.jdbc.Driver
+               |db.default.url="$database"
+               |db.default.user=graph
+               |db.default.password=graph
+               |
+               |# Query server
+               |is.query.server=true
+               |is.write.server=true
+               |
+               |# Local Cache
+               |cache.ttl.seconds=60
+               |cache.max.size=100000
+               |
+               |# HBASE
+               |hbase.client.operation.timeout=60000
+               |
+               |# Kafka
+               |kafka.metadata.broker.list="$kafkaBrokers"
+               |kafka.producer.pool.size=0
+               |    """.stripMargin
 
-cache.ttl.seconds=60000
-cache.max.size=100000
-
-hbase.connection.pool.size=1
-hbase.table.pool.size=10
-hbase.client.ipc.pool.size=1
-zookeeper.recovery.retry=10
-zookeeper.session.timeout=180000
-hbase.zookeeper.quorum="$zkQuorum"
-hbase.table.name="s2graph-alpha"
-hbase.client.operation.timeout=10000
-hbase.client.retries.number=10
-hbase.client.write.operation.timeout=10000
-hbase.client.write.retries.number=10
-    
-kafka.metadata.broker.list="$kafkaBrokers"
-kafka.zookeeper="tokyo043.kr2.iwilab.com"
-kafka.request.required.acks=1
-kafka.producer.type="sync"
-kafka.producer.buffer.flush.time=1000
-kafka.producer.buffer.size=1000
-kafka.producer.pool.size=1
-kafka.aggregate.flush.timeout=1000
-    
-# Aggregator
-client.aggregate.buffer.size=100
-client.aggregate.buffer.flush.time=10000
-client.aggregate.pool.size=1
-
-
-# blocking execution context
-contexts {
-	query {
-		fork-join-executor {
-  		parallelism-min = 1
-    	parallelism-max = 1
-		}
-	}
-	blocking {
-		fork-join-executor {
-  		parallelism-min = 1
-    	parallelism-max = 1
-		}
-	}
-	scheduler {
-	 	fork-join-executor {
-  		parallelism-min = 1
-    	parallelism-max = 1
-		}
-	}
-}
-  """
     println(s)
     ConfigFactory.parseString(s)
   }
diff --git a/loader/src/test/scala/subscriber/GraphSubscriberTest.scala b/loader/src/test/scala/subscriber/GraphSubscriberTest.scala
index 10e0b61..4e5b60a 100644
--- a/loader/src/test/scala/subscriber/GraphSubscriberTest.scala
+++ b/loader/src/test/scala/subscriber/GraphSubscriberTest.scala
@@ -6,22 +6,22 @@
 import s2.spark.WithKafka
 
 class GraphSubscriberTest extends FunSuite with Matchers with WithKafka {
-  val phase = "alpha"
-  val dbUrl = "jdbc:mysql://nuk151.kr2.iwilab.com:13306/graph_alpha"
-  val zkQuorum = "tokyo062.kr2.iwilab.com"
-  val kafkaBrokerList = "rabat176.kr2.iwilab.com:9099"
+  val phase = "dev"
+  val dbUrl = "jdbc:mysql://localhost:3306/graph_dev"
+  val zkQuorum = "localhost"
+  val kafkaBrokerList = "localhost:9099"
   val currentTs = System.currentTimeMillis()
   val op = "insertBulk"
-  val testLabelName = "talk_friend_long_term_agg"
-  val labelToReplace = "talk_friend_long_term_agg_2015-10-10"
+  val testLabelName = "s2graph_label_test"
+  val labelToReplace = "s2graph_label_test_new"
   val serviceName = "s2graph"
-  val columnName = "account_id"
+  val columnName = "user_id"
   val columnType = "long"
   val indexProps = Seq("time" -> JsNumber(0), "weight" -> JsNumber(0))
   val props = Seq("is_hidden" -> JsBoolean(false), "is_blocked" -> JsBoolean(false))
-  val hTableName = "graph_test_tc"
+  val hTableName = "s2graph-dev_new"
   val ttl = 86000
-  val testStrings = List("1431788400000\tinsertBulk\te\t147229417\t99240432\ttalk_friend_long_term_agg\t{\"interests\":{},\"age_band\":67,\"account_id\":10099240432,\"gift_score\":0,\"service_user_id\":0,\"profile_id\":0,\"is_favorite\":\"false\",\"is_story_friend\":\"false\",\"talk_score\":1000,\"gender\":\"F\",\"interest_score\":0,\"score\":1096,\"birth_date\":\"\",\"birth_year\":1948,\"agedist_score\":966}")
+  val testStrings = List("1431788400000\tinsertBulk\te\t147229417\t99240432\ts2graph_label_test\t{\"is_hidden\": true}")
 
   GraphSubscriberHelper.apply(phase, dbUrl, zkQuorum, kafkaBrokerList)
 
diff --git a/s2core/src/main/scala/com/daumkakao/s2graph/core/Edge.scala b/s2core/src/main/scala/com/daumkakao/s2graph/core/Edge.scala
index fa754a0..60880ca 100644
--- a/s2core/src/main/scala/com/daumkakao/s2graph/core/Edge.scala
+++ b/s2core/src/main/scala/com/daumkakao/s2graph/core/Edge.scala
@@ -261,7 +261,9 @@
       }
     edgePuts
   }
-
+  def insertBulk() = {
+    edgesWithInvertedIndex.buildPut :: edgesWithIndex.flatMap(e => e.buildPuts())
+  }
   def insert() = {
     edgesWithInvertedIndex.buildPutAsync :: edgesWithIndex.flatMap(e => e.buildPutsAsync)
   }
diff --git a/s2core/src/main/scala/com/daumkakao/s2graph/core/JSONParser.scala b/s2core/src/main/scala/com/daumkakao/s2graph/core/JSONParser.scala
index ee40b3e..4c81e35 100644
--- a/s2core/src/main/scala/com/daumkakao/s2graph/core/JSONParser.scala
+++ b/s2core/src/main/scala/com/daumkakao/s2graph/core/JSONParser.scala
@@ -1,13 +1,9 @@
 package com.daumkakao.s2graph.core
 
-import com.daumkakao.s2graph.core.Not
 import play.api.libs.json._
 import HBaseElement.InnerVal
 
 import scala.util.parsing.combinator.JavaTokenParsers
-import scala.util.parsing.combinator.Parsers.Parser
-import scala.util.parsing.combinator.Parsers.Success
-import scala.util.parsing.combinator.Parsers.~
 
 trait JSONParser {
 
diff --git a/test/controllers/GraphSpec.scala b/test/controllers/GraphSpec.scala
deleted file mode 100644
index 48730f5..0000000
--- a/test/controllers/GraphSpec.scala
+++ /dev/null
@@ -1,102 +0,0 @@
-package test.controllers
-
-import com.daumkakao.s2graph.core.Management._
-import com.daumkakao.s2graph.core._
-import controllers.{AdminController, RequestParser}
-import org.specs2.mutable.Specification
-import play.api.libs.json.Json
-import scala.Option.option2Iterable
-/**
- * test-only kgraph.GraphSpec
- */
-class GraphSpec extends Specification with RequestParser {
-  sequential
-
-  private val phase = "alpha"
-  //  Graph.tableName = tableName
-  System.setProperty("phase", phase)
-//  Config.phase = phase
-  private val serviceName = "s2graph"
-  private val labelName = "graph_test"
-    
-  val msgs = """
-    ["1417079166891\tu\te\t156598730\t158524147\tgraph_test\t{\"is_blocked\":true}","1417079166000\td\te\t4079848\t54739565\tgraph_test","1417079166000\ti\te\t102735518\t49629673\tgraph_test","1417079166000\ti\te\t15120422\t160902634\tgraph_test","1417079166000\ti\te\t160251573\t7960026\tgraph_test","1417079167000\td\te\t816808\t47823955\tgraph_test","1417079166000\ti\te\t160251573\t43063061\tgraph_test","1417079167000\ti\te\t141391508\t151985238\tgraph_test","1417079167360\tu\te\t43237903\t49005293\tgraph_test\t{\"is_blocked\":true}","1417079167000\ti\te\t9788808\t167206573\tgraph_test","1417079167000\ti\te\t106040854\t166946583\tgraph_test","1417079166000\ti\te\t15120422\t35674614\tgraph_test","1417079167519\tu\te\t29460631\t28640554\tgraph_test\t{\"is_blocked\":false}","1417079166000\ti\te\t15120422\t6366975\tgraph_test","1417079167000\tu\te\t4610363\t146176454\tgraph_test\t{\"is_hidden\":true}","1417079167000\td\te\t4935659\t1430321\tgraph_test","1417079166000\ti\te\t15120422\t136919643\tgraph_test","1417079167000\td\te\t84547561\t17923231\tgraph_test","1417079167000\td\te\t108876864\t19695266\tgraph_test","1417079166000\td\te\t14644128\t11853672\tgraph_test","1417079167000\ti\te\t142522237\t167208738\tgraph_test"]"""
-
-  val createService = """{"serviceName": "s2graph"}"""
-
-  val createLabel = """
-  {"label": "graph_test", "srcServiceName": "s2graph", "srcColumnName": "account_id", "srcColumnType": "long", "tgtServiceName": "s2graph", "tgtColumnName": "account_id", "tgtColumnType": "long", "indexProps": {"time": 0, "weight":0}}
-  """
-    
-  // 0.
-  AdminController.createServiceInner(Json.parse(createService))
-  // 1.   
-  AdminController.deleteLabel(labelName)
-  // 2. 
-  AdminController.createLabelInner(Json.parse(createLabel))
-  
-  val label = Label.findByName("graph_test").get
-  
-  val insertEdges = """
-[
-  {"from":1,"to":101,"label":"graph_test","props":{"time":-1, "weight":10},"timestamp":193829192},
-  {"from":1,"to":102,"label":"graph_test","props":{"time":0, "weight":11},"timestamp":193829193},
-  {"from":1,"to":103,"label":"graph_test","props":{"time":1, "weight":12},"timestamp":193829194},
-  {"from":1,"to":104,"label":"graph_test","props":{"time":-2, "weight":1},"timestamp":193829195}
-]
-  """
-//  val insertResults = List(
-//  Edge(Vertex(CompositeId(label.srcColumn.id.get, InnerVal.withLong(1), true)), Vertex(CompositeId(label.tgtColumn.id.get, InnerVal.withLong(103), true)), LabelWithDirection(label.id.get, 0), 0, 193829192, )
-//  )
-  val insertEdgesTsv = """["193829192\ti\te\t1\t101\tgraph_test\t{\"time\":1, \"weight\": 10, \"is_hidden\": false}","193829192\ti\te\t1\t102\tgraph_test\t{\"time\":1, \"weight\": 19, \"is_hidden\": false}","193829192\ti\te\t1\t103\tgraph_test\t{\"time\":1, \"weight\": 10, \"is_hidden\": false}"]"""
-
-  val updateEdges = """
-[
-  {"from":1,"to":102,"label":"graph_test","timestamp":193829199, "props": {"is_hidden":true}},
-  {"from":1,"to":102,"label":"graph_test","timestamp":193829299, "props": {"weight":100}}
-]
-  """
-  val incrementEdges = """
-[
-  {"from":1,"to":102,"label":"graph_test","timestamp":193829399, "props": {"weight":100}},
-  {"from":1,"to":102,"label":"graph_test","timestamp":193829499, "props": {"weight":100}},
-  {"from":1,"to":103,"label":"graph_test","timestamp":193829599, "props": {"weight":1000}},
-  {"from":1,"to":102,"label":"graph_test","timestamp":193829699, "props": {"weight":-200}},
-  {"from":1,"to":102,"label":"graph_test","timestamp":193829699, "props": {"weight":400}} 
-]   
-  """
-
-  val queryEdges = """
-{
-    "srcVertices": [{"columnName": "account_id", "id":1}],
-    "steps": [
-      [{"label": "graph_test", "direction": "out", "limit": 100, "scoring":{"time": 0, "weight": 1}}]
-    ]
-}
-  """
-  val query = toQuery(Json.parse(queryEdges))
-
-  "RequestParsor" should {
-    "Json parsed edge should match Graph.toEdge" in {
-      val jsons = Json.parse(insertEdges)
-      val jsonEdges = toEdges(jsons, "insert")
-      val tsvs = GraphUtil.parseString(insertEdgesTsv)
-      val tsvEdges = tsvs.map(tsv => Graph.toEdge(tsv)).flatten
-      jsonEdges must containTheSameElementsAs(tsvEdges)
-    }
-  }
-//  "Graph" should {
-//    "test insert" in {
-//      EdgeController.tryMutates(Json.parse(insertEdges), "insert")
-//      val edgesWithRank = Graph.getEdgesSync(query)
-//      edgesWithRank must have size(1)
-//      val head = edgesWithRank.head.toArray
-//      head must have size(4)
-//      val (firstEdge, firstRAnk) = head(0)
-//      firstEdge.tgtVertex.innerId must beEqualTo(InnerVal.withLong(103)) 
-//      firstEdge.ts must beEqualTo (193829194)
-//      
-//      true
-//    }
-//  }
-}
\ No newline at end of file
diff --git a/test/controllers/IntegritySpec.scala b/test/controllers/IntegritySpec.scala
index 0440b93..9a7b8ae 100644
--- a/test/controllers/IntegritySpec.scala
+++ b/test/controllers/IntegritySpec.scala
@@ -452,8 +452,8 @@
   def initialize = {
     running(FakeApplication()) {
 
-      KafkaAggregatorActor.init()
-      Graph(Config.conf)(ExecutionContext.Implicits.global)
+//      KafkaAggregatorActor.init()
+      Graph(Config.conf.underlying)(ExecutionContext.Implicits.global)
 
       // 1. createService
       var result = AdminController.createServiceInner(Json.parse(createService))
@@ -480,7 +480,7 @@
       //      println(s">> Label deleted : $testLabelName, $result")
       // 2. delete service ( currently NOT supported )
 
-      KafkaAggregatorActor.shutdown()
+//      KafkaAggregatorActor.shutdown()
     }
 
   }
diff --git a/test/controllers/QuerySpec.scala b/test/controllers/QuerySpec.scala
index 3e39520..3f0a01a 100644
--- a/test/controllers/QuerySpec.scala
+++ b/test/controllers/QuerySpec.scala
@@ -52,7 +52,7 @@
 
           //    logger.debug(s">> Social Controller test Acts result : ${contentAsJson(acts)}")
           status(acts) must equalTo(OK)
-          contentType(acts) must beSome.which(_ == "text/plain")
+          contentType(acts) must beSome.which(_ == "application/json")
           val jsRslt = contentAsJson(acts)
           println("======")
           println(jsRslt)
@@ -100,7 +100,7 @@
 
           //    logger.debug(s">> Social Controller test Acts result : ${contentAsJson(acts)}")
           status(acts) must equalTo(OK)
-          contentType(acts) must beSome.which(_ == "text/plain")
+          contentType(acts) must beSome.which(_ == "application/json")
           val jsRslt = contentAsJson(acts)
           println("======")
           println(jsRslt)
@@ -142,7 +142,7 @@
 
           //    logger.debug(s">> Social Controller test Acts result : ${contentAsJson(acts)}")
           status(acts) must equalTo(OK)
-          contentType(acts) must beSome.which(_ == "text/plain")
+          contentType(acts) must beSome.which(_ == "application/json")
           val jsRslt = contentAsJson(acts)
           println("======")
           println(jsRslt)
@@ -161,9 +161,10 @@
 }
 
 abstract class QuerySpecificationBase extends Specification with RequestParser {
-  protected val testServiceName = "s2graph_test"
+  protected val testServiceName = "s2graph"
   protected val testLabelName = "s2graph_label_test"
   protected val testColumnName = "user_id"
+  val asyncFlushInterval = 1500 // in mill
 
   val createService =
     s"""
@@ -247,27 +248,33 @@
 
   def initialize = {
     running(FakeApplication()) {
-      KafkaAggregatorActor.init()
       Graph(Config.conf.underlying)(ExecutionContext.Implicits.global)
 
       // 1. createService
       var result = AdminController.createServiceInner(Json.parse(createService))
       println(s">> Service created : $createService, $result")
       // 2. createLabel
-      result = AdminController.createLabelInner(Json.parse(createLabel))
-      println(s">> Label created : $createLabel, $result")
-      
+      try {
+        AdminController.createLabelInner(Json.parse(createLabel))
+        println(s">> Label created : $createLabel, $result")
+      } catch {
+        case e: Throwable =>
+      }
+
       // 3. insert edges
       val jsArrStr = s"[${edgesInsertInitial.mkString(",")}]"
       println(s">> 1st step Inserts : $jsArrStr")
       val inserts = toEdges(Json.parse(jsArrStr), "insert")
       Graph.mutateEdges(inserts)
+      Thread.sleep(asyncFlushInterval)
+
       println(s"<< 1st step Inserted : $jsArrStr")
 
       val jsArrStr2nd = s"[${edgesInsert2ndDepth.mkString(",")}]"
       println(s">> 2nd step Inserts : $jsArrStr2nd")
       val inserts2nd = toEdges(Json.parse(jsArrStr2nd), "insert")
       Graph.mutateEdges(inserts2nd)
+      Thread.sleep(asyncFlushInterval)
       println(s"<< 2nd step Inserted : $inserts2nd")
     }
   }
@@ -282,18 +289,19 @@
       println(s"Deletes : $jsArrStr")
       val deletes = toEdges(Json.parse(jsArrStr), "delete")
       Graph.mutateEdges(deletes)
+      Thread.sleep(asyncFlushInterval)
 
       val jsArrStr2nd = s"[${edgesInsert2ndDepth.mkString(",")}]"
       println(s">> 2nd step Deletes : $jsArrStr2nd")
       val deletes2nd = toEdges(Json.parse(jsArrStr2nd), "delete")
       Graph.mutateEdges(deletes2nd)
+      Thread.sleep(asyncFlushInterval)
       println(s"<< 2nd step Deleted : $deletes2nd")
 
       // 2. delete label ( currently NOT supported )
 //      var result = AdminController.deleteLabelInner(testLabelName)
 //      println(s">> Label deleted : $testLabelName, $result")
       // 3. delete service ( currently NOT supported )
-      KafkaAggregatorActor.shutdown()
     }
 
   }
diff --git a/test/controllers/RequestParserSpec.scala b/test/controllers/RequestParserSpec.scala
index bfbce2a..acf0f82 100644
--- a/test/controllers/RequestParserSpec.scala
+++ b/test/controllers/RequestParserSpec.scala
@@ -14,6 +14,7 @@
 class RequestParserSpec extends Specification {
 
   // dummy data for dummy edge
+  val testLabelName = "s2graph_label_test"
   val ts = System.currentTimeMillis()
   val srcId = CompositeId(0, InnerVal.withLong(1), isEdge = true, useHash = true)
   val tgtId = CompositeId(0, InnerVal.withLong(2), isEdge = true, useHash = true)
@@ -48,7 +49,7 @@
   "RequestParser WhereParser" should {
     "check where clause not nested" in {
       running(FakeApplication()) {
-        val labelOpt = Label.findByName("graph_test")
+        val labelOpt = Label.findByName(testLabelName)
         labelOpt must beSome[Label]
         val label = labelOpt.get
         val labelWithDir = LabelWithDirection(label.id.get, 0)
@@ -58,7 +59,7 @@
         val edge = Edge(srcVertex, tgtVertex, labelWithDir, 0.toByte, ts, 0, propsInner)
         play.api.Logger.debug(s"$edge")
         
-        val f = validate("graph_test")(edge)_
+        val f = validate(testLabelName)(edge)_
 
         f("is_hidden = false")(false)
         f("is_hidden != false")(true)
@@ -76,7 +77,7 @@
     }
     "check where clause nested" in {
       running(FakeApplication()) {
-        val labelOpt = Label.findByName("graph_test")
+        val labelOpt = Label.findByName(testLabelName)
         labelOpt must beSome[Label]
         val label = labelOpt.get
         val labelWithDir = LabelWithDirection(label.id.get, 0)
@@ -84,7 +85,7 @@
         val propsInner = Management.toProps(label, js).map { case (k, v) => k -> InnerValWithTs.withInnerVal(v, ts) }.toMap
         val edge = Edge(srcVertex, tgtVertex, labelWithDir, 0.toByte, ts, 0, propsInner)
 
-        val f = validate("graph_test")(edge)_
+        val f = validate(testLabelName)(edge)_
 
         f("(time in (1, 2, 3) and is_blocked = true) or is_hidden = false")(false)
         f("(time in (1, 2, 3) or is_blocked = true) or is_hidden = false")(true)
@@ -104,7 +105,7 @@
     }
     "check where clause with from/to long" in {
       running(FakeApplication()) {
-        val labelOpt = Label.findByName("graph_test")
+        val labelOpt = Label.findByName(testLabelName)
         labelOpt must beSome[Label]
         val label = labelOpt.get
         val labelWithDir = LabelWithDirection(label.id.get, 0)
@@ -112,7 +113,7 @@
         val propsInner = Management.toProps(label, js).map { case (k, v) => k -> InnerValWithTs.withInnerVal(v, ts) }.toMap
         val edge = Edge(srcVertex, tgtVertex, labelWithDir, 0.toByte, ts, 0, propsInner)
 
-        val f = validate("graph_test")(edge)_
+        val f = validate(testLabelName)(edge)_
 
 //        f("from = abc")(false)
         f("_from = 2 or _to = 2")(true)