[S2GRAPH-118] Fix compile error on test cases on loader project.
JIRA:
[S2GRAPH-118] https://issues.apache.org/jira/browse/S2GRAPH-118
Pull Request:
Closes #87
Authors:
DO YUNG YOON: steamshon@apache.org
diff --git a/CHANGES b/CHANGES
index 3a809d0..8740c59 100644
--- a/CHANGES
+++ b/CHANGES
@@ -109,6 +109,8 @@
S2GRAPH-65: Deferred produce exception (Committed by DOYUNG YOON).
S2GRAPH-64: incrementCounts yield type case exception (Committed by DOYUNG YOON).
+
+ S2GRAPH-118: Fix compile error on test cases on loader project (Committed by DOYUNG YOON).
TASKS
diff --git a/loader/src/test/scala/org/apache/s2graph/loader/subscriber/GraphSubscriberTest.scala b/loader/src/test/scala/org/apache/s2graph/loader/subscriber/GraphSubscriberTest.scala
deleted file mode 100644
index 64d8198..0000000
--- a/loader/src/test/scala/org/apache/s2graph/loader/subscriber/GraphSubscriberTest.scala
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.loader.subscriber
-
-import org.apache.s2graph.core.Management
-import org.apache.s2graph.spark.spark.WithKafka
-import org.scalatest.{ FunSuite, Matchers }
-import play.api.libs.json.{JsBoolean, JsNumber}
-
-class GraphSubscriberTest extends FunSuite with Matchers with WithKafka {
- val phase = "dev"
- val dbUrl = "jdbc:mysql://localhost:3306/graph_dev"
- val zkQuorum = "localhost"
- val kafkaBrokerList = "localhost:9099"
- val currentTs = System.currentTimeMillis()
- val op = "insertBulk"
- val testLabelName = "s2graph_label_test"
- val labelToReplace = "s2graph_label_test_new"
- val serviceName = "s2graph"
- val columnName = "user_id"
- val columnType = "long"
- val indexProps = Seq("time" -> JsNumber(0), "weight" -> JsNumber(0))
- val props = Seq("is_hidden" -> JsBoolean(false), "is_blocked" -> JsBoolean(false))
- val hTableName = "s2graph-dev_new"
- val ttl = 86000
- val testStrings = List("1431788400000\tinsertBulk\te\t147229417\t99240432\ts2graph_label_test\t{\"is_hidden\": true}")
-
- GraphSubscriberHelper.apply(phase, dbUrl, zkQuorum, kafkaBrokerList)
-
- test("GraphSubscriberHelper.store") {
- // actually we need to delete labelToReplace first for each test.
- val labelMapping = Map(testLabelName -> labelToReplace)
- GraphSubscriberHelper.management.copyLabel(testLabelName, labelToReplace, Some(hTableName))
-
-//
-// val msgs = (for {
-// i <- (1 until 10)
-// j <- (100 until 110)
-// } yield {
-// s"$currentTs\t$op\tedge\t$i\t$j\t$testLabelName"
-// }).toSeq
- val msgs = testStrings
-
-// val stat = GraphSubscriberHelper.storeBulk(zkQuorum, hTableName)(msgs, labelMapping = labelMapping, autoCreateEdge = false)(None)
-// println(stat)
- }
-}
diff --git a/loader/src/test/scala/org/apache/s2graph/loader/subscriber/TransferToHFileTest.scala b/loader/src/test/scala/org/apache/s2graph/loader/subscriber/TransferToHFileTest.scala
deleted file mode 100644
index 0937f4c..0000000
--- a/loader/src/test/scala/org/apache/s2graph/loader/subscriber/TransferToHFileTest.scala
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.loader.subscriber
-
-import org.apache.s2graph.core.Management
-import org.apache.s2graph.core.types.HBaseType
-import org.apache.spark.{SparkConf, SparkContext}
-import org.scalatest._
-import TransferToHFile._
-
-/**
- * Created by Eric on 2015. 12. 2..
- */
-class TransferToHFileTest extends FlatSpec with BeforeAndAfterAll with Matchers {
-
- private val master = "local[2]"
- private val appName = "example-spark"
-
- private var sc: SparkContext = _
-
- val dataWithoutDir =
- """
- |1447686000000 insertBulk e a b friends_rel {}
- |1447686000000 insertBulk e a c friends_rel {}
- |1447686000000 insertBulk e a d friends_rel {}
- |1447686000000 insertBulk e b d friends_rel {}
- |1447686000000 insertBulk e b e friends_rel {}
- """.stripMargin.trim
-
- val dataWithDir =
- """
- |1447686000000 insertBulk e a b friends_rel {} out
- |1447686000000 insertBulk e b a friends_rel {} in
- |1447686000000 insertBulk e a c friends_rel {} out
- |1447686000000 insertBulk e c a friends_rel {} in
- |1447686000000 insertBulk e a d friends_rel {} out
- |1447686000000 insertBulk e d a friends_rel {} in
- |1447686000000 insertBulk e b d friends_rel {} out
- |1447686000000 insertBulk e d b friends_rel {} in
- |1447686000000 insertBulk e b e friends_rel {} out
- |1447686000000 insertBulk e e b friends_rel {} in
- """.stripMargin.trim
-
- override def beforeAll(): Unit = {
- println("### beforeAll")
-
- GraphSubscriberHelper.apply("dev", "none", "none", "none")
- // 1. create service
- if(Management.findService("loader-test").isEmpty) {
- println(">>> create service...")
- Management.createService("loader-test", "localhost", "loader-test-dev", 1, None, "gz")
- }
-
- // 2. create label
- if(Management.findLabel("friends_rel").isEmpty) {
- println(">>> create label...")
- Management.createLabel(
- "friends_rel",
- "loader-test", "user_id", "string",
- "loader-test", "user_id", "string",
- true,
- "loader-test",
- Seq(),
- Seq(),
- "weak",
- None, None,
- HBaseType.DEFAULT_VERSION,
- false,
- Management.defaultCompressionAlgorithm
- )
- }
-
- // create spark context
- val conf = new SparkConf()
- .setMaster(master)
- .setAppName(appName)
-
- sc = new SparkContext(conf)
- }
-
- override def afterAll(): Unit = {
- println("### afterALL")
- if (sc != null) {
- sc.stop()
- }
-
- Management.deleteLabel("friends_rel")
- }
-
- "buildDegreePutRequest" should "transform degree to PutRequest" in {
- val putReqs = buildDegreePutRequests("a", "friends_rel", "out", 3L)
- putReqs.size should equal(1)
- }
-
- "toKeyValues" should "transform edges to KeyValues on edge format data without direction" in {
- val rdd = sc.parallelize(dataWithoutDir.split("\n"))
-
- val kvs = rdd.mapPartitions { iter =>
- GraphSubscriberHelper.apply("dev", "none", "none", "none")
- TransferToHFile.toKeyValues(iter.toSeq, Map.empty[String, String], false)
- }
- kvs.foreach(println)
- // edges * 2 (snapshot edges + indexed edges)
- kvs.count() should equal(10)
-
-
- val kvsAutoCreated = rdd.mapPartitions { iter =>
- GraphSubscriberHelper.apply("dev", "none", "none", "none")
- TransferToHFile.toKeyValues(iter.toSeq, Map.empty[String, String], true)
- }
-
- // edges * 3 (snapshot edges + indexed edges + reverse edges)
- kvsAutoCreated.count() should equal(15)
- }
-
- "toKeyValues" should "transform edges to KeyValues on edge format data with direction" in {
- val rdd = sc.parallelize(dataWithDir.split("\n"))
-
- val kvs = rdd.mapPartitions { iter =>
- GraphSubscriberHelper.apply("dev", "none", "none", "none")
- TransferToHFile.toKeyValues(iter.toSeq, Map.empty[String, String], false)
- }
-
- // edges * 2 (snapshot edges + indexed edges)
- kvs.count() should equal(20)
- }
-
- "buildDegrees" should "build degrees on edge format data without direction" in {
- val rdd = sc.parallelize(dataWithoutDir.split("\n"))
-
- // autoCreate = false
- val degrees = TransferToHFile.buildDegrees(rdd, Map.empty[String, String], false).reduceByKey { (agg, current) =>
- agg + current
- }.collectAsMap()
- degrees.size should equal(2)
-
- degrees should contain(DegreeKey("a", "friends_rel", "out") -> 3L)
- degrees should contain(DegreeKey("b", "friends_rel", "out") -> 2L)
-
-
- // autoCreate = true
- val degreesAutoCreated = TransferToHFile.buildDegrees(rdd, Map.empty[String, String], true).reduceByKey { (agg, current) =>
- agg + current
- }.collectAsMap()
- degreesAutoCreated.size should equal(6)
-
- degreesAutoCreated should contain(DegreeKey("a", "friends_rel", "out") -> 3L)
- degreesAutoCreated should contain(DegreeKey("b", "friends_rel", "out") -> 2L)
- degreesAutoCreated should contain(DegreeKey("b", "friends_rel", "in") -> 1L)
- degreesAutoCreated should contain(DegreeKey("c", "friends_rel", "in") -> 1L)
- degreesAutoCreated should contain(DegreeKey("d", "friends_rel", "in") -> 2L)
- degreesAutoCreated should contain(DegreeKey("e", "friends_rel", "in") -> 1L)
- }
-
- "buildDegrees" should "build degrees on edge format data with direction" in {
- val rdd = sc.parallelize(dataWithDir.split("\n"))
-
- val degrees = TransferToHFile.buildDegrees(rdd, Map.empty[String, String], false).reduceByKey { (agg, current) =>
- agg + current
- }.collectAsMap()
-
- degrees.size should equal(6)
-
- degrees should contain(DegreeKey("a", "friends_rel", "out") -> 3L)
- degrees should contain(DegreeKey("b", "friends_rel", "out") -> 2L)
- degrees should contain(DegreeKey("b", "friends_rel", "in") -> 1L)
- degrees should contain(DegreeKey("c", "friends_rel", "in") -> 1L)
- degrees should contain(DegreeKey("d", "friends_rel", "in") -> 2L)
- degrees should contain(DegreeKey("e", "friends_rel", "in") -> 1L)
- }
-}