[S2GRAPH-118] Fix compile error on test cases on loader project.
JIRA:
[S2GRAPH-118] https://issues.apache.org/jira/browse/S2GRAPH-118
Pull Request:
Closes #87
Authors:
DO YUNG YOON: steamshon@apache.org
diff --git a/CHANGES b/CHANGES
index 168427d..8740c59 100644
--- a/CHANGES
+++ b/CHANGES
@@ -82,6 +82,8 @@
S2GRAPH-116: using ASM and ByteBuddy to add a proxy to Asynchbase's Scanner.
(Contributed by Jong Wook Kim<jongwook@nyu.edu>, committed by DOYUNG YOON)
+ S2GRAPH-117: Cleaner logging library usage.
+ (Contributed by Jong Wook Kim<jongwook@nyu.edu>, committed by DOYUNG YOON)
BUG FIXES
diff --git a/build.sbt b/build.sbt
index b74e191..36b56fd 100755
--- a/build.sbt
+++ b/build.sbt
@@ -31,10 +31,7 @@
testOptions in Test += Tests.Argument("-oDF"),
concurrentRestrictions in Global += Tags.limit(Tags.Test, 1),
parallelExecution in Test := false,
- libraryDependencies ++= Seq(
- "org.scalaz.stream" % "scalaz-stream_2.11" % "0.7.2",
- "com.typesafe.netty" % "netty-http-pipelining" % "1.1.4"
- ),
+ libraryDependencies ++= Common.loggingRuntime,
resolvers ++= Seq(
Resolver.mavenLocal
)
@@ -42,7 +39,7 @@
Revolver.settings
-lazy val s2rest_play = project.enablePlugins(PlayScala)
+lazy val s2rest_play = project.enablePlugins(PlayScala).disablePlugins(PlayLogback)
.dependsOn(s2core, s2counter_core)
.settings(commonSettings: _*)
.settings(testOptions in Test += Tests.Argument("sequential"))
diff --git a/conf/application.conf b/conf/application.conf
index 4920380..5dfa653 100644
--- a/conf/application.conf
+++ b/conf/application.conf
@@ -72,6 +72,6 @@
# ex) to use mysql as metastore, change db.default.driver = "com.mysql.jdbc.Driver"
# and db.default.url to point to jdbc connection.
db.default.driver = "org.h2.Driver"
-db.default.url="jdbc:h2:file:./var/metastore;MODE=MYSQL"
-db.default.user="sa"
-db.default.password="sa"
+db.default.url = "jdbc:h2:file:./var/metastore;MODE=MYSQL"
+db.default.user = "sa"
+db.default.password = "sa"
diff --git a/conf/log4j.properties b/conf/log4j.properties
new file mode 100644
index 0000000..c13e516
--- /dev/null
+++ b/conf/log4j.properties
@@ -0,0 +1,27 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+log4j.rootLogger=${root.logger}
+root.logger=INFO,console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n
+
+# The noisier spark/kafka logs are suppressed
+log4j.logger.org.apache.spark=WARN,console
+log4j.logger.org.apache.kafka=WARN,console
diff --git a/conf/logback.xml b/conf/logback.xml
deleted file mode 100644
index d3f09bc..0000000
--- a/conf/logback.xml
+++ /dev/null
@@ -1,32 +0,0 @@
-<?xml version="1.0"?>
-<!--
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
--->
-<configuration>
- <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
- <layout class="ch.qos.logback.classic.PatternLayout">
- <Pattern>
- %d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n
- </Pattern>
- </layout>
- </appender>
- <root level="INFO">
- <appender-ref ref="STDOUT" />
- </root>
-</configuration>
diff --git a/loader/build.sbt b/loader/build.sbt
index 75e268e..73144bb 100644
--- a/loader/build.sbt
+++ b/loader/build.sbt
@@ -18,6 +18,7 @@
*/
import sbtassembly.Plugin.AssemblyKeys._
+import Common._
name := "s2loader"
@@ -29,14 +30,14 @@
libraryDependencies ++= Seq(
"com.google.guava" % "guava" % "12.0.1" force(), // use this old version of guava to avoid incompatibility
- "org.apache.spark" %% "spark-core" % Common.sparkVersion % "provided",
- "org.apache.spark" %% "spark-streaming" % Common.sparkVersion % "provided",
- "org.apache.spark" %% "spark-hive" % Common.sparkVersion % "provided",
- "org.apache.spark" %% "spark-streaming-kafka" % Common.sparkVersion,
+ "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
+ "org.apache.spark" %% "spark-streaming" % sparkVersion % "provided",
+ "org.apache.spark" %% "spark-hive" % sparkVersion % "provided",
+ "org.apache.spark" %% "spark-streaming-kafka" % sparkVersion,
"org.apache.httpcomponents" % "fluent-hc" % "4.2.5",
- "org.specs2" %% "specs2-core" % "2.4.11" % "test",
+ "org.specs2" %% "specs2-core" % specs2Version % "test",
"org.scalatest" %% "scalatest" % "2.2.1" % "test",
- "org.apache.hadoop" % "hadoop-distcp" % Common.hadoopVersion
+ "org.apache.hadoop" % "hadoop-distcp" % hadoopVersion
)
crossScalaVersions := Seq("2.10.6")
diff --git a/loader/src/main/resources/log4j.properties b/loader/src/test/resources/log4j.properties
similarity index 100%
rename from loader/src/main/resources/log4j.properties
rename to loader/src/test/resources/log4j.properties
diff --git a/project/Common.scala b/project/Common.scala
index ef793f1..f3dfc68 100644
--- a/project/Common.scala
+++ b/project/Common.scala
@@ -17,13 +17,37 @@
* under the License.
*/
-object Common {
- // lazy val sparkVersion = "1.4.1-cdh5.3.3"
- lazy val sparkVersion = "1.4.1"
- lazy val playVersion = "2.3.10"
+import sbt._
- lazy val hbaseVersion = "1.2.2"
- // lazy val hbaseVersion = "1.0.0-cdh5.4.5"
- lazy val hadoopVersion = "2.7.3"
- // lazy val hadoopVersion = "2.6.0-cdh5.4.5"
+object Common {
+ val sparkVersion = "1.4.1"
+ val playVersion = "2.5.9"
+ val specs2Version = "3.8.5"
+
+ val hbaseVersion = "1.2.2"
+ val hadoopVersion = "2.7.3"
+
+ /** use Log4j 1.2.17 as the SLF4j backend in runtime, with bridging libraries to forward JCL and JUL logs to SLF4j */
+ val loggingRuntime = Seq(
+ "log4j" % "log4j" % "1.2.17",
+ "org.slf4j" % "slf4j-log4j12" % "1.7.21",
+ "org.slf4j" % "jcl-over-slf4j" % "1.7.21",
+ "org.slf4j" % "jul-to-slf4j" % "1.7.21"
+ ).flatMap(dep => Seq(dep % "test", dep % "runtime"))
+
+ /** rules to exclude logging backends and bridging libraries from dependency */
+ val loggingExcludes = Seq(
+ ExclusionRule("commons-logging", "commons-logging"),
+ ExclusionRule("log4j", "log4j"),
+ ExclusionRule("ch.qos.logback", "logback-classic"),
+ ExclusionRule("ch.qos.logback", "logback-core"),
+ ExclusionRule("org.slf4j", "jcl-over-slf4j"),
+ ExclusionRule("org.slf4j", "log4j-over-slf4j"),
+ ExclusionRule("org.slf4j", "slf4j-log4j12"),
+ ExclusionRule("org.slf4j", "jul-to-slf4j")
+ )
+
+ implicit class LoggingExcluder(moduleId: ModuleID) {
+ def excludeLogging(): ModuleID = moduleId.excludeAll(loggingExcludes: _*)
+ }
}
diff --git a/project/plugins.sbt b/project/plugins.sbt
index d98d1cb..a6e5381 100644
--- a/project/plugins.sbt
+++ b/project/plugins.sbt
@@ -19,7 +19,7 @@
// use the Play sbt plugin for Play projects
-addSbtPlugin("com.typesafe.play" % "sbt-plugin" % "2.3.10")
+addSbtPlugin("com.typesafe.play" % "sbt-plugin" % "2.5.9")
// http://www.scalastyle.org/sbt.html
addSbtPlugin("org.scalastyle" %% "scalastyle-sbt-plugin" % "0.7.0")
diff --git a/s2core/build.sbt b/s2core/build.sbt
index a456ce0..80f37b0 100644
--- a/s2core/build.sbt
+++ b/s2core/build.sbt
@@ -17,31 +17,34 @@
* under the License.
*/
+import Common._
+
name := """s2core"""
scalacOptions ++= Seq("-deprecation")
libraryDependencies ++= Seq(
- "ch.qos.logback" % "logback-classic" % "1.1.2",
+ "org.slf4j" % "slf4j-api" % "1.7.21",
"com.typesafe" % "config" % "1.2.1",
- "com.typesafe.play" %% "play-json" % Common.playVersion,
+ "com.typesafe.play" %% "play-json" % playVersion,
"com.typesafe.akka" %% "akka-actor" % "2.3.4",
"com.google.guava" % "guava" % "12.0.1" force(), // use this old version of guava to avoid incompatibility
- "org.apache.hbase" % "hbase-client" % Common.hbaseVersion exclude("org.slf4j", "slf4j*"),
- "org.apache.hbase" % "hbase-common" % Common.hbaseVersion exclude("org.slf4j", "slf4j*"),
- "org.apache.hbase" % "hbase-server" % Common.hbaseVersion exclude("org.slf4j", "slf4j*") exclude("com.google.protobuf", "protobuf*"),
- "org.apache.hbase" % "hbase-hadoop-compat" % Common.hbaseVersion exclude("org.slf4j", "slf4j*"),
- "org.apache.hbase" % "hbase-hadoop2-compat" % Common.hbaseVersion exclude("org.slf4j", "slf4j*"),
- "org.apache.kafka" % "kafka-clients" % "0.8.2.0" exclude("org.slf4j", "slf4j*") exclude("com.sun.jdmk", "j*") exclude("com.sun.jmx", "j*") exclude("javax.jms", "j*"),
+ "org.apache.hbase" % "hbase-client" % hbaseVersion excludeLogging(),
+ "org.apache.hbase" % "hbase-common" % hbaseVersion excludeLogging(),
+ "org.apache.hbase" % "hbase-server" % hbaseVersion excludeLogging() exclude("com.google.protobuf", "protobuf*"),
+ "org.apache.hbase" % "hbase-hadoop-compat" % hbaseVersion excludeLogging(),
+ "org.apache.hbase" % "hbase-hadoop2-compat" % hbaseVersion excludeLogging(),
+ "org.apache.kafka" % "kafka-clients" % "0.8.2.0" excludeLogging() exclude("com.sun.jdmk", "j*") exclude("com.sun.jmx", "j*") exclude("javax.jms", "j*"),
"commons-pool" % "commons-pool" % "1.6",
- "org.scalatest" %% "scalatest" % "2.2.4" % "test",
- "org.scalikejdbc" %% "scalikejdbc" % "2.1.+",
+ "org.scalikejdbc" %% "scalikejdbc" % "2.1.4",
"com.h2database" % "h2" % "1.4.192",
- "com.github.danielwegener" % "logback-kafka-appender" % "0.0.4",
"com.stumbleupon" % "async" % "1.4.1",
"io.netty" % "netty" % "3.9.4.Final" force(),
- "org.hbase" % "asynchbase" % "1.7.2",
- "net.bytebuddy" % "byte-buddy" % "1.4.26"
+ "org.hbase" % "asynchbase" % "1.7.2" excludeLogging(),
+ "net.bytebuddy" % "byte-buddy" % "1.4.26",
+
+ "org.scalatest" %% "scalatest" % "2.2.4" % "test",
+ "org.specs2" %% "specs2-core" % specs2Version % "test"
)
libraryDependencies := {
diff --git a/s2core/src/main/resources/logback.xml b/s2core/src/main/resources/logback.xml
deleted file mode 100644
index 7c2a495..0000000
--- a/s2core/src/main/resources/logback.xml
+++ /dev/null
@@ -1,45 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-<configuration>
- <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
- <withJansi>true</withJansi>
- <encoder>
- <pattern>
- %d{ISO8601} [%highlight(%-5level)] [%gray(%logger{0})] [%thread] - %msg%n
- </pattern>
- </encoder>
- </appender>
-
- <!--<root level="INFO">-->
- <!--<appender-ref ref="STDOUT"/>-->
- <!--</root>-->
-
- <logger name="application" level="DEBUG">
- <appender-ref ref="STDOUT"/>
- </logger>
-
- <logger name="error" level="DEBUG">
- <appender-ref ref="STDOUT"/>
- </logger>
-
-</configuration>
-
diff --git a/s2core/src/main/resources/reference.conf b/s2core/src/main/resources/reference.conf
index e5c129b..73c949a 100644
--- a/s2core/src/main/resources/reference.conf
+++ b/s2core/src/main/resources/reference.conf
@@ -49,9 +49,9 @@
s2graph.models.table.name = "models-dev"
db.default.driver = "org.h2.Driver"
-db.default.url="jdbc:h2:file:./var/metastore;MODE=MYSQL"
-db.default.user = "graph"
-db.default.password = "graph"
+db.default.url = "jdbc:h2:file:./var/metastore;MODE=MYSQL"
+db.default.user = "sa"
+db.default.password = "sa"
akka {
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala b/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala
index caaa408..923eeef 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala
@@ -186,7 +186,6 @@
private def buildReplaceJson(jsValue: JsValue)(mapper: JsValue => JsValue): JsValue = {
def traverse(js: JsValue): JsValue = js match {
case JsNull => mapper(JsNull)
- case JsUndefined() => mapper(JsUndefined(""))
case JsNumber(v) => mapper(js)
case JsString(v) => mapper(js)
case JsBoolean(v) => mapper(js)
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala b/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
index 61db2ef..f67a89b 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
@@ -29,6 +29,7 @@
import org.apache.s2graph.core.parsers.{Where, WhereParser}
import org.apache.s2graph.core.types._
import play.api.libs.json._
+import play.api.libs.json.Reads._
import scala.util.{Failure, Success, Try}
@@ -92,7 +93,7 @@
private def extractScoring(labelId: Int, value: JsValue) = {
val ret = for {
- js <- parse[Option[JsObject]](value, "scoring")
+ js <- parseOption[JsObject](value, "scoring")
} yield {
for {
(k, v) <- js.fields
@@ -113,16 +114,16 @@
val jsValue = Json.parse(replaced)
def extractKv(js: JsValue) = js match {
- case JsObject(obj) => obj
+ case JsObject(map) => map.toSeq
case JsArray(arr) => arr.flatMap {
- case JsObject(obj) => obj
- case _ => throw new RuntimeException(s"cannot support json type $js")
+ case JsObject(map) => map.toSeq
+ case _ => throw new RuntimeException(s"cannot support json type: $js")
}
case _ => throw new RuntimeException(s"cannot support json type: $js")
}
val ret = for {
- js <- parse[Option[JsObject]](jsValue, "interval")
+ js <- parseOption[JsObject](jsValue, "interval")
fromJs <- (js \ "from").asOpt[JsValue]
toJs <- (js \ "to").asOpt[JsValue]
} yield {
@@ -139,10 +140,10 @@
val jsValue = Json.parse(replaced)
for {
- js <- parse[Option[JsObject]](jsValue, "duration")
+ js <- parseOption[JsObject](jsValue, "duration")
} yield {
- val minTs = parse[Option[Long]](js, "from").getOrElse(Long.MaxValue)
- val maxTs = parse[Option[Long]](js, "to").getOrElse(Long.MinValue)
+ val minTs = parseOption[Long](js, "from").getOrElse(Long.MaxValue)
+ val maxTs = parseOption[Long](js, "to").getOrElse(Long.MinValue)
if (minTs > maxTs) {
throw new BadQueryException("Duration error. Timestamp of From cannot be larger than To.")
@@ -154,7 +155,7 @@
def extractHas(label: Label, jsValue: JsValue) = {
val ret = for {
- js <- parse[Option[JsObject]](jsValue, "has")
+ js <- parseOption[JsObject](jsValue, "has")
} yield {
for {
(k, v) <- js.fields
@@ -346,12 +347,12 @@
private def parseQueryParam(labelGroup: JsValue): Option[QueryParam] = {
for {
- labelName <- parse[Option[String]](labelGroup, "label")
+ labelName <- parseOption[String](labelGroup, "label")
} yield {
val label = Label.findByName(labelName).getOrElse(throw BadQueryException(s"$labelName not found"))
- val direction = parse[Option[String]](labelGroup, "direction").map(GraphUtil.toDirection(_)).getOrElse(0)
+ val direction = parseOption[String](labelGroup, "direction").map(GraphUtil.toDirection(_)).getOrElse(0)
val limit = {
- parse[Option[Int]](labelGroup, "limit") match {
+ parseOption[Int](labelGroup, "limit") match {
case None => defaultLimit
case Some(l) if l < 0 => maxLimit
case Some(l) if l >= 0 =>
@@ -359,12 +360,12 @@
Math.min(l, default)
}
}
- val offset = parse[Option[Int]](labelGroup, "offset").getOrElse(0)
+ val offset = parseOption[Int](labelGroup, "offset").getOrElse(0)
val interval = extractInterval(label, labelGroup)
val duration = extractDuration(label, labelGroup)
val scoring = extractScoring(label.id.get, labelGroup).getOrElse(List.empty[(Byte, Double)]).toList
- val exclude = parse[Option[Boolean]](labelGroup, "exclude").getOrElse(false)
- val include = parse[Option[Boolean]](labelGroup, "include").getOrElse(false)
+ val exclude = parseOption[Boolean](labelGroup, "exclude").getOrElse(false)
+ val include = parseOption[Boolean](labelGroup, "include").getOrElse(false)
val hasFilter = extractHas(label, labelGroup)
val labelWithDir = LabelWithDirection(label.id.get, direction)
val indexNameOpt = (labelGroup \ "index").asOpt[String]
@@ -392,7 +393,7 @@
}
val threshold = (labelGroup \ "threshold").asOpt[Double].getOrElse(QueryParam.DefaultThreshold)
// TODO: refactor this. dirty
- val duplicate = parse[Option[String]](labelGroup, "duplicate").map(s => Query.DuplicatePolicy(s))
+ val duplicate = parseOption[String](labelGroup, "duplicate").map(s => Query.DuplicatePolicy(s))
val outputField = (labelGroup \ "outputField").asOpt[String].map(s => Json.arr(Json.arr(s)))
val transformer = if (outputField.isDefined) outputField else (labelGroup \ "transform").asOpt[JsValue]
@@ -430,16 +431,23 @@
}
private def parse[R](js: JsValue, key: String)(implicit read: Reads[R]): R = {
- (js \ key).validate[R]
- .fold(
- errors => {
- val msg = (JsError.toFlatJson(errors) \ "obj").as[List[JsValue]].map(x => x \ "msg")
- val e = Json.obj("args" -> key, "error" -> msg)
- throw new GraphExceptions.JsonParseException(Json.obj("error" -> key).toString)
- },
- r => {
- r
- })
+ (js \ key).validate[R] match {
+ case JsError(errors) =>
+ val msg = (JsError.toFlatJson(errors) \ "obj").as[List[JsValue]].flatMap(x => (x \ "msg").toOption)
+ val e = Json.obj("args" -> key, "error" -> msg)
+ throw new GraphExceptions.JsonParseException(Json.obj("error" -> key).toString)
+ case JsSuccess(result, _) => result
+ }
+ }
+
+ private def parseOption[R](js: JsValue, key: String)(implicit read: Reads[R]): Option[R] = {
+ (js \ key).validateOpt[R] match {
+ case JsError(errors) =>
+ val msg = (JsError.toFlatJson(errors) \ "obj").as[List[JsValue]].flatMap(x => (x \ "msg").toOption)
+ val e = Json.obj("args" -> key, "error" -> msg)
+ throw new GraphExceptions.JsonParseException(Json.obj("error" -> key).toString)
+ case JsSuccess(result, _) => result
+ }
}
def toJsValues(jsValue: JsValue): List[JsValue] = {
@@ -478,7 +486,7 @@
val label = parse[String](jsValue, "label")
val timestamp = parse[Long](jsValue, "timestamp")
- val direction = parse[Option[String]](jsValue, "direction").getOrElse("")
+ val direction = parseOption[String](jsValue, "direction").getOrElse("")
val props = (jsValue \ "props").asOpt[JsValue].getOrElse("{}")
for {
srcId <- srcIds
@@ -494,7 +502,7 @@
def toVertex(jsValue: JsValue, operation: String, serviceName: Option[String] = None, columnName: Option[String] = None): Vertex = {
val id = parse[JsValue](jsValue, "id")
- val ts = parse[Option[Long]](jsValue, "timestamp").getOrElse(System.currentTimeMillis())
+ val ts = parseOption[Long](jsValue, "timestamp").getOrElse(System.currentTimeMillis())
val sName = if (serviceName.isEmpty) parse[String](jsValue, "serviceName") else serviceName.get
val cName = if (columnName.isEmpty) parse[String](jsValue, "columnName") else columnName.get
val props = (jsValue \ "props").asOpt[JsObject].getOrElse(Json.obj())
@@ -511,7 +519,7 @@
Prop(propName, defaultValue, dataType)
}
- def toPropsElements(jsValue: JsValue): Seq[Prop] = for {
+ def toPropsElements(jsValue: JsLookupResult): Seq[Prop] = for {
jsObj <- jsValue.asOpt[Seq[JsValue]].getOrElse(Nil)
} yield {
val propName = (jsObj \ "name").as[String]
@@ -523,7 +531,7 @@
Prop(propName, defaultValue, dataType)
}
- def toIndicesElements(jsValue: JsValue): Seq[Index] = for {
+ def toIndicesElements(jsValue: JsLookupResult): Seq[Index] = for {
jsObj <- jsValue.as[Seq[JsValue]]
indexName = (jsObj \ "name").as[String]
propNames = (jsObj \ "propNames").as[Seq[String]]
diff --git a/s2core/src/test/resources/log4j.properties b/s2core/src/test/resources/log4j.properties
new file mode 100644
index 0000000..c13e516
--- /dev/null
+++ b/s2core/src/test/resources/log4j.properties
@@ -0,0 +1,27 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+log4j.rootLogger=${root.logger}
+root.logger=INFO,console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n
+
+# The noisier spark/kafka logs are suppressed
+log4j.logger.org.apache.spark=WARN,console
+log4j.logger.org.apache.kafka=WARN,console
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/CrudTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/CrudTest.scala
index 3911e0d..fda9991 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/CrudTest.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/CrudTest.scala
@@ -223,7 +223,7 @@
for ((key, expectedVal) <- expected) {
propsLs.last.as[JsObject].keys.contains(key) should be(true)
- (propsLs.last \ key).toString should be(expectedVal)
+ (propsLs.last \ key).get.toString should be(expectedVal)
}
}
}
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/QueryTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/QueryTest.scala
index c0071fa..9c52b32 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/QueryTest.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/QueryTest.scala
@@ -54,16 +54,16 @@
""")
var edges = getEdgesSync(queryWithInterval(0, index2, "_timestamp", 1000, 1001)) // test interval on timestamp index
- (edges \ "size").toString should be("1")
+ (edges \ "size").get.toString should be("1")
edges = getEdgesSync(queryWithInterval(0, index2, "_timestamp", 1000, 2000)) // test interval on timestamp index
- (edges \ "size").toString should be("2")
+ (edges \ "size").get.toString should be("2")
edges = getEdgesSync(queryWithInterval(2, index1, "weight", 10, 11)) // test interval on weight index
- (edges \ "size").toString should be("1")
+ (edges \ "size").get.toString should be("1")
edges = getEdgesSync(queryWithInterval(2, index1, "weight", 10, 20)) // test interval on weight index
- (edges \ "size").toString should be("2")
+ (edges \ "size").get.toString should be("2")
}
test("get edge with where condition") {
@@ -604,7 +604,7 @@
logger.debug(Json.prettyPrint(rs))
val results = (rs \ "results").as[List[JsValue]]
results.size should be(1)
- (results(0) \ "to").toString should be("555")
+ (results(0) \ "to").get.toString should be("555")
}
diff --git a/s2rest_play/test/org/apache/s2graph/rest/play/controllers/PostProcessSpec.scala b/s2core/src/test/scala/org/apache/s2graph/core/PostProcessSpec.scala
similarity index 94%
rename from s2rest_play/test/org/apache/s2graph/rest/play/controllers/PostProcessSpec.scala
rename to s2core/src/test/scala/org/apache/s2graph/core/PostProcessSpec.scala
index adc3b02..cd809f6 100644
--- a/s2rest_play/test/org/apache/s2graph/rest/play/controllers/PostProcessSpec.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/PostProcessSpec.scala
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -17,13 +17,12 @@
* under the License.
*/
-package org.apache.s2graph.rest.play.controllers
+package org.apache.s2graph.core
-import org.apache.s2graph.core.{OrderingUtil, SeqMultiOrdering}
+import org.specs2.mutable.Specification
import play.api.libs.json.{JsNumber, JsString, JsValue}
-import play.api.test.PlaySpecification
-class PostProcessSpec extends PlaySpecification {
+class PostProcessSpec extends Specification {
import OrderingUtil._
"test order by json" >> {
diff --git a/s2rest_play/test/org/apache/s2graph/rest/play/benchmark/BenchmarkCommon.scala b/s2core/src/test/scala/org/apache/s2graph/core/benchmark/BenchmarkCommon.scala
similarity index 96%
rename from s2rest_play/test/org/apache/s2graph/rest/play/benchmark/BenchmarkCommon.scala
rename to s2core/src/test/scala/org/apache/s2graph/core/benchmark/BenchmarkCommon.scala
index 240421c..9e220cf 100644
--- a/s2rest_play/test/org/apache/s2graph/rest/play/benchmark/BenchmarkCommon.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/benchmark/BenchmarkCommon.scala
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.s2graph.rest.play.benchmark
+package org.apache.s2graph.core.benchmark
import com.typesafe.config.{ConfigFactory, Config}
import org.apache.s2graph.core.{Management, Graph}
diff --git a/s2rest_play/test/org/apache/s2graph/rest/play/benchmark/GraphUtilSpec.scala b/s2core/src/test/scala/org/apache/s2graph/core/benchmark/GraphUtilSpec.scala
similarity index 96%
rename from s2rest_play/test/org/apache/s2graph/rest/play/benchmark/GraphUtilSpec.scala
rename to s2core/src/test/scala/org/apache/s2graph/core/benchmark/GraphUtilSpec.scala
index 737b828..3cb216c 100644
--- a/s2rest_play/test/org/apache/s2graph/rest/play/benchmark/GraphUtilSpec.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/benchmark/GraphUtilSpec.scala
@@ -17,17 +17,16 @@
* under the License.
*/
-package org.apache.s2graph.rest.play.benchmark
+package org.apache.s2graph.core.benchmark
import org.apache.hadoop.hbase.util.Bytes
import org.apache.s2graph.core.GraphUtil
import org.apache.s2graph.core.types.{HBaseType, InnerVal, SourceVertexId}
-import play.api.test.PlaySpecification
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
-class GraphUtilSpec extends BenchmarkCommon with PlaySpecification {
+class GraphUtilSpec extends BenchmarkCommon {
def between(bytes: Array[Byte], startKey: Array[Byte], endKey: Array[Byte]): Boolean =
Bytes.compareTo(startKey, bytes) <= 0 && Bytes.compareTo(endKey, bytes) >= 0
diff --git a/s2rest_play/test/org/apache/s2graph/rest/play/benchmark/JsonBenchmarkSpec.scala b/s2core/src/test/scala/org/apache/s2graph/core/benchmark/JsonBenchmarkSpec.scala
similarity index 97%
rename from s2rest_play/test/org/apache/s2graph/rest/play/benchmark/JsonBenchmarkSpec.scala
rename to s2core/src/test/scala/org/apache/s2graph/core/benchmark/JsonBenchmarkSpec.scala
index 5733f19..8ba9ea2 100644
--- a/s2rest_play/test/org/apache/s2graph/rest/play/benchmark/JsonBenchmarkSpec.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/benchmark/JsonBenchmarkSpec.scala
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.s2graph.rest.play.benchmark
+package org.apache.s2graph.core.benchmark
import play.api.libs.json.JsNumber
import play.libs.Json
diff --git a/s2rest_play/test/org/apache/s2graph/rest/play/benchmark/OrderingUtilBenchmarkSpec.scala b/s2core/src/test/scala/org/apache/s2graph/core/benchmark/OrderingUtilBenchmarkSpec.scala
similarity index 98%
rename from s2rest_play/test/org/apache/s2graph/rest/play/benchmark/OrderingUtilBenchmarkSpec.scala
rename to s2core/src/test/scala/org/apache/s2graph/core/benchmark/OrderingUtilBenchmarkSpec.scala
index fb80dc4..3cb0543 100644
--- a/s2rest_play/test/org/apache/s2graph/rest/play/benchmark/OrderingUtilBenchmarkSpec.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/benchmark/OrderingUtilBenchmarkSpec.scala
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.s2graph.rest.play.benchmark
+package org.apache.s2graph.core.benchmark
import org.apache.s2graph.core.OrderingUtil._
import org.apache.s2graph.core.{OrderingUtil, SeqMultiOrdering}
diff --git a/s2rest_play/test/org/apache/s2graph/rest/play/benchmark/SamplingBenchmarkSpec.scala b/s2core/src/test/scala/org/apache/s2graph/core/benchmark/SamplingBenchmarkSpec.scala
similarity index 93%
rename from s2rest_play/test/org/apache/s2graph/rest/play/benchmark/SamplingBenchmarkSpec.scala
rename to s2core/src/test/scala/org/apache/s2graph/core/benchmark/SamplingBenchmarkSpec.scala
index 07e0b13..a8777fb 100644
--- a/s2rest_play/test/org/apache/s2graph/rest/play/benchmark/SamplingBenchmarkSpec.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/benchmark/SamplingBenchmarkSpec.scala
@@ -17,14 +17,12 @@
* under the License.
*/
-package org.apache.s2graph.rest.play.benchmark
-
-import play.api.test.{FakeApplication, PlaySpecification, WithApplication}
+package org.apache.s2graph.core.benchmark
import scala.annotation.tailrec
import scala.util.Random
-class SamplingBenchmarkSpec extends BenchmarkCommon with PlaySpecification {
+class SamplingBenchmarkSpec extends BenchmarkCommon {
"sample" should {
"sample benchmark" in {
diff --git a/s2counter_core/build.sbt b/s2counter_core/build.sbt
index 9a75faf..b248b43 100644
--- a/s2counter_core/build.sbt
+++ b/s2counter_core/build.sbt
@@ -17,6 +17,8 @@
* under the License.
*/
+import Common._
+
name := "s2counter-core"
scalacOptions ++= Seq("-feature", "-deprecation", "-language:existentials")
@@ -24,24 +26,10 @@
scalacOptions in Test ++= Seq("-Yrangepos")
libraryDependencies ++= Seq(
- "ch.qos.logback" % "logback-classic" % "1.1.2",
- "com.typesafe" % "config" % "1.2.1",
- "com.typesafe.play" %% "play-json" % Common.playVersion,
- "com.typesafe.play" %% "play-ws" % Common.playVersion,
- "com.typesafe.akka" %% "akka-actor" % "2.3.4",
"com.google.guava" % "guava" % "12.0.1" force(), // use this old version of guava to avoid incompatibility
- "org.apache.hbase" % "hbase-client" % Common.hbaseVersion,
- "org.apache.hbase" % "hbase-common" % Common.hbaseVersion,
- "org.apache.hbase" % "hbase-hadoop-compat" % Common.hbaseVersion,
- "org.apache.hbase" % "hbase-hadoop2-compat" % Common.hbaseVersion,
- "org.apache.hadoop" % "hadoop-common" % Common.hadoopVersion,
- "org.apache.hadoop" % "hadoop-hdfs" % Common.hadoopVersion,
+ "com.typesafe.play" %% "play-ws" % playVersion excludeLogging(),
+ "org.apache.hadoop" % "hadoop-common" % hadoopVersion excludeLogging,
+ "org.apache.hadoop" % "hadoop-hdfs" % hadoopVersion excludeLogging,
"redis.clients" % "jedis" % "2.6.0",
- "org.apache.kafka" % "kafka-clients" % "0.8.2.0",
- "com.h2database" % "h2" % "1.4.192",
- "org.scalikejdbc" %% "scalikejdbc" % "2.1.+",
- "org.specs2" %% "specs2-core" % "3.6" % "test",
- "org.scalatest" %% "scalatest" % "2.2.1" % "test"
-).map { moduleId =>
- moduleId.exclude("org.slf4j", "slf4j-log4j12")
-}
+ "org.specs2" %% "specs2-core" % specs2Version % "test"
+)
diff --git a/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v2/ExactStorageGraph.scala b/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v2/ExactStorageGraph.scala
index 3a196ef..caddf8d 100644
--- a/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v2/ExactStorageGraph.scala
+++ b/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v2/ExactStorageGraph.scala
@@ -19,6 +19,8 @@
package org.apache.s2graph.counter.core.v2
+import akka.actor.ActorSystem
+import akka.stream.ActorMaterializer
import com.typesafe.config.Config
import org.apache.http.HttpStatus
import org.apache.s2graph.core.mysqls.Label
@@ -28,6 +30,7 @@
import org.apache.s2graph.counter.core._
import org.apache.s2graph.counter.models.Counter
import org.apache.s2graph.counter.util.CartesianProduct
+import org.asynchttpclient.DefaultAsyncHttpClientConfig
import org.slf4j.LoggerFactory
import play.api.libs.json._
import scala.concurrent.duration._
@@ -38,7 +41,8 @@
implicit val respGraphFormat = Json.format[RespGraph]
// using play-ws without play app
- private val builder = new com.ning.http.client.AsyncHttpClientConfig.Builder()
+ implicit val materializer = ActorMaterializer.create(ActorSystem(getClass.getSimpleName))
+ private val builder = new DefaultAsyncHttpClientConfig.Builder()
private val wsClient = new play.api.libs.ws.ning.NingWSClient(builder.build)
}
@@ -209,10 +213,10 @@
}
private def resultToExactKeyValues(policy: Counter, result: JsValue): (ExactKeyTrait, (core.ExactQualifier, Long)) = {
- val from = result \ "from" match {
+ val from = (result \ "from").get match {
case s: JsString => s.as[String]
case n: JsNumber => n.as[Long].toString
- case x: JsValue => throw new RuntimeException(s"$x's type must be string or number")
+ case x => throw new RuntimeException(s"$x's type must be string or number")
}
val dimension = (result \ "to").as[String]
val props = result \ "props"
diff --git a/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v2/GraphOperation.scala b/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v2/GraphOperation.scala
index 29a509b..329f3d0 100644
--- a/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v2/GraphOperation.scala
+++ b/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v2/GraphOperation.scala
@@ -19,9 +19,13 @@
package org.apache.s2graph.counter.core.v2
+import akka.actor.ActorSystem
+import akka.stream.ActorMaterializer
import com.typesafe.config.Config
import org.apache.http.HttpStatus
import org.apache.s2graph.counter.config.S2CounterConfig
+import org.apache.s2graph.counter.core.v2.ExactStorageGraph._
+import org.asynchttpclient.DefaultAsyncHttpClientConfig
import org.slf4j.LoggerFactory
import play.api.libs.json.{JsObject, JsValue, Json}
import scala.concurrent.Await
@@ -29,7 +33,8 @@
class GraphOperation(config: Config) {
// using play-ws without play app
- private val builder = new com.ning.http.client.AsyncHttpClientConfig.Builder()
+ implicit val materializer = ActorMaterializer.create(ActorSystem(getClass.getSimpleName))
+ private val builder = new DefaultAsyncHttpClientConfig.Builder()
private val wsClient = new play.api.libs.ws.ning.NingWSClient(builder.build)
private val s2config = new S2CounterConfig(config)
val s2graphUrl = s2config.GRAPH_URL
diff --git a/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v2/RankingStorageGraph.scala b/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v2/RankingStorageGraph.scala
index 55800f2..b9ca9a4 100644
--- a/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v2/RankingStorageGraph.scala
+++ b/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v2/RankingStorageGraph.scala
@@ -19,15 +19,19 @@
package org.apache.s2graph.counter.core.v2
+import akka.actor.ActorSystem
+import akka.stream.ActorMaterializer
import com.typesafe.config.Config
import org.apache.commons.httpclient.HttpStatus
import org.apache.s2graph.core.GraphUtil
import org.apache.s2graph.core.mysqls.Label
import org.apache.s2graph.counter.config.S2CounterConfig
import org.apache.s2graph.counter.core.RankingCounter.RankingValueMap
+import org.apache.s2graph.counter.core.v2.ExactStorageGraph._
import org.apache.s2graph.counter.core.{RankingResult, RankingKey, RankingStorage}
import org.apache.s2graph.counter.models.{Counter, CounterModel}
import org.apache.s2graph.counter.util.{CollectionCacheConfig, CollectionCache}
+import org.asynchttpclient.DefaultAsyncHttpClientConfig
import org.slf4j.LoggerFactory
import play.api.libs.json.{JsObject, JsString, JsValue, Json}
import scala.concurrent.duration._
@@ -36,7 +40,8 @@
object RankingStorageGraph {
// using play-ws without play app
- private val builder = new com.ning.http.client.AsyncHttpClientConfig.Builder()
+ implicit val materializer = ActorMaterializer.create(ActorSystem(getClass.getSimpleName))
+ private val builder = new DefaultAsyncHttpClientConfig.Builder()
private val wsClient = new play.api.libs.ws.ning.NingWSClient(builder.build)
}
diff --git a/s2counter_core/src/test/resources/application.conf b/s2counter_core/src/test/resources/application.conf
index a1cd11b..b744a71 100644
--- a/s2counter_core/src/test/resources/application.conf
+++ b/s2counter_core/src/test/resources/application.conf
@@ -22,10 +22,10 @@
host=localhost
# DB
-db.default.driver=com.mysql.jdbc.Driver
-db.default.url="jdbc:mysql://"${host}":3306/graph_dev"
-db.default.user=graph
-db.default.password=graph
+db.default.driver = "org.h2.Driver"
+db.default.url = "jdbc:h2:file:./var/metastore;MODE=MYSQL"
+db.default.user = "sa"
+db.default.password = "sa"
# Kafka
kafka {
diff --git a/s2counter_loader/build.sbt b/s2counter_loader/build.sbt
index 8de6281..d0f1f41 100644
--- a/s2counter_loader/build.sbt
+++ b/s2counter_loader/build.sbt
@@ -28,7 +28,6 @@
"org.apache.spark" %% "spark-core" % Common.sparkVersion % "provided",
"org.apache.spark" %% "spark-streaming" % Common.sparkVersion % "provided",
"org.apache.spark" %% "spark-streaming-kafka" % Common.sparkVersion,
- "com.typesafe.play" %% "play-ws" % Common.playVersion,
"javax.servlet" % "javax.servlet-api" % "3.0.1" % "test",
"org.scalatest" %% "scalatest" % "2.2.4" % "test"
)
diff --git a/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/core/CounterEtlFunctions.scala b/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/core/CounterEtlFunctions.scala
index 7dcf48a..247cd07 100644
--- a/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/core/CounterEtlFunctions.scala
+++ b/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/core/CounterEtlFunctions.scala
@@ -69,7 +69,7 @@
} yield {
val jsValue = variable match {
case "_from" => JsString(srcId)
- case s => dimension \ s
+ case s => (dimension \ s).get
}
s"[[$variable]]" -> jsValue
}
diff --git a/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/core/CounterEtlItem.scala b/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/core/CounterEtlItem.scala
index e3bb884..6eaa6e2 100644
--- a/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/core/CounterEtlItem.scala
+++ b/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/core/CounterEtlItem.scala
@@ -30,10 +30,10 @@
}
lazy val value = {
- property \ "value" match {
- case JsNumber(n) => n.longValue()
- case JsString(s) => s.toLong
- case _: JsUndefined => 1L
+ (property \ "value").toOption match {
+ case Some(JsNumber(n)) => n.longValue()
+ case Some(JsString(s)) => s.toLong
+ case None => 1L
case _ => throw new Exception("wrong type")
}
}
diff --git a/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/core/CounterFunctions.scala b/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/core/CounterFunctions.scala
index 05423b1..7b272be 100644
--- a/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/core/CounterFunctions.scala
+++ b/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/core/CounterFunctions.scala
@@ -72,7 +72,7 @@
Try {
for {
k <- keys
- jsValue = dimension \ k
+ jsValue <- (dimension \ k).toOption
} yield {
jsValue match {
case JsNumber(n) => n.toString()
diff --git a/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/core/DimensionProps.scala b/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/core/DimensionProps.scala
index a1b9903..f941224 100644
--- a/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/core/DimensionProps.scala
+++ b/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/core/DimensionProps.scala
@@ -19,11 +19,14 @@
package org.apache.s2graph.counter.loader.core
+import akka.actor.ActorSystem
+import akka.stream.ActorMaterializer
import org.apache.commons.httpclient.HttpStatus
import org.apache.s2graph.core.mysqls.{Bucket, Experiment, Service}
import org.apache.s2graph.counter.loader.config.StreamingConfig
import org.apache.s2graph.counter.models.Counter
import org.apache.s2graph.counter.util.{RetryAsync, CollectionCache, CollectionCacheConfig}
+import org.asynchttpclient.DefaultAsyncHttpClientConfig
import org.slf4j.LoggerFactory
import play.api.libs.json._
import scala.annotation.tailrec
@@ -34,7 +37,8 @@
object DimensionProps {
// using play-ws without play app
- private val builder = new com.ning.http.client.AsyncHttpClientConfig.Builder()
+ implicit val materializer = ActorMaterializer.create(ActorSystem(getClass.getSimpleName))
+ private val builder = new DefaultAsyncHttpClientConfig.Builder()
private val client = new play.api.libs.ws.ning.NingWSClient(builder.build)
private val log = LoggerFactory.getLogger(this.getClass)
diff --git a/s2counter_loader/src/test/resources/application.conf b/s2counter_loader/src/test/resources/application.conf
index 204d1c7..b74fd94 100644
--- a/s2counter_loader/src/test/resources/application.conf
+++ b/s2counter_loader/src/test/resources/application.conf
@@ -22,10 +22,10 @@
host=localhost
# DB
-db.default.driver=com.mysql.jdbc.Driver
-db.default.url="jdbc:mysql://"${host}":3306/graph_dev"
-db.default.user=graph
-db.default.password=graph
+db.default.driver = "org.h2.Driver"
+db.default.url = "jdbc:h2:file:./var/metastore;MODE=MYSQL"
+db.default.user = "sa"
+db.default.password = "sa"
# Kafka
kafka {
diff --git a/s2rest_netty/build.sbt b/s2rest_netty/build.sbt
index b7c5484..975ced5 100644
--- a/s2rest_netty/build.sbt
+++ b/s2rest_netty/build.sbt
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+import Common._
name := "s2rest_netty"
diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/ApplicationController.scala b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/ApplicationController.scala
index 4170eec..707b98c 100644
--- a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/ApplicationController.scala
+++ b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/ApplicationController.scala
@@ -19,9 +19,11 @@
package org.apache.s2graph.rest.play.controllers
+import akka.util.ByteString
import org.apache.s2graph.core.GraphExceptions.BadQueryException
import org.apache.s2graph.core.PostProcess
import org.apache.s2graph.core.utils.logger
+import play.api.http.HttpEntity
import play.api.libs.iteratee.Enumerator
import play.api.libs.json.{JsString, JsValue}
import play.api.mvc._
@@ -69,8 +71,7 @@
} else {
Result(
header = ResponseHeader(OK),
- body = Enumerator(json.toString.getBytes()),
- connection = HttpConnection.Close
+ body = HttpEntity.Strict(ByteString(json.toString.getBytes()), Some(applicationJsonHeader))
).as(applicationJsonHeader).withHeaders((CONNECTION -> "close") +: headers: _*)
}
diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala
index 478df99..ec8324c 100644
--- a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala
+++ b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala
@@ -40,10 +40,12 @@
private val s2: Graph = org.apache.s2graph.rest.play.Global.s2graph
private val requestParser: RequestParser = org.apache.s2graph.rest.play.Global.s2parser
+
private def jsToStr(js: JsValue): String = js match {
case JsString(s) => s
- case _ => js.toString()
+ case obj => obj.toString()
}
+ private def jsToStr(js: JsLookupResult): String = js.toOption.map(jsToStr).getOrElse("undefined")
def toTsv(jsValue: JsValue, op: String): String = {
val ts = jsToStr(jsValue \ "timestamp")
diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/JsonBodyParser.scala b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/JsonBodyParser.scala
index 3245bb6..aa507d9 100644
--- a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/JsonBodyParser.scala
+++ b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/JsonBodyParser.scala
@@ -19,10 +19,12 @@
package org.apache.s2graph.rest.play.controllers
+import akka.util.ByteString
import org.apache.s2graph.core.utils.logger
import play.api.Play
import play.api.libs.iteratee.Iteratee
import play.api.libs.json.{JsValue, Json}
+import play.api.libs.streams.Streams
import play.api.mvc._
import scala.concurrent.Future
@@ -50,9 +52,11 @@
import play.api.libs.iteratee.Execution.Implicits.trampoline
import play.api.libs.iteratee.Traversable
- Traversable.takeUpTo[Array[Byte]](maxLength)
- .transform(Iteratee.consume[Array[Byte]]().map(c => new String(c, "UTF-8")))
+ val iteratee = Traversable.takeUpTo[ByteString](maxLength)
+ .transform(Iteratee.consume[ByteString]().map(_.utf8String))
.flatMap(Iteratee.eofOrElse(Results.EntityTooLarge))
+
+ Streams.iterateeToAccumulator(iteratee)
}
def json(maxLength: Int): BodyParser[JsValue] = when(
@@ -76,14 +80,14 @@
import scala.util.control.Exception._
- val bodyParser: Iteratee[Array[Byte], Either[Result, Either[Future[Result], A]]] =
- Traversable.takeUpTo[Array[Byte]](maxLength).transform(
- Iteratee.consume[Array[Byte]]().map { bytes =>
+ val bodyParser: Iteratee[ByteString, Either[Result, Either[Future[Result], A]]] =
+ Traversable.takeUpTo[ByteString](maxLength).transform(
+ Iteratee.consume[ByteString]().map { bytes =>
allCatch[A].either {
- parser(request, bytes)
+ parser(request, bytes.toByteBuffer.array())
}.left.map {
case NonFatal(e) =>
- val txt = new String(bytes)
+ val txt = bytes.utf8String
logger.error(s"$errorMessage: $txt", e)
createBadResult(s"$errorMessage: $e")(request)
case t => throw t
@@ -91,7 +95,7 @@
}
).flatMap(Iteratee.eofOrElse(Results.EntityTooLarge))
- bodyParser.mapM {
+ Streams.iterateeToAccumulator(bodyParser).mapFuture {
case Left(tooLarge) => Future.successful(Left(tooLarge))
case Right(Left(badResult)) => badResult.map(Left.apply)
case Right(Right(body)) => Future.successful(Right(body))
diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/package.scala b/s2rest_play/app/org/apache/s2graph/rest/play/package.scala
new file mode 100644
index 0000000..be4175a
--- /dev/null
+++ b/s2rest_play/app/org/apache/s2graph/rest/play/package.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.rest
+
+import akka.actor.ActorSystem
+import akka.stream.ActorMaterializer
+
+package object play {
+ implicit val actorSystem = ActorSystem("s2graph")
+ implicit val materializer = ActorMaterializer.create(actorSystem)
+}
diff --git a/s2rest_play/build.sbt b/s2rest_play/build.sbt
index 6edb8e8..7583ad9 100644
--- a/s2rest_play/build.sbt
+++ b/s2rest_play/build.sbt
@@ -16,17 +16,17 @@
* specific language governing permissions and limitations
* under the License.
*/
+import Common._
name := "s2rest_play"
scalacOptions in Test ++= Seq("-Yrangepos")
-libraryDependencies ++= Seq(
- ws,
- filters,
- "com.google.guava" % "guava" % "12.0.1" force(), // use this old version of guava to avoid incompatibility
- "com.github.danielwegener" % "logback-kafka-appender" % "0.0.3"
+libraryDependencies := (libraryDependencies.value ++ Seq(ws, filters, specs2 % Test)).map(_.excludeLogging()) ++ Seq(
+ "com.google.guava" % "guava" % "12.0.1" force() // use this old version of guava to avoid incompatibility
+ //"org.specs2" %% "specs2-core" % specs2Version % "test"
)
-enablePlugins(JavaServerAppPackaging)
+routesGenerator := StaticRoutesGenerator
+enablePlugins(JavaServerAppPackaging)
diff --git a/spark/build.sbt b/spark/build.sbt
index f5412d8..14a673e 100644
--- a/spark/build.sbt
+++ b/spark/build.sbt
@@ -17,16 +17,18 @@
* under the License.
*/
+import Common._
+
name := "s2spark"
scalacOptions ++= Seq("-deprecation", "-feature")
libraryDependencies ++= Seq(
"com.google.guava" % "guava" % "12.0.1" force(), // use this old version of guava to avoid incompatibility
- "org.apache.spark" %% "spark-core" % Common.sparkVersion % "provided",
- "org.apache.spark" %% "spark-streaming" % Common.sparkVersion % "provided",
- "org.apache.spark" %% "spark-streaming-kafka" % Common.sparkVersion,
- "com.typesafe.play" %% "play-json" % Common.playVersion,
- "org.specs2" %% "specs2-core" % "3.6" % "test",
+ "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
+ "org.apache.spark" %% "spark-streaming" % sparkVersion % "provided",
+ "org.apache.spark" %% "spark-streaming-kafka" % sparkVersion,
+ "com.typesafe.play" %% "play-json" % playVersion,
+ "org.specs2" %% "specs2-core" % specs2Version % "test",
"org.scalatest" %% "scalatest" % "2.2.1" % "test"
)