[S2GRAPH-160]: Remove warning while package.

JIRA:
    [S2GRAPH-160] https://issues.apache.org/jira/browse/S2GRAPH-160

Pull Request:
    Closes #120

Author
    DO YUNG YOON <steamshon@apache.org>
diff --git a/CHANGES b/CHANGES
index 858df20..b26f868 100644
--- a/CHANGES
+++ b/CHANGES
@@ -30,6 +30,7 @@
     * [S2GRAPH-153] - Implement IndexProvider(for Mixed IndexType) class
     * [S2GRAPH-157] - Check proper apache license header on all files for release 0.2.0
     * [S2GRAPH-158] - Bug fix on String type InnerVal encoding to use UTF-8 charsetName.
+    * [S2GRAPH-160] - Remove warning while package
     * [S2GRAPH-161] - Update CHANGES file with correct release version
 
 ** Bug
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/ExceptionHandler.scala b/s2core/src/main/scala/org/apache/s2graph/core/ExceptionHandler.scala
index 48976d3..eb5b1da 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/ExceptionHandler.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/ExceptionHandler.scala
@@ -96,7 +96,7 @@
   private def toKafkaProp(config: Config) = {
     val props = new Properties()
 
-    /** all default configuration for new producer */
+    /* all default configuration for new producer */
     val brokers =
       if (config.hasPath("kafka.metadata.broker.list")) config.getString("kafka.metadata.broker.list")
       else "localhost"
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/JSONParser.scala b/s2core/src/main/scala/org/apache/s2graph/core/JSONParser.scala
index ea50b17..9de3d9d 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/JSONParser.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/JSONParser.scala
@@ -302,13 +302,12 @@
 
   def jsValueToAny(value: JsValue): Option[AnyRef] = {
     try {
-      val v = value match {
-//        case JsNull =>
-        case n: JsNumber => n.value
-        case s: JsString => TemplateHelper.replaceVariable(System.currentTimeMillis(), s.value)
-        case b: JsBoolean => Boolean.box(b.value)
+      value match {
+        case n: JsNumber => Option(n.value)
+        case s: JsString => Option(TemplateHelper.replaceVariable(System.currentTimeMillis(), s.value))
+        case b: JsBoolean => Option(Boolean.box(b.value))
+        case _ => None
       }
-      Option(v)
     } catch {
       case e: Exception =>
         logger.error(s"jsValueToAny: $value", e)
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
index 6119045..a9741d2 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
@@ -299,7 +299,7 @@
 
     Model withTx { implicit session =>
       val service = Service.findOrInsert(serviceName, cluster, hTableName, preSplitSize, hTableTTL.orElse(Some(Integer.MAX_VALUE)), compressionAlgorithm, useCache = false)
-      /** create hbase table for service */
+      /* create hbase table for service */
       graph.getStorage(service).createTable(service.cluster, service.hTableName, List("e", "v"), service.preSplitSize, service.hTableTTL, compressionAlgorithm)
       service
     }
@@ -380,14 +380,14 @@
     Model withTx { implicit session =>
       if (labelOpt.isDefined) throw new LabelAlreadyExistException(s"Label name ${label} already exist.")
 
-      /** create all models */
+      /* create all models */
       val newLabel = Label.insertAll(label,
         srcServiceName, srcColumnName, srcColumnType,
         tgtServiceName, tgtColumnName, tgtColumnType,
         isDirected, serviceName, indices, props, consistencyLevel,
         hTableName, hTableTTL, schemaVersion, isAsync, compressionAlgorithm, options)
 
-      /** create hbase table */
+      /* create hbase table */
       val storage = graph.getStorage(newLabel)
       val service = newLabel.service
       storage.createTable(service.cluster, newLabel.hbaseTableName, List("e", "v"), service.preSplitSize, newLabel.hTableTTL, newLabel.compressionAlgorithm)
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala b/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala
index a7f485c..3916f39 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala
@@ -290,7 +290,7 @@
         (groupByKey, scoreSum) <- sums.toSeq.sortBy(_._2 * -1)
         aggregated = agg(groupByKey) if aggregated.nonEmpty
         sorted = orderBy(globalQueryOption, aggregated)
-      } yield groupByKey -> (scoreSum, sorted)
+      } yield (groupByKey, (scoreSum, sorted))
       (Nil, grouped)
     } else {
       val ordered = orderBy(globalQueryOption, ls)
@@ -319,7 +319,7 @@
       (key, (scoreSum, values)) <- baseStepResult.grouped
       (out, in) = values.partition(v => filterOutSet.contains(v.filterOutValues))
       newScoreSum = scoreSum - out.foldLeft(0.0) { case (prev, current) => prev + current.score } if in.nonEmpty
-    } yield key -> (newScoreSum, in)
+    } yield (key, (newScoreSum, in))
 
     StepResult(edgeWithScores = filteredResults, grouped = grouped, baseStepResult.degreeEdges, cursors = baseStepResult.cursors, failCount = baseStepResult.failCount + filterOutStepResult.failCount)
   }
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala
index fa9ff62..7165579 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala
@@ -142,7 +142,7 @@
   override def toString(): String = {
     Map("srcVertex" -> srcVertex.toString, "tgtVertex" -> tgtVertex.toString, "label" -> label.label, "direction" -> direction,
       "operation" -> operation, "version" -> version, "props" -> propsWithTs.asScala.map(kv => kv._1 -> kv._2.value).toString,
-      "statusCode" -> statusCode, "lockTs" -> lockTs).toString
+      "statusCode" -> statusCode, "lockTs" -> lockTs).toString()
   }
 }
 
@@ -206,7 +206,7 @@
     propsWithTs.get(meta.name) match {
       case null =>
 
-        /**
+        /*
           * TODO: agly hack
           * now we double store target vertex.innerId/srcVertex.innerId for easy development. later fix this to only store id once
           */
@@ -297,7 +297,7 @@
   override def toString(): String = {
     Map("srcVertex" -> srcVertex.toString, "tgtVertex" -> tgtVertex.toString, "label" -> label.label, "direction" -> dir,
       "operation" -> operation, "version" -> version, "props" -> propsWithTs.asScala.map(kv => kv._1 -> kv._2.value).toString
-    ).toString
+    ).toString()
   }
 }
 
@@ -925,12 +925,12 @@
       for {
         (requestEdge, func) <- requestWithFuncs
       } {
-        val (_newPropsWithTs, _) = func(prevPropsWithTs, propsToState(requestEdge.propsWithTs), requestEdge.ts, requestEdge.schemaVer)
+        val (_newPropsWithTs, _) = func((prevPropsWithTs, propsToState(requestEdge.propsWithTs), requestEdge.ts, requestEdge.schemaVer))
         prevPropsWithTs = _newPropsWithTs
         //        logger.debug(s"${requestEdge.toLogString}\n$oldPropsWithTs\n$prevPropsWithTs\n")
       }
       val requestTs = requestEdge.ts
-      /** version should be monotoniously increasing so our RPC mutation should be applied safely */
+      /* version should be monotoniously increasing so our RPC mutation should be applied safely */
       val newVersion = invertedEdge.map(e => e.version + incrementVersion).getOrElse(requestTs)
       val maxTs = prevPropsWithTs.map(_._2.ts).max
       val newTs = if (maxTs > requestTs) maxTs else requestTs
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
index 890dc4d..d1eda5e 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
@@ -22,6 +22,7 @@
 import java.util
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
 import java.util.concurrent.{Executors, TimeUnit}
+import java.util.function.Consumer
 
 import com.typesafe.config.{Config, ConfigFactory}
 import org.apache.commons.configuration.{BaseConfiguration, Configuration}
@@ -239,8 +240,8 @@
     tsVal
   }
 
-  def processDuplicates[T](queryParam: QueryParam,
-                           duplicates: Seq[(FilterHashKey, T)])(implicit ev: WithScore[T]): Seq[(FilterHashKey, T)] = {
+  def processDuplicates[R](queryParam: QueryParam,
+                           duplicates: Seq[(FilterHashKey, R)])(implicit ev: WithScore[R]): Seq[(FilterHashKey, R)] = {
 
     if (queryParam.label.consistencyLevel != "strong") {
       //TODO:
@@ -309,7 +310,7 @@
             edgeWithScore
           }
 
-          /** process step group by */
+          /* process step group by */
           val results = StepResult.filterOutStepGroupBy(_results, step.groupBy)
           StepResult(edgeWithScores = results, grouped = Nil, degreeEdges = degrees, cursors = cursors, failCount = failCount)
 
@@ -319,25 +320,25 @@
             val score = edgeWithScore.score
             val label = edgeWithScore.label
 
-            /** Select */
+            /* Select */
             val mergedPropsWithTs = edge.propertyValuesInner(propsSelectColumns)
 
 //            val newEdge = edge.copy(propsWithTs = mergedPropsWithTs)
             val newEdge = edge.copyEdgeWithState(mergedPropsWithTs)
 
             val newEdgeWithScore = edgeWithScore.copy(edge = newEdge)
-            /** OrderBy */
+            /* OrderBy */
             val orderByValues =
              if (queryOption.orderByKeys.isEmpty) (score, edge.tsInnerVal, None, None)
               else StepResult.toTuple4(newEdgeWithScore.toValues(queryOption.orderByKeys))
 
-            /** StepGroupBy */
+            /* StepGroupBy */
             val stepGroupByValues = newEdgeWithScore.toValues(step.groupBy.keys)
 
-            /** GroupBy */
+            /* GroupBy */
             val groupByValues = newEdgeWithScore.toValues(queryOption.groupBy.keys)
 
-            /** FilterOut */
+            /* FilterOut */
             val filterOutValues = newEdgeWithScore.toValues(queryOption.filterOutFields)
 
             newEdgeWithScore.copy(orderByValues = orderByValues,
@@ -346,13 +347,13 @@
               filterOutValues = filterOutValues)
           }
 
-          /** process step group by */
+          /* process step group by */
           val results = StepResult.filterOutStepGroupBy(_results, step.groupBy)
 
-          /** process ordered list */
+          /* process ordered list */
           val ordered = if (queryOption.groupBy.keys.isEmpty) StepResult.orderBy(queryOption, results) else Nil
 
-          /** process grouped list */
+          /* process grouped list */
           val grouped =
           if (queryOption.groupBy.keys.isEmpty) Nil
           else {
@@ -365,13 +366,13 @@
 
               val newScoreSum = scoreSum
 
-              /**
+              /*
                 * watch out here. by calling toString on Any, we lose type information which will be used
                 * later for toJson.
                 */
               if (merged.nonEmpty) {
                 val newKey = merged.head.groupByValues
-                agg += (newKey -> (newScoreSum, merged))
+                agg += ((newKey, (newScoreSum, merged)))
               }
             }
             agg.toSeq.sortBy(_._2._1 * -1)
@@ -399,7 +400,7 @@
         val score = edgeWithScore.score
         val label = edgeWithScore.label
 
-        /** Select */
+        /* Select */
         val mergedPropsWithTs =
           if (queryOption.selectColumns.isEmpty) {
             edge.propertyValuesInner()
@@ -450,17 +451,17 @@
     }
   }
 
-  private def buildResult[T](query: Query,
+  private def buildResult[R](query: Query,
                              stepIdx: Int,
                              stepResultLs: Seq[(QueryRequest, StepResult)],
                              parentEdges: Map[VertexId, Seq[EdgeWithScore]])
-                            (createFunc: (EdgeWithScore, Seq[LabelMeta]) => T)
-                            (implicit ev: WithScore[T]): ListBuffer[T] = {
+                            (createFunc: (EdgeWithScore, Seq[LabelMeta]) => R)
+                            (implicit ev: WithScore[R]): ListBuffer[R] = {
     import scala.collection._
 
-    val results = ListBuffer.empty[T]
-    val sequentialLs: ListBuffer[(HashKey, FilterHashKey, T, QueryParam)] = ListBuffer.empty
-    val duplicates: mutable.HashMap[HashKey, ListBuffer[(FilterHashKey, T)]] = mutable.HashMap.empty
+    val results = ListBuffer.empty[R]
+    val sequentialLs: ListBuffer[(HashKey, FilterHashKey, R, QueryParam)] = ListBuffer.empty
+    val duplicates: mutable.HashMap[HashKey, ListBuffer[(FilterHashKey, R)]] = mutable.HashMap.empty
     val edgesToExclude: mutable.HashSet[FilterHashKey] = mutable.HashSet.empty
     val edgesToInclude: mutable.HashSet[FilterHashKey] = mutable.HashSet.empty
 
@@ -488,7 +489,7 @@
         val (hashKey, filterHashKey) = toHashKey(queryParam, edge, isDegree = false)
         //        params += (hashKey -> queryParam) //
 
-        /** check if this edge should be exlcuded. */
+        /* check if this edge should be exlcuded. */
         if (shouldBeExcluded) {
           edgesToExclude.add(filterHashKey)
         } else {
@@ -500,7 +501,7 @@
           sequentialLs += ((hashKey, filterHashKey, newEdgeWithScore, queryParam))
           duplicates.get(hashKey) match {
             case None =>
-              val newLs = ListBuffer.empty[(FilterHashKey, T)]
+              val newLs = ListBuffer.empty[(FilterHashKey, R)]
               newLs += (filterHashKey -> newEdgeWithScore)
               duplicates += (hashKey -> newLs) //
             case Some(old) =>
@@ -547,7 +548,7 @@
   new Graph.OptIn(value = Graph.OptIn.SUITE_STRUCTURE_STANDARD)
 ))
 @Graph.OptOuts(value = Array(
-  /** Process */
+  /* Process */
   /* branch: passed all. */
 //  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.branch.BranchTest$Traversals", method = "*", reason = "no"),
 // passed: all
@@ -769,7 +770,7 @@
 //  new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.SubgraphStrategyProcessTest", method = "*", reason = "no"),
 //  passed: all
 
-  /** Structure */
+  /* Structure */
   new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.EdgeTest$BasicEdgeTest", method="shouldValidateIdEquality", reason="reference equals on EdgeId is not supported."),
   new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.EdgeTest$BasicEdgeTest", method="shouldValidateEquality", reason="reference equals on EdgeId is not supported."),
   // passed: all, failed: none
@@ -967,7 +968,7 @@
   def flushStorage(): Unit = {
     storagePool.foreach { case (_, storage) =>
 
-      /** flush is blocking */
+      /* flush is blocking */
       storage.flush()
     }
   }
@@ -1149,7 +1150,7 @@
 
 
   def fetchSnapshotEdge(edge: S2Edge): Future[(QueryParam, Option[S2Edge], Option[SKeyValue])] = {
-    /** TODO: Fix this. currently fetchSnapshotEdge should not use future cache
+    /* TODO: Fix this. currently fetchSnapshotEdge should not use future cache
       * so use empty cacheKey.
       * */
     val queryParam = QueryParam(labelName = edge.innerLabel.label,
@@ -1194,7 +1195,7 @@
 
     val requestTs = ts
     val vertices = srcVertices
-    /** create query per label */
+    /* create query per label */
     val queries = for {
       label <- labels
     } yield {
@@ -1252,7 +1253,7 @@
       val ret = label.schemaVersion match {
         case HBaseType.VERSION3 | HBaseType.VERSION4 =>
           if (label.consistencyLevel == "strong") {
-            /**
+            /*
               * read: snapshotEdge on queryResult = O(N)
               * write: N x (relatedEdges x indices(indexedEdge) + 1(snapshotEdge))
               */
@@ -1262,7 +1263,7 @@
           }
         case _ =>
 
-          /**
+          /*
             * read: x
             * write: N x ((1(snapshotEdge) + 2(1 for incr, 1 for delete) x indices)
             */
@@ -1369,7 +1370,7 @@
         val edges = edgeGroup.map(_._1)
         val idxs = edgeGroup.map(_._2)
 
-        /** multiple edges with weak consistency level will be processed as batch */
+        /* multiple edges with weak consistency level will be processed as batch */
         val mutations = edges.flatMap { edge =>
           val (_, edgeUpdate) =
             if (edge.op == GraphUtil.operations("delete")) S2Edge.buildDeleteBulk(None, edge)
@@ -1450,7 +1451,7 @@
     val parts = GraphUtil.split(s)
     val logType = parts(2)
     val element = if (logType == "edge" | logType == "e") {
-      /** current only edge is considered to be bulk loaded */
+      /* current only edge is considered to be bulk loaded */
       labelMapping.get(parts(5)) match {
         case None =>
         case Some(toReplace) =>
@@ -1754,7 +1755,6 @@
       case s2Edge: S2Edge => s2Edge.id().asInstanceOf[EdgeId]
       case id: EdgeId => id
       case s: String => EdgeId.fromString(s)
-      case s: java.lang.String => EdgeId.fromString(s)
     }
     val edgesToFetch = for {
       id <- s2EdgeIds
@@ -1937,8 +1937,7 @@
   override def toString(): String = "[s2graph]"
 
   override def io[I <: Io[_ <: GraphReader.ReaderBuilder[_ <: GraphReader], _ <: GraphWriter.WriterBuilder[_ <: GraphWriter], _ <: Mapper.Builder[_]]](builder: Io.Builder[I]): I = {
-    builder.graph(this).registry(new S2GraphIoRegistry).create().asInstanceOf[I]
-
+    builder.graph(this).registry(S2GraphIoRegistry.instance).create().asInstanceOf[I]
   }
 
 
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2GraphFactory.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphFactory.scala
index 7f13711..a9c98c2 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2GraphFactory.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphFactory.scala
@@ -119,7 +119,7 @@
       true, S2Graph.DefaultServiceName, Nil, Seq(Prop("weight", "0.0", "double")), "strong", None, None)
   }
 
-  def cleanupDefaultSchema: Unit = {
+  def cleanupDefaultSchema(): Unit = {
     val columnNames = Set(S2Graph.DefaultColumnName, "person", "software", "product", "dog",
       "animal", "song", "artist", "STEPHEN")
 
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Property.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Property.scala
index 6b0c0eb..e86c17f 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2Property.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Property.scala
@@ -49,7 +49,6 @@
     case _: Boolean => true
     case _: Short => true
     case _: Byte => true
-    case _: String => true
     case _: BigDecimal => true
     case _ => false
   }
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Bucket.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Bucket.scala
index 43e5db8..e71bbce 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Bucket.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Bucket.scala
@@ -51,7 +51,7 @@
 
   def toRange(str: String): Option[(Int, Int)] = {
     val range = str.split(rangeDelimiter)
-    if (range.length == 2) Option(range.head.toInt, range.last.toInt)
+    if (range.length == 2) Option((range.head.toInt, range.last.toInt))
     else None
   }
 
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
index 415a64e..97fd704 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
@@ -189,14 +189,14 @@
         val tgtServiceId = tgtService.id.get
         val serviceId = service.id.get
 
-        /** insert serviceColumn */
+        /* insert serviceColumn */
         val srcCol = ServiceColumn.findOrInsert(srcServiceId, srcColumnName, Some(srcColumnType))
         val tgtCol = ServiceColumn.findOrInsert(tgtServiceId, tgtColumnName, Some(tgtColumnType))
 
         if (srcCol.columnType != srcColumnType) throw new RuntimeException(s"source service column type not matched ${srcCol.columnType} != ${srcColumnType}")
         if (tgtCol.columnType != tgtColumnType) throw new RuntimeException(s"target service column type not matched ${tgtCol.columnType} != ${tgtColumnType}")
 
-        /** create label */
+        /* create label */
         Label.findByName(labelName, useCache = false).getOrElse {
 
           val createdId = insert(labelName, srcServiceId, srcColumnName, srcColumnType,
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala b/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
index a2f5c47..4bc9376 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
@@ -259,6 +259,7 @@
     case arr: JsArray =>
       val keys = arr.asOpt[Seq[String]].getOrElse(Nil)
       GroupBy(keys)
+    case _ => GroupBy.Empty
   }.getOrElse(GroupBy.Empty)
 
   def toVertices(labelName: String, direction: String, ids: Seq[JsValue]): Seq[S2Vertex] = {
@@ -514,7 +515,7 @@
   private def parse[R](js: JsValue, key: String)(implicit read: Reads[R]): R = {
     (js \ key).validate[R] match {
       case JsError(errors) =>
-        val msg = (JsError.toFlatJson(errors) \ "obj").as[List[JsValue]].flatMap(x => (x \ "msg").toOption)
+        val msg = (JsError.toJson(errors) \ "obj").as[List[JsValue]].flatMap(x => (x \ "msg").toOption)
         val e = Json.obj("args" -> key, "error" -> msg)
         throw new GraphExceptions.JsonParseException(Json.obj("error" -> key).toString)
       case JsSuccess(result, _) => result
@@ -524,7 +525,7 @@
   private def parseOption[R](js: JsValue, key: String)(implicit read: Reads[R]): Option[R] = {
     (js \ key).validateOpt[R] match {
       case JsError(errors) =>
-        val msg = (JsError.toFlatJson(errors) \ "obj").as[List[JsValue]].flatMap(x => (x \ "msg").toOption)
+        val msg = (JsError.toJson(errors) \ "obj").as[List[JsValue]].flatMap(x => (x \ "msg").toOption)
         val e = Json.obj("args" -> key, "error" -> msg)
         throw new GraphExceptions.JsonParseException(Json.obj("error" -> key).toString)
       case JsSuccess(result, _) => result
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala
index bde7f3a..db9a9da 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala
@@ -43,7 +43,7 @@
   def toLogString = {
     Map("table" -> Bytes.toString(table), "row" -> row.toList, "cf" -> Bytes.toString(cf),
       "qualifier" -> qualifier.toList, "value" -> value.toList, "timestamp" -> timestamp,
-      "operation" -> operation, "durability" -> durability).toString
+      "operation" -> operation, "durability" -> durability).toString()
   }
   override def toString(): String = toLogString
 
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
index a8dec7e..57d4872 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
@@ -398,7 +398,7 @@
       future recoverWith {
         case FetchTimeoutException(retryEdge) =>
           logger.info(s"[Try: $tryNum], Fetch fail.\n${retryEdge}")
-          /** fetch failed. re-fetch should be done */
+          /* fetch failed. re-fetch should be done */
           fetchSnapshotEdgeInner(edges.head).flatMap { case (queryParam, snapshotEdgeOpt, kvOpt) =>
             retry(tryNum + 1)(edges, statusCode, snapshotEdgeOpt)
           }
@@ -413,14 +413,14 @@
           }
           logger.info(s"[Try: $tryNum], [Status: $status] partial fail.\n${retryEdge.toLogString}\nFailReason: ${faileReason}")
 
-          /** retry logic */
+          /* retry logic */
           val promise = Promise[Boolean]
           val backOff = exponentialBackOff(tryNum)
           scheduledThreadPool.schedule(new Runnable {
             override def run(): Unit = {
               val future = if (failedStatusCode == 0) {
                 // acquire Lock failed. other is mutating so this thead need to re-fetch snapshotEdge.
-                /** fetch failed. re-fetch should be done */
+                /* fetch failed. re-fetch should be done */
                 fetchSnapshotEdgeInner(edges.head).flatMap { case (queryParam, snapshotEdgeOpt, kvOpt) =>
                   retry(tryNum + 1)(edges, statusCode, snapshotEdgeOpt)
                 }
@@ -453,7 +453,7 @@
       case 0 =>
         fetchedSnapshotEdgeOpt match {
           case None =>
-          /**
+          /*
            * no one has never mutated this SN.
            * (squashedEdge, edgeMutate) = Edge.buildOperation(None, edges)
            * pendingE = squashedEdge.copy(statusCode = 1, lockTs = now, version = squashedEdge.ts + 1)
@@ -475,7 +475,7 @@
           case Some(snapshotEdge) =>
             snapshotEdge.pendingEdgeOpt match {
               case None =>
-                /**
+                /*
                  * others finished commit on this SN. but there is no contention.
                  * (squashedEdge, edgeMutate) = Edge.buildOperation(snapshotEdgeOpt, edges)
                  * pendingE = squashedEdge.copy(statusCode = 1, lockTs = now, version = snapshotEdge.version + 1) ?
@@ -497,7 +497,7 @@
               case Some(pendingEdge) =>
                 val isLockExpired = pendingEdge.lockTs.get + LockExpireDuration < System.currentTimeMillis()
                 if (isLockExpired) {
-                  /**
+                  /*
                    * if pendingEdge.ts == snapshotEdge.ts =>
                    *    (squashedEdge, edgeMutate) = Edge.buildOperation(None, Seq(pendingEdge))
                    * else =>
@@ -519,7 +519,7 @@
 
                   commitProcess(statusCode, squashedEdge, fetchedSnapshotEdgeOpt, lockSnapshotEdge, releaseLockSnapshotEdge, edgeMutate)
                 } else {
-                  /**
+                  /*
                    * others finished commit on this SN and there is currently contention.
                    * this can't be proceed so retry from re-fetch.
                    * throw EX
@@ -532,11 +532,11 @@
         }
       case _ =>
 
-        /**
+        /*
          * statusCode > 0 which means self locked and there has been partial failure either on mutate, increment, releaseLock
          */
 
-        /**
+        /*
          * this succeed to lock this SN. keep doing on commit process.
          * if SN.isEmpty =>
          * no one never succed to commit on this SN.
@@ -807,7 +807,7 @@
               buildIncrementsAsync(indexEdge, -1L)
           }
 
-          /** reverted direction */
+          /* reverted direction */
           val edgeRevert = edge.copyEdge(propsWithTs = S2Edge.propsToState(edge.updatePropsWithTs()))
           val reversedIndexedEdgesMutations = edgeRevert.duplicateEdge.edgesWithIndex.flatMap { indexEdge =>
             indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++
@@ -972,7 +972,7 @@
 
     tgtVertexIdOpt match {
       case Some(tgtVertexId) => // _to is given.
-        /** we use toSnapshotEdge so dont need to swap src, tgt */
+        /* we use toSnapshotEdge so dont need to swap src, tgt */
         val src = srcVertex.innerId
         val tgt = tgtVertexId
         val (srcVId, tgtVId) = (SourceVertexId(srcColumn, src), TargetVertexId(tgtColumn, tgt))
@@ -989,7 +989,7 @@
   }
 
   protected def fetchSnapshotEdgeInner(edge: S2Edge): Future[(QueryParam, Option[S2Edge], Option[SKeyValue])] = {
-    /** TODO: Fix this. currently fetchSnapshotEdge should not use future cache
+    /* TODO: Fix this. currently fetchSnapshotEdge should not use future cache
       * so use empty cacheKey.
       * */
     val queryParam = QueryParam(labelName = edge.innerLabel.label,
@@ -1042,21 +1042,21 @@
   def increments(edgeMutate: EdgeMutate): (Seq[SKeyValue], Seq[SKeyValue]) = {
     (edgeMutate.edgesToDeleteWithIndexOptForDegree.isEmpty, edgeMutate.edgesToInsertWithIndexOptForDegree.isEmpty) match {
       case (true, true) =>
-        /** when there is no need to update. shouldUpdate == false */
+        /* when there is no need to update. shouldUpdate == false */
         Nil -> Nil
 
       case (true, false) =>
-        /** no edges to delete but there is new edges to insert so increase degree by 1 */
+        /* no edges to delete but there is new edges to insert so increase degree by 1 */
         val (buffer, nonBuffer) = EdgeMutate.partitionBufferedIncrement(edgeMutate.edgesToInsertWithIndexOptForDegree)
         buffer.flatMap(buildIncrementsAsync(_)) -> nonBuffer.flatMap(buildIncrementsAsync(_))
 
       case (false, true) =>
-        /** no edges to insert but there is old edges to delete so decrease degree by 1 */
+        /* no edges to insert but there is old edges to delete so decrease degree by 1 */
         val (buffer, nonBuffer) = EdgeMutate.partitionBufferedIncrement(edgeMutate.edgesToDeleteWithIndexOptForDegree)
         buffer.flatMap(buildIncrementsAsync(_, -1)) -> nonBuffer.flatMap(buildIncrementsAsync(_, -1))
 
       case (false, false) =>
-        /** update on existing edges so no change on degree */
+        /* update on existing edges so no change on degree */
         Nil -> Nil
     }
   }
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
index e4d85cf..4fb2240 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
@@ -220,7 +220,7 @@
       val _client = client(withWait)
       val (increments, putAndDeletes) = kvs.partition(_.operation == SKeyValue.Increment)
 
-      /** Asynchbase IncrementRequest does not implement HasQualifiers */
+      /* Asynchbase IncrementRequest does not implement HasQualifiers */
       val incrementsFutures = increments.map { kv =>
         val inc = new AtomicIncrementRequest(kv.table, kv.row, kv.cf, kv.qualifier, Bytes.toLong(kv.value))
         val defer = _client.atomicIncrement(inc)
@@ -231,7 +231,7 @@
         if (withWait) future else Future.successful(true)
       }
 
-      /** PutRequest and DeleteRequest accept byte[][] qualifiers/values. */
+      /* PutRequest and DeleteRequest accept byte[][] qualifiers/values. */
       val othersFutures = putAndDeletes.groupBy { kv =>
         (kv.table.toSeq, kv.row.toSeq, kv.cf.toSeq, kv.operation, kv.timestamp)
       }.map { case ((table, row, cf, operation, timestamp), groupedKeyValues) =>
@@ -362,7 +362,7 @@
             }
             (_startKey , Bytes.add(baseKey, intervalMinBytes))
           } else {
-            /**
+             /*
               * note: since propsToBytes encode size of property map at first byte, we are sure about max value here
               */
             val _startKey = queryParam.cursorOpt match {
@@ -449,7 +449,7 @@
 
     val queryParam = queryRequest.queryParam
     val cacheTTL = queryParam.cacheTTLInMillis
-    /** with version 4, request's type is (Scanner, (Int, Int)). otherwise GetRequest. */
+    /* with version 4, request's type is (Scanner, (Int, Int)). otherwise GetRequest. */
 
     val edge = toRequestEdge(queryRequest, parentEdges)
     val request = buildRequest(queryRequest, edge)
@@ -562,7 +562,7 @@
                            compressionAlgorithm: String,
                            replicationScopeOpt: Option[Int] = None,
                            totalRegionCount: Option[Int] = None): Unit = {
-    /** TODO: Decide if we will allow each app server to connect to multiple hbase cluster */
+    /* TODO: Decide if we will allow each app server to connect to multiple hbase cluster */
     for {
       zkAddr <- Seq(zkQuorum) ++ zkQuorumSlave.toSeq
     } {
@@ -789,7 +789,7 @@
   }
 
   private def toCacheKeyBytes(hbaseRpc: AsyncRPC): Array[Byte] = {
-    /** with version 4, request's type is (Scanner, (Int, Int)). otherwise GetRequest. */
+    /* with version 4, request's type is (Scanner, (Int, Int)). otherwise GetRequest. */
     hbaseRpc match {
       case Left(getRequest) => getRequest.key
       case Right(ScanWithRange(scanner, offset, limit)) =>
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
index 6095cea..3da8267 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
@@ -111,7 +111,7 @@
              }
 
            val index = label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${label.id.get}, ${labelIdxSeq}"))
-           /** process indexProps */
+           /* process indexProps */
            val size = idxPropsRaw.length
            (0 until size).foreach { ith =>
              val meta = index.sortKeyTypesArray(ith)
@@ -125,7 +125,7 @@
              }
            }
 
-           /** process props */
+           /* process props */
            if (op == GraphUtil.operations("incrementCount")) {
              //        val countVal = Bytes.toLong(kv.value)
              val countVal = bytesToLongFunc(kv.value, 0)
@@ -139,7 +139,7 @@
              }
            }
 
-           /** process tgtVertexId */
+           /* process tgtVertexId */
            val tgtVertexId =
              if (edge.checkProperty(LabelMeta.to.name)) {
                val vId = edge.property(LabelMeta.to.name).asInstanceOf[S2Property[_]].innerValWithTs
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
index 6818c1d..59db07e 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
@@ -87,7 +87,7 @@
 
            val index = label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${label.id.get}, ${labelIdxSeq}"))
 
-           /** process indexProps */
+           /* process indexProps */
            val size = idxPropsRaw.length
            (0 until size).foreach { ith =>
              val meta = index.sortKeyTypesArray(ith)
@@ -101,7 +101,7 @@
              }
            }
 
-           /** process props */
+           /* process props */
            if (op == GraphUtil.operations("incrementCount")) {
              //        val countVal = Bytes.toLong(kv.value)
              val countVal = bytesToLongFunc(kv.value, 0)
@@ -114,7 +114,7 @@
                edge.propertyInner(k.name, v.value, version)
              }
            }
-           /** process tgtVertexId */
+           /* process tgtVertexId */
            val tgtVertexId =
              if (edge.checkProperty(LabelMeta.to.name)) {
                val vId = edge.property(LabelMeta.to.name).asInstanceOf[S2Property[_]].innerValWithTs
diff --git a/s2rest_netty/src/main/scala/org/apache/s2graph/rest/netty/Server.scala b/s2rest_netty/src/main/scala/org/apache/s2graph/rest/netty/Server.scala
index a47fda7..126f193 100644
--- a/s2rest_netty/src/main/scala/org/apache/s2graph/rest/netty/Server.scala
+++ b/s2rest_netty/src/main/scala/org/apache/s2graph/rest/netty/Server.scala
@@ -46,6 +46,7 @@
 import scala.concurrent.{ExecutionContext, Future}
 import scala.io.Source
 import scala.util.{Failure, Success, Try}
+import scala.language.existentials
 
 class S2RestHandler(s2rest: RestHandler)(implicit ec: ExecutionContext) extends SimpleChannelInboundHandler[FullHttpRequest] {
   val ApplicationJson = "application/json"
diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/Bootstrap.scala b/s2rest_play/app/org/apache/s2graph/rest/play/Bootstrap.scala
index e5fc75d..ff82c44 100644
--- a/s2rest_play/app/org/apache/s2graph/rest/play/Bootstrap.scala
+++ b/s2rest_play/app/org/apache/s2graph/rest/play/Bootstrap.scala
@@ -83,7 +83,7 @@
     wallLogHandler.shutdown()
     QueueActor.shutdown()
 
-    /**
+    /*
      * shutdown hbase client for flush buffers.
      */
     shutdown()
diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/AdminController.scala b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/AdminController.scala
index ecea304..8eb25fd 100644
--- a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/AdminController.scala
+++ b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/AdminController.scala
@@ -100,7 +100,7 @@
     case Failure(error) =>
       logger.error(error.getMessage, error)
       error match {
-        case JsResultException(e) => bad(JsError.toFlatJson(e))
+        case JsResultException(e) => bad(JsError.toJson(e))
         case _ => bad(error.getMessage)
       }
   }
diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala
index aed8ced..28da7fe 100644
--- a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala
+++ b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala
@@ -129,7 +129,7 @@
           }.map(jsonResponse(_))
         } else {
           val rets = elementWithIdxs.map { case ((element, tsv), idx) =>
-            if (!skipElement(element.isAsync)) QueueActor.router ! (element, tsv)
+            if (!skipElement(element.isAsync)) QueueActor.router ! ((element, tsv))
             true
           }
           Future.successful(jsonResponse(Json.toJson(rets)))
@@ -251,7 +251,7 @@
 
   def deleteAllInner(jsValue: JsValue, withWait: Boolean) = {
 
-    /** logging for delete all request */
+    /* logging for delete all request */
     def enqueueLogMessage(ids: Seq[JsValue], labels: Seq[Label], ts: Long, direction: String, topicOpt: Option[String]) = {
       val kafkaMessages = for {
         id <- ids