fix test errors. https://github.com/daumkakao/s2graph/issues/10
diff --git a/app/com/daumkakao/s2graph/rest/config/Config.scala b/app/com/daumkakao/s2graph/rest/config/Config.scala
index 3be7b28..4bef00a 100644
--- a/app/com/daumkakao/s2graph/rest/config/Config.scala
+++ b/app/com/daumkakao/s2graph/rest/config/Config.scala
@@ -1,7 +1,8 @@
package com.daumkakao.s2graph.rest.config
import java.util.concurrent.TimeUnit
-import com.codahale.metrics.{Metric, ConsoleReporter, MetricRegistry}
+import com.codahale.metrics.{Slf4jReporter, Metric, ConsoleReporter, MetricRegistry}
+import org.slf4j.LoggerFactory
import play.api.{Play, Logger}
object Config {
@@ -32,10 +33,15 @@
trait Instrumented extends nl.grons.metrics.scala.InstrumentedBuilder {
val metricRegistry: MetricRegistry = Config.metricRegistry
- val consoleReporter = ConsoleReporter.forRegistry(metricRegistry)
+ val reporter = Slf4jReporter.forRegistry(metricRegistry)
+ .outputTo(LoggerFactory.getLogger(classOf[Instrumented]))
.convertRatesTo(TimeUnit.SECONDS)
.convertDurationsTo(TimeUnit.MILLISECONDS)
.build()
+// val consoleReporter = ConsoleReporter.forRegistry(metricRegistry)
+// .convertRatesTo(TimeUnit.SECONDS)
+// .convertDurationsTo(TimeUnit.MILLISECONDS)
+// .build()
val stats = new collection.mutable.HashMap[String, Metric]
diff --git a/app/controllers/ApplicationController.scala b/app/controllers/ApplicationController.scala
index 8384fd9..ab08044 100644
--- a/app/controllers/ApplicationController.scala
+++ b/app/controllers/ApplicationController.scala
@@ -17,7 +17,7 @@
var connectionOption = CONNECTION -> "Keep-Alive"
var keepAliveOption = "Keep-Alive" -> "timeout=5, max=100"
- consoleReporter.start(10, TimeUnit.SECONDS)
+ reporter.start(10, TimeUnit.SECONDS)
def updateHealthCheck(isHealthy: Boolean) = Action { request =>
this.isHealthy = isHealthy
diff --git a/loader/src/main/scala/subscriber/GraphSubscriber.scala b/loader/src/main/scala/subscriber/GraphSubscriber.scala
index 07300f6..aec5c4f 100644
--- a/loader/src/main/scala/subscriber/GraphSubscriber.scala
+++ b/loader/src/main/scala/subscriber/GraphSubscriber.scala
@@ -26,63 +26,39 @@
zkQuorum = zkAddr.getOrElse("localhost")
kafkaBrokers = kafkaBrokerList.getOrElse("localhost:9092")
val s = s"""
-db.default.driver=com.mysql.jdbc.Driver
-db.default.url="$database"
-db.default.user=graph
-db.default.password=graph
+ |logger.root=ERROR
+ |
+ |# Logger used by the framework:
+ |logger.play=INFO
+ |
+ |# Logger provided to your application:
+ |logger.application=DEBUG
+ |
+ |# APP PHASE
+ |phase=dev
+ |
+ |# DB
+ |db.default.driver=com.mysql.jdbc.Driver
+ |db.default.url="$database"
+ |db.default.user=graph
+ |db.default.password=graph
+ |
+ |# Query server
+ |is.query.server=true
+ |is.write.server=true
+ |
+ |# Local Cache
+ |cache.ttl.seconds=60
+ |cache.max.size=100000
+ |
+ |# HBASE
+ |hbase.client.operation.timeout=60000
+ |
+ |# Kafka
+ |kafka.metadata.broker.list="$kafkaBrokers"
+ |kafka.producer.pool.size=0
+ | """.stripMargin
-cache.ttl.seconds=60000
-cache.max.size=100000
-
-hbase.connection.pool.size=1
-hbase.table.pool.size=10
-hbase.client.ipc.pool.size=1
-zookeeper.recovery.retry=10
-zookeeper.session.timeout=180000
-hbase.zookeeper.quorum="$zkQuorum"
-hbase.table.name="s2graph-alpha"
-hbase.client.operation.timeout=10000
-hbase.client.retries.number=10
-hbase.client.write.operation.timeout=10000
-hbase.client.write.retries.number=10
-
-kafka.metadata.broker.list="$kafkaBrokers"
-kafka.zookeeper="tokyo043.kr2.iwilab.com"
-kafka.request.required.acks=1
-kafka.producer.type="sync"
-kafka.producer.buffer.flush.time=1000
-kafka.producer.buffer.size=1000
-kafka.producer.pool.size=1
-kafka.aggregate.flush.timeout=1000
-
-# Aggregator
-client.aggregate.buffer.size=100
-client.aggregate.buffer.flush.time=10000
-client.aggregate.pool.size=1
-
-
-# blocking execution context
-contexts {
- query {
- fork-join-executor {
- parallelism-min = 1
- parallelism-max = 1
- }
- }
- blocking {
- fork-join-executor {
- parallelism-min = 1
- parallelism-max = 1
- }
- }
- scheduler {
- fork-join-executor {
- parallelism-min = 1
- parallelism-max = 1
- }
- }
-}
- """
println(s)
ConfigFactory.parseString(s)
}
diff --git a/loader/src/test/scala/subscriber/GraphSubscriberTest.scala b/loader/src/test/scala/subscriber/GraphSubscriberTest.scala
index 10e0b61..4e5b60a 100644
--- a/loader/src/test/scala/subscriber/GraphSubscriberTest.scala
+++ b/loader/src/test/scala/subscriber/GraphSubscriberTest.scala
@@ -6,22 +6,22 @@
import s2.spark.WithKafka
class GraphSubscriberTest extends FunSuite with Matchers with WithKafka {
- val phase = "alpha"
- val dbUrl = "jdbc:mysql://nuk151.kr2.iwilab.com:13306/graph_alpha"
- val zkQuorum = "tokyo062.kr2.iwilab.com"
- val kafkaBrokerList = "rabat176.kr2.iwilab.com:9099"
+ val phase = "dev"
+ val dbUrl = "jdbc:mysql://localhost:3306/graph_dev"
+ val zkQuorum = "localhost"
+ val kafkaBrokerList = "localhost:9099"
val currentTs = System.currentTimeMillis()
val op = "insertBulk"
- val testLabelName = "talk_friend_long_term_agg"
- val labelToReplace = "talk_friend_long_term_agg_2015-10-10"
+ val testLabelName = "s2graph_label_test"
+ val labelToReplace = "s2graph_label_test_new"
val serviceName = "s2graph"
- val columnName = "account_id"
+ val columnName = "user_id"
val columnType = "long"
val indexProps = Seq("time" -> JsNumber(0), "weight" -> JsNumber(0))
val props = Seq("is_hidden" -> JsBoolean(false), "is_blocked" -> JsBoolean(false))
- val hTableName = "graph_test_tc"
+ val hTableName = "s2graph-dev_new"
val ttl = 86000
- val testStrings = List("1431788400000\tinsertBulk\te\t147229417\t99240432\ttalk_friend_long_term_agg\t{\"interests\":{},\"age_band\":67,\"account_id\":10099240432,\"gift_score\":0,\"service_user_id\":0,\"profile_id\":0,\"is_favorite\":\"false\",\"is_story_friend\":\"false\",\"talk_score\":1000,\"gender\":\"F\",\"interest_score\":0,\"score\":1096,\"birth_date\":\"\",\"birth_year\":1948,\"agedist_score\":966}")
+ val testStrings = List("1431788400000\tinsertBulk\te\t147229417\t99240432\ts2graph_label_test\t{\"is_hidden\": true}")
GraphSubscriberHelper.apply(phase, dbUrl, zkQuorum, kafkaBrokerList)
diff --git a/s2core/src/main/scala/com/daumkakao/s2graph/core/Edge.scala b/s2core/src/main/scala/com/daumkakao/s2graph/core/Edge.scala
index fa754a0..60880ca 100644
--- a/s2core/src/main/scala/com/daumkakao/s2graph/core/Edge.scala
+++ b/s2core/src/main/scala/com/daumkakao/s2graph/core/Edge.scala
@@ -261,7 +261,9 @@
}
edgePuts
}
-
+ def insertBulk() = {
+ edgesWithInvertedIndex.buildPut :: edgesWithIndex.flatMap(e => e.buildPuts())
+ }
def insert() = {
edgesWithInvertedIndex.buildPutAsync :: edgesWithIndex.flatMap(e => e.buildPutsAsync)
}
diff --git a/s2core/src/main/scala/com/daumkakao/s2graph/core/JSONParser.scala b/s2core/src/main/scala/com/daumkakao/s2graph/core/JSONParser.scala
index ee40b3e..4c81e35 100644
--- a/s2core/src/main/scala/com/daumkakao/s2graph/core/JSONParser.scala
+++ b/s2core/src/main/scala/com/daumkakao/s2graph/core/JSONParser.scala
@@ -1,13 +1,9 @@
package com.daumkakao.s2graph.core
-import com.daumkakao.s2graph.core.Not
import play.api.libs.json._
import HBaseElement.InnerVal
import scala.util.parsing.combinator.JavaTokenParsers
-import scala.util.parsing.combinator.Parsers.Parser
-import scala.util.parsing.combinator.Parsers.Success
-import scala.util.parsing.combinator.Parsers.~
trait JSONParser {
diff --git a/test/controllers/GraphSpec.scala b/test/controllers/GraphSpec.scala
deleted file mode 100644
index 48730f5..0000000
--- a/test/controllers/GraphSpec.scala
+++ /dev/null
@@ -1,102 +0,0 @@
-package test.controllers
-
-import com.daumkakao.s2graph.core.Management._
-import com.daumkakao.s2graph.core._
-import controllers.{AdminController, RequestParser}
-import org.specs2.mutable.Specification
-import play.api.libs.json.Json
-import scala.Option.option2Iterable
-/**
- * test-only kgraph.GraphSpec
- */
-class GraphSpec extends Specification with RequestParser {
- sequential
-
- private val phase = "alpha"
- // Graph.tableName = tableName
- System.setProperty("phase", phase)
-// Config.phase = phase
- private val serviceName = "s2graph"
- private val labelName = "graph_test"
-
- val msgs = """
- ["1417079166891\tu\te\t156598730\t158524147\tgraph_test\t{\"is_blocked\":true}","1417079166000\td\te\t4079848\t54739565\tgraph_test","1417079166000\ti\te\t102735518\t49629673\tgraph_test","1417079166000\ti\te\t15120422\t160902634\tgraph_test","1417079166000\ti\te\t160251573\t7960026\tgraph_test","1417079167000\td\te\t816808\t47823955\tgraph_test","1417079166000\ti\te\t160251573\t43063061\tgraph_test","1417079167000\ti\te\t141391508\t151985238\tgraph_test","1417079167360\tu\te\t43237903\t49005293\tgraph_test\t{\"is_blocked\":true}","1417079167000\ti\te\t9788808\t167206573\tgraph_test","1417079167000\ti\te\t106040854\t166946583\tgraph_test","1417079166000\ti\te\t15120422\t35674614\tgraph_test","1417079167519\tu\te\t29460631\t28640554\tgraph_test\t{\"is_blocked\":false}","1417079166000\ti\te\t15120422\t6366975\tgraph_test","1417079167000\tu\te\t4610363\t146176454\tgraph_test\t{\"is_hidden\":true}","1417079167000\td\te\t4935659\t1430321\tgraph_test","1417079166000\ti\te\t15120422\t136919643\tgraph_test","1417079167000\td\te\t84547561\t17923231\tgraph_test","1417079167000\td\te\t108876864\t19695266\tgraph_test","1417079166000\td\te\t14644128\t11853672\tgraph_test","1417079167000\ti\te\t142522237\t167208738\tgraph_test"]"""
-
- val createService = """{"serviceName": "s2graph"}"""
-
- val createLabel = """
- {"label": "graph_test", "srcServiceName": "s2graph", "srcColumnName": "account_id", "srcColumnType": "long", "tgtServiceName": "s2graph", "tgtColumnName": "account_id", "tgtColumnType": "long", "indexProps": {"time": 0, "weight":0}}
- """
-
- // 0.
- AdminController.createServiceInner(Json.parse(createService))
- // 1.
- AdminController.deleteLabel(labelName)
- // 2.
- AdminController.createLabelInner(Json.parse(createLabel))
-
- val label = Label.findByName("graph_test").get
-
- val insertEdges = """
-[
- {"from":1,"to":101,"label":"graph_test","props":{"time":-1, "weight":10},"timestamp":193829192},
- {"from":1,"to":102,"label":"graph_test","props":{"time":0, "weight":11},"timestamp":193829193},
- {"from":1,"to":103,"label":"graph_test","props":{"time":1, "weight":12},"timestamp":193829194},
- {"from":1,"to":104,"label":"graph_test","props":{"time":-2, "weight":1},"timestamp":193829195}
-]
- """
-// val insertResults = List(
-// Edge(Vertex(CompositeId(label.srcColumn.id.get, InnerVal.withLong(1), true)), Vertex(CompositeId(label.tgtColumn.id.get, InnerVal.withLong(103), true)), LabelWithDirection(label.id.get, 0), 0, 193829192, )
-// )
- val insertEdgesTsv = """["193829192\ti\te\t1\t101\tgraph_test\t{\"time\":1, \"weight\": 10, \"is_hidden\": false}","193829192\ti\te\t1\t102\tgraph_test\t{\"time\":1, \"weight\": 19, \"is_hidden\": false}","193829192\ti\te\t1\t103\tgraph_test\t{\"time\":1, \"weight\": 10, \"is_hidden\": false}"]"""
-
- val updateEdges = """
-[
- {"from":1,"to":102,"label":"graph_test","timestamp":193829199, "props": {"is_hidden":true}},
- {"from":1,"to":102,"label":"graph_test","timestamp":193829299, "props": {"weight":100}}
-]
- """
- val incrementEdges = """
-[
- {"from":1,"to":102,"label":"graph_test","timestamp":193829399, "props": {"weight":100}},
- {"from":1,"to":102,"label":"graph_test","timestamp":193829499, "props": {"weight":100}},
- {"from":1,"to":103,"label":"graph_test","timestamp":193829599, "props": {"weight":1000}},
- {"from":1,"to":102,"label":"graph_test","timestamp":193829699, "props": {"weight":-200}},
- {"from":1,"to":102,"label":"graph_test","timestamp":193829699, "props": {"weight":400}}
-]
- """
-
- val queryEdges = """
-{
- "srcVertices": [{"columnName": "account_id", "id":1}],
- "steps": [
- [{"label": "graph_test", "direction": "out", "limit": 100, "scoring":{"time": 0, "weight": 1}}]
- ]
-}
- """
- val query = toQuery(Json.parse(queryEdges))
-
- "RequestParsor" should {
- "Json parsed edge should match Graph.toEdge" in {
- val jsons = Json.parse(insertEdges)
- val jsonEdges = toEdges(jsons, "insert")
- val tsvs = GraphUtil.parseString(insertEdgesTsv)
- val tsvEdges = tsvs.map(tsv => Graph.toEdge(tsv)).flatten
- jsonEdges must containTheSameElementsAs(tsvEdges)
- }
- }
-// "Graph" should {
-// "test insert" in {
-// EdgeController.tryMutates(Json.parse(insertEdges), "insert")
-// val edgesWithRank = Graph.getEdgesSync(query)
-// edgesWithRank must have size(1)
-// val head = edgesWithRank.head.toArray
-// head must have size(4)
-// val (firstEdge, firstRAnk) = head(0)
-// firstEdge.tgtVertex.innerId must beEqualTo(InnerVal.withLong(103))
-// firstEdge.ts must beEqualTo (193829194)
-//
-// true
-// }
-// }
-}
\ No newline at end of file
diff --git a/test/controllers/IntegritySpec.scala b/test/controllers/IntegritySpec.scala
index 0440b93..9a7b8ae 100644
--- a/test/controllers/IntegritySpec.scala
+++ b/test/controllers/IntegritySpec.scala
@@ -452,8 +452,8 @@
def initialize = {
running(FakeApplication()) {
- KafkaAggregatorActor.init()
- Graph(Config.conf)(ExecutionContext.Implicits.global)
+// KafkaAggregatorActor.init()
+ Graph(Config.conf.underlying)(ExecutionContext.Implicits.global)
// 1. createService
var result = AdminController.createServiceInner(Json.parse(createService))
@@ -480,7 +480,7 @@
// println(s">> Label deleted : $testLabelName, $result")
// 2. delete service ( currently NOT supported )
- KafkaAggregatorActor.shutdown()
+// KafkaAggregatorActor.shutdown()
}
}
diff --git a/test/controllers/QuerySpec.scala b/test/controllers/QuerySpec.scala
index 3e39520..3f0a01a 100644
--- a/test/controllers/QuerySpec.scala
+++ b/test/controllers/QuerySpec.scala
@@ -52,7 +52,7 @@
// logger.debug(s">> Social Controller test Acts result : ${contentAsJson(acts)}")
status(acts) must equalTo(OK)
- contentType(acts) must beSome.which(_ == "text/plain")
+ contentType(acts) must beSome.which(_ == "application/json")
val jsRslt = contentAsJson(acts)
println("======")
println(jsRslt)
@@ -100,7 +100,7 @@
// logger.debug(s">> Social Controller test Acts result : ${contentAsJson(acts)}")
status(acts) must equalTo(OK)
- contentType(acts) must beSome.which(_ == "text/plain")
+ contentType(acts) must beSome.which(_ == "application/json")
val jsRslt = contentAsJson(acts)
println("======")
println(jsRslt)
@@ -142,7 +142,7 @@
// logger.debug(s">> Social Controller test Acts result : ${contentAsJson(acts)}")
status(acts) must equalTo(OK)
- contentType(acts) must beSome.which(_ == "text/plain")
+ contentType(acts) must beSome.which(_ == "application/json")
val jsRslt = contentAsJson(acts)
println("======")
println(jsRslt)
@@ -161,9 +161,10 @@
}
abstract class QuerySpecificationBase extends Specification with RequestParser {
- protected val testServiceName = "s2graph_test"
+ protected val testServiceName = "s2graph"
protected val testLabelName = "s2graph_label_test"
protected val testColumnName = "user_id"
+ val asyncFlushInterval = 1500 // in mill
val createService =
s"""
@@ -247,27 +248,33 @@
def initialize = {
running(FakeApplication()) {
- KafkaAggregatorActor.init()
Graph(Config.conf.underlying)(ExecutionContext.Implicits.global)
// 1. createService
var result = AdminController.createServiceInner(Json.parse(createService))
println(s">> Service created : $createService, $result")
// 2. createLabel
- result = AdminController.createLabelInner(Json.parse(createLabel))
- println(s">> Label created : $createLabel, $result")
-
+ try {
+ AdminController.createLabelInner(Json.parse(createLabel))
+ println(s">> Label created : $createLabel, $result")
+ } catch {
+ case e: Throwable =>
+ }
+
// 3. insert edges
val jsArrStr = s"[${edgesInsertInitial.mkString(",")}]"
println(s">> 1st step Inserts : $jsArrStr")
val inserts = toEdges(Json.parse(jsArrStr), "insert")
Graph.mutateEdges(inserts)
+ Thread.sleep(asyncFlushInterval)
+
println(s"<< 1st step Inserted : $jsArrStr")
val jsArrStr2nd = s"[${edgesInsert2ndDepth.mkString(",")}]"
println(s">> 2nd step Inserts : $jsArrStr2nd")
val inserts2nd = toEdges(Json.parse(jsArrStr2nd), "insert")
Graph.mutateEdges(inserts2nd)
+ Thread.sleep(asyncFlushInterval)
println(s"<< 2nd step Inserted : $inserts2nd")
}
}
@@ -282,18 +289,19 @@
println(s"Deletes : $jsArrStr")
val deletes = toEdges(Json.parse(jsArrStr), "delete")
Graph.mutateEdges(deletes)
+ Thread.sleep(asyncFlushInterval)
val jsArrStr2nd = s"[${edgesInsert2ndDepth.mkString(",")}]"
println(s">> 2nd step Deletes : $jsArrStr2nd")
val deletes2nd = toEdges(Json.parse(jsArrStr2nd), "delete")
Graph.mutateEdges(deletes2nd)
+ Thread.sleep(asyncFlushInterval)
println(s"<< 2nd step Deleted : $deletes2nd")
// 2. delete label ( currently NOT supported )
// var result = AdminController.deleteLabelInner(testLabelName)
// println(s">> Label deleted : $testLabelName, $result")
// 3. delete service ( currently NOT supported )
- KafkaAggregatorActor.shutdown()
}
}
diff --git a/test/controllers/RequestParserSpec.scala b/test/controllers/RequestParserSpec.scala
index bfbce2a..acf0f82 100644
--- a/test/controllers/RequestParserSpec.scala
+++ b/test/controllers/RequestParserSpec.scala
@@ -14,6 +14,7 @@
class RequestParserSpec extends Specification {
// dummy data for dummy edge
+ val testLabelName = "s2graph_label_test"
val ts = System.currentTimeMillis()
val srcId = CompositeId(0, InnerVal.withLong(1), isEdge = true, useHash = true)
val tgtId = CompositeId(0, InnerVal.withLong(2), isEdge = true, useHash = true)
@@ -48,7 +49,7 @@
"RequestParser WhereParser" should {
"check where clause not nested" in {
running(FakeApplication()) {
- val labelOpt = Label.findByName("graph_test")
+ val labelOpt = Label.findByName(testLabelName)
labelOpt must beSome[Label]
val label = labelOpt.get
val labelWithDir = LabelWithDirection(label.id.get, 0)
@@ -58,7 +59,7 @@
val edge = Edge(srcVertex, tgtVertex, labelWithDir, 0.toByte, ts, 0, propsInner)
play.api.Logger.debug(s"$edge")
- val f = validate("graph_test")(edge)_
+ val f = validate(testLabelName)(edge)_
f("is_hidden = false")(false)
f("is_hidden != false")(true)
@@ -76,7 +77,7 @@
}
"check where clause nested" in {
running(FakeApplication()) {
- val labelOpt = Label.findByName("graph_test")
+ val labelOpt = Label.findByName(testLabelName)
labelOpt must beSome[Label]
val label = labelOpt.get
val labelWithDir = LabelWithDirection(label.id.get, 0)
@@ -84,7 +85,7 @@
val propsInner = Management.toProps(label, js).map { case (k, v) => k -> InnerValWithTs.withInnerVal(v, ts) }.toMap
val edge = Edge(srcVertex, tgtVertex, labelWithDir, 0.toByte, ts, 0, propsInner)
- val f = validate("graph_test")(edge)_
+ val f = validate(testLabelName)(edge)_
f("(time in (1, 2, 3) and is_blocked = true) or is_hidden = false")(false)
f("(time in (1, 2, 3) or is_blocked = true) or is_hidden = false")(true)
@@ -104,7 +105,7 @@
}
"check where clause with from/to long" in {
running(FakeApplication()) {
- val labelOpt = Label.findByName("graph_test")
+ val labelOpt = Label.findByName(testLabelName)
labelOpt must beSome[Label]
val label = labelOpt.get
val labelWithDir = LabelWithDirection(label.id.get, 0)
@@ -112,7 +113,7 @@
val propsInner = Management.toProps(label, js).map { case (k, v) => k -> InnerValWithTs.withInnerVal(v, ts) }.toMap
val edge = Edge(srcVertex, tgtVertex, labelWithDir, 0.toByte, ts, 0, propsInner)
- val f = validate("graph_test")(edge)_
+ val f = validate(testLabelName)(edge)_
// f("from = abc")(false)
f("_from = 2 or _to = 2")(true)