// blob: 4e5b60a668db9916f8e92637675c2ffb20542acb [file] [log] [blame]
package subscriber
import com.daumkakao.s2graph.core.{Label, Service, Management}
import org.scalatest.{ FunSuite, Matchers }
import play.api.libs.json.{JsBoolean, JsNumber}
import s2.spark.WithKafka
/**
 * Exercises `GraphSubscriberHelper.store` with a single bulk-insert line.
 *
 * The suite initialises `GraphSubscriberHelper` once, at construction time, using the
 * connection settings declared below. The values look like local development endpoints
 * (MySQL, ZooKeeper, Kafka on localhost) -- NOTE(review): confirm these services must be
 * running for the test to pass.
 *
 * The test currently only prints the value returned by `store`; it makes no assertions.
 */
class GraphSubscriberTest extends FunSuite with Matchers with WithKafka {

  // --- Settings handed to GraphSubscriberHelper.apply ---
  val phase: String = "dev"
  val dbUrl: String = "jdbc:mysql://localhost:3306/graph_dev"
  val zkQuorum: String = "localhost"
  val kafkaBrokerList: String = "localhost:9099"

  // Only referenced by the commented-out message generator inside the test.
  val currentTs: Long = System.currentTimeMillis()
  val op: String = "insertBulk"

  // --- Label names: the source label and the label it is copied to ---
  val testLabelName: String = "s2graph_label_test"
  val labelToReplace: String = "s2graph_label_test_new"

  // --- Schema-related fixtures; not referenced anywhere in this suite ---
  val serviceName: String = "s2graph"
  val columnName: String = "user_id"
  val columnType: String = "long"
  val indexProps = Seq("time" -> JsNumber(0), "weight" -> JsNumber(0))
  val props = Seq("is_hidden" -> JsBoolean(false), "is_blocked" -> JsBoolean(false))

  // Table name passed to Management.copyLabel for the copied label.
  val hTableName: String = "s2graph-dev_new"
  val ttl: Int = 86000

  // One tab-separated input line; its label column names testLabelName.
  val testStrings: List[String] = List(
    "1431788400000\tinsertBulk\te\t147229417\t99240432\ts2graph_label_test\t{\"is_hidden\": true}"
  )

  // Runs while the suite is being constructed, i.e. before any test body executes.
  GraphSubscriberHelper(phase, dbUrl, zkQuorum, kafkaBrokerList)

  test("GraphSubscriberHelper.store") {
    // Known gap: labelToReplace should be deleted before each run, but it is not.
    Management.copyLabel(testLabelName, labelToReplace, Some(hTableName))

    // Alternative input that was left disabled: a generated grid of edges.
    //   val msgs = (for {
    //     i <- (1 until 10)
    //     j <- (100 until 110)
    //   } yield {
    //     s"$currentTs\t$op\tedge\t$i\t$j\t$testLabelName"
    //   }).toSeq

    // Store the fixture lines, passing labelToReplace as the optional second argument.
    val storeResult = GraphSubscriberHelper.store(testStrings, Some(labelToReplace))(None)
    println(storeResult)
  }
}