[S2GRAPH-160]: Remove warnings while packaging.
JIRA:
[S2GRAPH-160] https://issues.apache.org/jira/browse/S2GRAPH-160
Pull Request:
Closes #120
Author:
DO YUNG YOON <steamshon@apache.org>
diff --git a/CHANGES b/CHANGES
index 858df20..b26f868 100644
--- a/CHANGES
+++ b/CHANGES
@@ -30,6 +30,7 @@
* [S2GRAPH-153] - Implement IndexProvider(for Mixed IndexType) class
* [S2GRAPH-157] - Check proper apache license header on all files for release 0.2.0
* [S2GRAPH-158] - Bug fix on String type InnerVal encoding to use UTF-8 charsetName.
+ * [S2GRAPH-160] - Remove warning while package
* [S2GRAPH-161] - Update CHANGES file with correct release version
** Bug
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/ExceptionHandler.scala b/s2core/src/main/scala/org/apache/s2graph/core/ExceptionHandler.scala
index 48976d3..eb5b1da 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/ExceptionHandler.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/ExceptionHandler.scala
@@ -96,7 +96,7 @@
private def toKafkaProp(config: Config) = {
val props = new Properties()
- /** all default configuration for new producer */
+ /* all default configuration for new producer */
val brokers =
if (config.hasPath("kafka.metadata.broker.list")) config.getString("kafka.metadata.broker.list")
else "localhost"
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/JSONParser.scala b/s2core/src/main/scala/org/apache/s2graph/core/JSONParser.scala
index ea50b17..9de3d9d 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/JSONParser.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/JSONParser.scala
@@ -302,13 +302,12 @@
def jsValueToAny(value: JsValue): Option[AnyRef] = {
try {
- val v = value match {
-// case JsNull =>
- case n: JsNumber => n.value
- case s: JsString => TemplateHelper.replaceVariable(System.currentTimeMillis(), s.value)
- case b: JsBoolean => Boolean.box(b.value)
+ value match {
+ case n: JsNumber => Option(n.value)
+ case s: JsString => Option(TemplateHelper.replaceVariable(System.currentTimeMillis(), s.value))
+ case b: JsBoolean => Option(Boolean.box(b.value))
+ case _ => None
}
- Option(v)
} catch {
case e: Exception =>
logger.error(s"jsValueToAny: $value", e)
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
index 6119045..a9741d2 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
@@ -299,7 +299,7 @@
Model withTx { implicit session =>
val service = Service.findOrInsert(serviceName, cluster, hTableName, preSplitSize, hTableTTL.orElse(Some(Integer.MAX_VALUE)), compressionAlgorithm, useCache = false)
- /** create hbase table for service */
+ /* create hbase table for service */
graph.getStorage(service).createTable(service.cluster, service.hTableName, List("e", "v"), service.preSplitSize, service.hTableTTL, compressionAlgorithm)
service
}
@@ -380,14 +380,14 @@
Model withTx { implicit session =>
if (labelOpt.isDefined) throw new LabelAlreadyExistException(s"Label name ${label} already exist.")
- /** create all models */
+ /* create all models */
val newLabel = Label.insertAll(label,
srcServiceName, srcColumnName, srcColumnType,
tgtServiceName, tgtColumnName, tgtColumnType,
isDirected, serviceName, indices, props, consistencyLevel,
hTableName, hTableTTL, schemaVersion, isAsync, compressionAlgorithm, options)
- /** create hbase table */
+ /* create hbase table */
val storage = graph.getStorage(newLabel)
val service = newLabel.service
storage.createTable(service.cluster, newLabel.hbaseTableName, List("e", "v"), service.preSplitSize, newLabel.hTableTTL, newLabel.compressionAlgorithm)
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala b/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala
index a7f485c..3916f39 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala
@@ -290,7 +290,7 @@
(groupByKey, scoreSum) <- sums.toSeq.sortBy(_._2 * -1)
aggregated = agg(groupByKey) if aggregated.nonEmpty
sorted = orderBy(globalQueryOption, aggregated)
- } yield groupByKey -> (scoreSum, sorted)
+ } yield (groupByKey, (scoreSum, sorted))
(Nil, grouped)
} else {
val ordered = orderBy(globalQueryOption, ls)
@@ -319,7 +319,7 @@
(key, (scoreSum, values)) <- baseStepResult.grouped
(out, in) = values.partition(v => filterOutSet.contains(v.filterOutValues))
newScoreSum = scoreSum - out.foldLeft(0.0) { case (prev, current) => prev + current.score } if in.nonEmpty
- } yield key -> (newScoreSum, in)
+ } yield (key, (newScoreSum, in))
StepResult(edgeWithScores = filteredResults, grouped = grouped, baseStepResult.degreeEdges, cursors = baseStepResult.cursors, failCount = baseStepResult.failCount + filterOutStepResult.failCount)
}
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala
index fa9ff62..7165579 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala
@@ -142,7 +142,7 @@
override def toString(): String = {
Map("srcVertex" -> srcVertex.toString, "tgtVertex" -> tgtVertex.toString, "label" -> label.label, "direction" -> direction,
"operation" -> operation, "version" -> version, "props" -> propsWithTs.asScala.map(kv => kv._1 -> kv._2.value).toString,
- "statusCode" -> statusCode, "lockTs" -> lockTs).toString
+ "statusCode" -> statusCode, "lockTs" -> lockTs).toString()
}
}
@@ -206,7 +206,7 @@
propsWithTs.get(meta.name) match {
case null =>
- /**
+ /*
* TODO: agly hack
* now we double store target vertex.innerId/srcVertex.innerId for easy development. later fix this to only store id once
*/
@@ -297,7 +297,7 @@
override def toString(): String = {
Map("srcVertex" -> srcVertex.toString, "tgtVertex" -> tgtVertex.toString, "label" -> label.label, "direction" -> dir,
"operation" -> operation, "version" -> version, "props" -> propsWithTs.asScala.map(kv => kv._1 -> kv._2.value).toString
- ).toString
+ ).toString()
}
}
@@ -925,12 +925,12 @@
for {
(requestEdge, func) <- requestWithFuncs
} {
- val (_newPropsWithTs, _) = func(prevPropsWithTs, propsToState(requestEdge.propsWithTs), requestEdge.ts, requestEdge.schemaVer)
+ val (_newPropsWithTs, _) = func((prevPropsWithTs, propsToState(requestEdge.propsWithTs), requestEdge.ts, requestEdge.schemaVer))
prevPropsWithTs = _newPropsWithTs
// logger.debug(s"${requestEdge.toLogString}\n$oldPropsWithTs\n$prevPropsWithTs\n")
}
val requestTs = requestEdge.ts
- /** version should be monotoniously increasing so our RPC mutation should be applied safely */
+ /* version should be monotonically increasing so our RPC mutation should be applied safely */
val newVersion = invertedEdge.map(e => e.version + incrementVersion).getOrElse(requestTs)
val maxTs = prevPropsWithTs.map(_._2.ts).max
val newTs = if (maxTs > requestTs) maxTs else requestTs
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
index 890dc4d..d1eda5e 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
@@ -22,6 +22,7 @@
import java.util
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
import java.util.concurrent.{Executors, TimeUnit}
+import java.util.function.Consumer
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.commons.configuration.{BaseConfiguration, Configuration}
@@ -239,8 +240,8 @@
tsVal
}
- def processDuplicates[T](queryParam: QueryParam,
- duplicates: Seq[(FilterHashKey, T)])(implicit ev: WithScore[T]): Seq[(FilterHashKey, T)] = {
+ def processDuplicates[R](queryParam: QueryParam,
+ duplicates: Seq[(FilterHashKey, R)])(implicit ev: WithScore[R]): Seq[(FilterHashKey, R)] = {
if (queryParam.label.consistencyLevel != "strong") {
//TODO:
@@ -309,7 +310,7 @@
edgeWithScore
}
- /** process step group by */
+ /* process step group by */
val results = StepResult.filterOutStepGroupBy(_results, step.groupBy)
StepResult(edgeWithScores = results, grouped = Nil, degreeEdges = degrees, cursors = cursors, failCount = failCount)
@@ -319,25 +320,25 @@
val score = edgeWithScore.score
val label = edgeWithScore.label
- /** Select */
+ /* Select */
val mergedPropsWithTs = edge.propertyValuesInner(propsSelectColumns)
// val newEdge = edge.copy(propsWithTs = mergedPropsWithTs)
val newEdge = edge.copyEdgeWithState(mergedPropsWithTs)
val newEdgeWithScore = edgeWithScore.copy(edge = newEdge)
- /** OrderBy */
+ /* OrderBy */
val orderByValues =
if (queryOption.orderByKeys.isEmpty) (score, edge.tsInnerVal, None, None)
else StepResult.toTuple4(newEdgeWithScore.toValues(queryOption.orderByKeys))
- /** StepGroupBy */
+ /* StepGroupBy */
val stepGroupByValues = newEdgeWithScore.toValues(step.groupBy.keys)
- /** GroupBy */
+ /* GroupBy */
val groupByValues = newEdgeWithScore.toValues(queryOption.groupBy.keys)
- /** FilterOut */
+ /* FilterOut */
val filterOutValues = newEdgeWithScore.toValues(queryOption.filterOutFields)
newEdgeWithScore.copy(orderByValues = orderByValues,
@@ -346,13 +347,13 @@
filterOutValues = filterOutValues)
}
- /** process step group by */
+ /* process step group by */
val results = StepResult.filterOutStepGroupBy(_results, step.groupBy)
- /** process ordered list */
+ /* process ordered list */
val ordered = if (queryOption.groupBy.keys.isEmpty) StepResult.orderBy(queryOption, results) else Nil
- /** process grouped list */
+ /* process grouped list */
val grouped =
if (queryOption.groupBy.keys.isEmpty) Nil
else {
@@ -365,13 +366,13 @@
val newScoreSum = scoreSum
- /**
+ /*
* watch out here. by calling toString on Any, we lose type information which will be used
* later for toJson.
*/
if (merged.nonEmpty) {
val newKey = merged.head.groupByValues
- agg += (newKey -> (newScoreSum, merged))
+ agg += ((newKey, (newScoreSum, merged)))
}
}
agg.toSeq.sortBy(_._2._1 * -1)
@@ -399,7 +400,7 @@
val score = edgeWithScore.score
val label = edgeWithScore.label
- /** Select */
+ /* Select */
val mergedPropsWithTs =
if (queryOption.selectColumns.isEmpty) {
edge.propertyValuesInner()
@@ -450,17 +451,17 @@
}
}
- private def buildResult[T](query: Query,
+ private def buildResult[R](query: Query,
stepIdx: Int,
stepResultLs: Seq[(QueryRequest, StepResult)],
parentEdges: Map[VertexId, Seq[EdgeWithScore]])
- (createFunc: (EdgeWithScore, Seq[LabelMeta]) => T)
- (implicit ev: WithScore[T]): ListBuffer[T] = {
+ (createFunc: (EdgeWithScore, Seq[LabelMeta]) => R)
+ (implicit ev: WithScore[R]): ListBuffer[R] = {
import scala.collection._
- val results = ListBuffer.empty[T]
- val sequentialLs: ListBuffer[(HashKey, FilterHashKey, T, QueryParam)] = ListBuffer.empty
- val duplicates: mutable.HashMap[HashKey, ListBuffer[(FilterHashKey, T)]] = mutable.HashMap.empty
+ val results = ListBuffer.empty[R]
+ val sequentialLs: ListBuffer[(HashKey, FilterHashKey, R, QueryParam)] = ListBuffer.empty
+ val duplicates: mutable.HashMap[HashKey, ListBuffer[(FilterHashKey, R)]] = mutable.HashMap.empty
val edgesToExclude: mutable.HashSet[FilterHashKey] = mutable.HashSet.empty
val edgesToInclude: mutable.HashSet[FilterHashKey] = mutable.HashSet.empty
@@ -488,7 +489,7 @@
val (hashKey, filterHashKey) = toHashKey(queryParam, edge, isDegree = false)
// params += (hashKey -> queryParam) //
- /** check if this edge should be exlcuded. */
+ /* check if this edge should be excluded. */
if (shouldBeExcluded) {
edgesToExclude.add(filterHashKey)
} else {
@@ -500,7 +501,7 @@
sequentialLs += ((hashKey, filterHashKey, newEdgeWithScore, queryParam))
duplicates.get(hashKey) match {
case None =>
- val newLs = ListBuffer.empty[(FilterHashKey, T)]
+ val newLs = ListBuffer.empty[(FilterHashKey, R)]
newLs += (filterHashKey -> newEdgeWithScore)
duplicates += (hashKey -> newLs) //
case Some(old) =>
@@ -547,7 +548,7 @@
new Graph.OptIn(value = Graph.OptIn.SUITE_STRUCTURE_STANDARD)
))
@Graph.OptOuts(value = Array(
- /** Process */
+ /* Process */
/* branch: passed all. */
// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.branch.BranchTest$Traversals", method = "*", reason = "no"),
// passed: all
@@ -769,7 +770,7 @@
// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.SubgraphStrategyProcessTest", method = "*", reason = "no"),
// passed: all
- /** Structure */
+ /* Structure */
new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.EdgeTest$BasicEdgeTest", method="shouldValidateIdEquality", reason="reference equals on EdgeId is not supported."),
new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.EdgeTest$BasicEdgeTest", method="shouldValidateEquality", reason="reference equals on EdgeId is not supported."),
// passed: all, failed: none
@@ -967,7 +968,7 @@
def flushStorage(): Unit = {
storagePool.foreach { case (_, storage) =>
- /** flush is blocking */
+ /* flush is blocking */
storage.flush()
}
}
@@ -1149,7 +1150,7 @@
def fetchSnapshotEdge(edge: S2Edge): Future[(QueryParam, Option[S2Edge], Option[SKeyValue])] = {
- /** TODO: Fix this. currently fetchSnapshotEdge should not use future cache
+ /* TODO: Fix this. currently fetchSnapshotEdge should not use future cache
* so use empty cacheKey.
* */
val queryParam = QueryParam(labelName = edge.innerLabel.label,
@@ -1194,7 +1195,7 @@
val requestTs = ts
val vertices = srcVertices
- /** create query per label */
+ /* create query per label */
val queries = for {
label <- labels
} yield {
@@ -1252,7 +1253,7 @@
val ret = label.schemaVersion match {
case HBaseType.VERSION3 | HBaseType.VERSION4 =>
if (label.consistencyLevel == "strong") {
- /**
+ /*
* read: snapshotEdge on queryResult = O(N)
* write: N x (relatedEdges x indices(indexedEdge) + 1(snapshotEdge))
*/
@@ -1262,7 +1263,7 @@
}
case _ =>
- /**
+ /*
* read: x
* write: N x ((1(snapshotEdge) + 2(1 for incr, 1 for delete) x indices)
*/
@@ -1369,7 +1370,7 @@
val edges = edgeGroup.map(_._1)
val idxs = edgeGroup.map(_._2)
- /** multiple edges with weak consistency level will be processed as batch */
+ /* multiple edges with weak consistency level will be processed as batch */
val mutations = edges.flatMap { edge =>
val (_, edgeUpdate) =
if (edge.op == GraphUtil.operations("delete")) S2Edge.buildDeleteBulk(None, edge)
@@ -1450,7 +1451,7 @@
val parts = GraphUtil.split(s)
val logType = parts(2)
val element = if (logType == "edge" | logType == "e") {
- /** current only edge is considered to be bulk loaded */
+ /* current only edge is considered to be bulk loaded */
labelMapping.get(parts(5)) match {
case None =>
case Some(toReplace) =>
@@ -1754,7 +1755,6 @@
case s2Edge: S2Edge => s2Edge.id().asInstanceOf[EdgeId]
case id: EdgeId => id
case s: String => EdgeId.fromString(s)
- case s: java.lang.String => EdgeId.fromString(s)
}
val edgesToFetch = for {
id <- s2EdgeIds
@@ -1937,8 +1937,7 @@
override def toString(): String = "[s2graph]"
override def io[I <: Io[_ <: GraphReader.ReaderBuilder[_ <: GraphReader], _ <: GraphWriter.WriterBuilder[_ <: GraphWriter], _ <: Mapper.Builder[_]]](builder: Io.Builder[I]): I = {
- builder.graph(this).registry(new S2GraphIoRegistry).create().asInstanceOf[I]
-
+ builder.graph(this).registry(S2GraphIoRegistry.instance).create().asInstanceOf[I]
}
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2GraphFactory.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphFactory.scala
index 7f13711..a9c98c2 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2GraphFactory.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphFactory.scala
@@ -119,7 +119,7 @@
true, S2Graph.DefaultServiceName, Nil, Seq(Prop("weight", "0.0", "double")), "strong", None, None)
}
- def cleanupDefaultSchema: Unit = {
+ def cleanupDefaultSchema(): Unit = {
val columnNames = Set(S2Graph.DefaultColumnName, "person", "software", "product", "dog",
"animal", "song", "artist", "STEPHEN")
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Property.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Property.scala
index 6b0c0eb..e86c17f 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2Property.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Property.scala
@@ -49,7 +49,6 @@
case _: Boolean => true
case _: Short => true
case _: Byte => true
- case _: String => true
case _: BigDecimal => true
case _ => false
}
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Bucket.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Bucket.scala
index 43e5db8..e71bbce 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Bucket.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Bucket.scala
@@ -51,7 +51,7 @@
def toRange(str: String): Option[(Int, Int)] = {
val range = str.split(rangeDelimiter)
- if (range.length == 2) Option(range.head.toInt, range.last.toInt)
+ if (range.length == 2) Option((range.head.toInt, range.last.toInt))
else None
}
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
index 415a64e..97fd704 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
@@ -189,14 +189,14 @@
val tgtServiceId = tgtService.id.get
val serviceId = service.id.get
- /** insert serviceColumn */
+ /* insert serviceColumn */
val srcCol = ServiceColumn.findOrInsert(srcServiceId, srcColumnName, Some(srcColumnType))
val tgtCol = ServiceColumn.findOrInsert(tgtServiceId, tgtColumnName, Some(tgtColumnType))
if (srcCol.columnType != srcColumnType) throw new RuntimeException(s"source service column type not matched ${srcCol.columnType} != ${srcColumnType}")
if (tgtCol.columnType != tgtColumnType) throw new RuntimeException(s"target service column type not matched ${tgtCol.columnType} != ${tgtColumnType}")
- /** create label */
+ /* create label */
Label.findByName(labelName, useCache = false).getOrElse {
val createdId = insert(labelName, srcServiceId, srcColumnName, srcColumnType,
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala b/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
index a2f5c47..4bc9376 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
@@ -259,6 +259,7 @@
case arr: JsArray =>
val keys = arr.asOpt[Seq[String]].getOrElse(Nil)
GroupBy(keys)
+ case _ => GroupBy.Empty
}.getOrElse(GroupBy.Empty)
def toVertices(labelName: String, direction: String, ids: Seq[JsValue]): Seq[S2Vertex] = {
@@ -514,7 +515,7 @@
private def parse[R](js: JsValue, key: String)(implicit read: Reads[R]): R = {
(js \ key).validate[R] match {
case JsError(errors) =>
- val msg = (JsError.toFlatJson(errors) \ "obj").as[List[JsValue]].flatMap(x => (x \ "msg").toOption)
+ val msg = (JsError.toJson(errors) \ "obj").as[List[JsValue]].flatMap(x => (x \ "msg").toOption)
val e = Json.obj("args" -> key, "error" -> msg)
throw new GraphExceptions.JsonParseException(Json.obj("error" -> key).toString)
case JsSuccess(result, _) => result
@@ -524,7 +525,7 @@
private def parseOption[R](js: JsValue, key: String)(implicit read: Reads[R]): Option[R] = {
(js \ key).validateOpt[R] match {
case JsError(errors) =>
- val msg = (JsError.toFlatJson(errors) \ "obj").as[List[JsValue]].flatMap(x => (x \ "msg").toOption)
+ val msg = (JsError.toJson(errors) \ "obj").as[List[JsValue]].flatMap(x => (x \ "msg").toOption)
val e = Json.obj("args" -> key, "error" -> msg)
throw new GraphExceptions.JsonParseException(Json.obj("error" -> key).toString)
case JsSuccess(result, _) => result
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala
index bde7f3a..db9a9da 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala
@@ -43,7 +43,7 @@
def toLogString = {
Map("table" -> Bytes.toString(table), "row" -> row.toList, "cf" -> Bytes.toString(cf),
"qualifier" -> qualifier.toList, "value" -> value.toList, "timestamp" -> timestamp,
- "operation" -> operation, "durability" -> durability).toString
+ "operation" -> operation, "durability" -> durability).toString()
}
override def toString(): String = toLogString
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
index a8dec7e..57d4872 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
@@ -398,7 +398,7 @@
future recoverWith {
case FetchTimeoutException(retryEdge) =>
logger.info(s"[Try: $tryNum], Fetch fail.\n${retryEdge}")
- /** fetch failed. re-fetch should be done */
+ /* fetch failed. re-fetch should be done */
fetchSnapshotEdgeInner(edges.head).flatMap { case (queryParam, snapshotEdgeOpt, kvOpt) =>
retry(tryNum + 1)(edges, statusCode, snapshotEdgeOpt)
}
@@ -413,14 +413,14 @@
}
logger.info(s"[Try: $tryNum], [Status: $status] partial fail.\n${retryEdge.toLogString}\nFailReason: ${faileReason}")
- /** retry logic */
+ /* retry logic */
val promise = Promise[Boolean]
val backOff = exponentialBackOff(tryNum)
scheduledThreadPool.schedule(new Runnable {
override def run(): Unit = {
val future = if (failedStatusCode == 0) {
// acquire Lock failed. other is mutating so this thead need to re-fetch snapshotEdge.
- /** fetch failed. re-fetch should be done */
+ /* fetch failed. re-fetch should be done */
fetchSnapshotEdgeInner(edges.head).flatMap { case (queryParam, snapshotEdgeOpt, kvOpt) =>
retry(tryNum + 1)(edges, statusCode, snapshotEdgeOpt)
}
@@ -453,7 +453,7 @@
case 0 =>
fetchedSnapshotEdgeOpt match {
case None =>
- /**
+ /*
* no one has never mutated this SN.
* (squashedEdge, edgeMutate) = Edge.buildOperation(None, edges)
* pendingE = squashedEdge.copy(statusCode = 1, lockTs = now, version = squashedEdge.ts + 1)
@@ -475,7 +475,7 @@
case Some(snapshotEdge) =>
snapshotEdge.pendingEdgeOpt match {
case None =>
- /**
+ /*
* others finished commit on this SN. but there is no contention.
* (squashedEdge, edgeMutate) = Edge.buildOperation(snapshotEdgeOpt, edges)
* pendingE = squashedEdge.copy(statusCode = 1, lockTs = now, version = snapshotEdge.version + 1) ?
@@ -497,7 +497,7 @@
case Some(pendingEdge) =>
val isLockExpired = pendingEdge.lockTs.get + LockExpireDuration < System.currentTimeMillis()
if (isLockExpired) {
- /**
+ /*
* if pendingEdge.ts == snapshotEdge.ts =>
* (squashedEdge, edgeMutate) = Edge.buildOperation(None, Seq(pendingEdge))
* else =>
@@ -519,7 +519,7 @@
commitProcess(statusCode, squashedEdge, fetchedSnapshotEdgeOpt, lockSnapshotEdge, releaseLockSnapshotEdge, edgeMutate)
} else {
- /**
+ /*
* others finished commit on this SN and there is currently contention.
* this can't be proceed so retry from re-fetch.
* throw EX
@@ -532,11 +532,11 @@
}
case _ =>
- /**
+ /*
* statusCode > 0 which means self locked and there has been partial failure either on mutate, increment, releaseLock
*/
- /**
+ /*
* this succeed to lock this SN. keep doing on commit process.
* if SN.isEmpty =>
* no one never succed to commit on this SN.
@@ -807,7 +807,7 @@
buildIncrementsAsync(indexEdge, -1L)
}
- /** reverted direction */
+ /* reverted direction */
val edgeRevert = edge.copyEdge(propsWithTs = S2Edge.propsToState(edge.updatePropsWithTs()))
val reversedIndexedEdgesMutations = edgeRevert.duplicateEdge.edgesWithIndex.flatMap { indexEdge =>
indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++
@@ -972,7 +972,7 @@
tgtVertexIdOpt match {
case Some(tgtVertexId) => // _to is given.
- /** we use toSnapshotEdge so dont need to swap src, tgt */
+ /* we use toSnapshotEdge so dont need to swap src, tgt */
val src = srcVertex.innerId
val tgt = tgtVertexId
val (srcVId, tgtVId) = (SourceVertexId(srcColumn, src), TargetVertexId(tgtColumn, tgt))
@@ -989,7 +989,7 @@
}
protected def fetchSnapshotEdgeInner(edge: S2Edge): Future[(QueryParam, Option[S2Edge], Option[SKeyValue])] = {
- /** TODO: Fix this. currently fetchSnapshotEdge should not use future cache
+ /* TODO: Fix this. currently fetchSnapshotEdge should not use future cache
* so use empty cacheKey.
* */
val queryParam = QueryParam(labelName = edge.innerLabel.label,
@@ -1042,21 +1042,21 @@
def increments(edgeMutate: EdgeMutate): (Seq[SKeyValue], Seq[SKeyValue]) = {
(edgeMutate.edgesToDeleteWithIndexOptForDegree.isEmpty, edgeMutate.edgesToInsertWithIndexOptForDegree.isEmpty) match {
case (true, true) =>
- /** when there is no need to update. shouldUpdate == false */
+ /* when there is no need to update. shouldUpdate == false */
Nil -> Nil
case (true, false) =>
- /** no edges to delete but there is new edges to insert so increase degree by 1 */
+ /* no edges to delete but there is new edges to insert so increase degree by 1 */
val (buffer, nonBuffer) = EdgeMutate.partitionBufferedIncrement(edgeMutate.edgesToInsertWithIndexOptForDegree)
buffer.flatMap(buildIncrementsAsync(_)) -> nonBuffer.flatMap(buildIncrementsAsync(_))
case (false, true) =>
- /** no edges to insert but there is old edges to delete so decrease degree by 1 */
+ /* no edges to insert but there is old edges to delete so decrease degree by 1 */
val (buffer, nonBuffer) = EdgeMutate.partitionBufferedIncrement(edgeMutate.edgesToDeleteWithIndexOptForDegree)
buffer.flatMap(buildIncrementsAsync(_, -1)) -> nonBuffer.flatMap(buildIncrementsAsync(_, -1))
case (false, false) =>
- /** update on existing edges so no change on degree */
+ /* update on existing edges so no change on degree */
Nil -> Nil
}
}
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
index e4d85cf..4fb2240 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
@@ -220,7 +220,7 @@
val _client = client(withWait)
val (increments, putAndDeletes) = kvs.partition(_.operation == SKeyValue.Increment)
- /** Asynchbase IncrementRequest does not implement HasQualifiers */
+ /* Asynchbase IncrementRequest does not implement HasQualifiers */
val incrementsFutures = increments.map { kv =>
val inc = new AtomicIncrementRequest(kv.table, kv.row, kv.cf, kv.qualifier, Bytes.toLong(kv.value))
val defer = _client.atomicIncrement(inc)
@@ -231,7 +231,7 @@
if (withWait) future else Future.successful(true)
}
- /** PutRequest and DeleteRequest accept byte[][] qualifiers/values. */
+ /* PutRequest and DeleteRequest accept byte[][] qualifiers/values. */
val othersFutures = putAndDeletes.groupBy { kv =>
(kv.table.toSeq, kv.row.toSeq, kv.cf.toSeq, kv.operation, kv.timestamp)
}.map { case ((table, row, cf, operation, timestamp), groupedKeyValues) =>
@@ -362,7 +362,7 @@
}
(_startKey , Bytes.add(baseKey, intervalMinBytes))
} else {
- /**
+ /*
* note: since propsToBytes encode size of property map at first byte, we are sure about max value here
*/
val _startKey = queryParam.cursorOpt match {
@@ -449,7 +449,7 @@
val queryParam = queryRequest.queryParam
val cacheTTL = queryParam.cacheTTLInMillis
- /** with version 4, request's type is (Scanner, (Int, Int)). otherwise GetRequest. */
+ /* with version 4, request's type is (Scanner, (Int, Int)). otherwise GetRequest. */
val edge = toRequestEdge(queryRequest, parentEdges)
val request = buildRequest(queryRequest, edge)
@@ -562,7 +562,7 @@
compressionAlgorithm: String,
replicationScopeOpt: Option[Int] = None,
totalRegionCount: Option[Int] = None): Unit = {
- /** TODO: Decide if we will allow each app server to connect to multiple hbase cluster */
+ /* TODO: Decide if we will allow each app server to connect to multiple hbase cluster */
for {
zkAddr <- Seq(zkQuorum) ++ zkQuorumSlave.toSeq
} {
@@ -789,7 +789,7 @@
}
private def toCacheKeyBytes(hbaseRpc: AsyncRPC): Array[Byte] = {
- /** with version 4, request's type is (Scanner, (Int, Int)). otherwise GetRequest. */
+ /* with version 4, request's type is (Scanner, (Int, Int)). otherwise GetRequest. */
hbaseRpc match {
case Left(getRequest) => getRequest.key
case Right(ScanWithRange(scanner, offset, limit)) =>
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
index 6095cea..3da8267 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
@@ -111,7 +111,7 @@
}
val index = label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${label.id.get}, ${labelIdxSeq}"))
- /** process indexProps */
+ /* process indexProps */
val size = idxPropsRaw.length
(0 until size).foreach { ith =>
val meta = index.sortKeyTypesArray(ith)
@@ -125,7 +125,7 @@
}
}
- /** process props */
+ /* process props */
if (op == GraphUtil.operations("incrementCount")) {
// val countVal = Bytes.toLong(kv.value)
val countVal = bytesToLongFunc(kv.value, 0)
@@ -139,7 +139,7 @@
}
}
- /** process tgtVertexId */
+ /* process tgtVertexId */
val tgtVertexId =
if (edge.checkProperty(LabelMeta.to.name)) {
val vId = edge.property(LabelMeta.to.name).asInstanceOf[S2Property[_]].innerValWithTs
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
index 6818c1d..59db07e 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
@@ -87,7 +87,7 @@
val index = label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${label.id.get}, ${labelIdxSeq}"))
- /** process indexProps */
+ /* process indexProps */
val size = idxPropsRaw.length
(0 until size).foreach { ith =>
val meta = index.sortKeyTypesArray(ith)
@@ -101,7 +101,7 @@
}
}
- /** process props */
+ /* process props */
if (op == GraphUtil.operations("incrementCount")) {
// val countVal = Bytes.toLong(kv.value)
val countVal = bytesToLongFunc(kv.value, 0)
@@ -114,7 +114,7 @@
edge.propertyInner(k.name, v.value, version)
}
}
- /** process tgtVertexId */
+ /* process tgtVertexId */
val tgtVertexId =
if (edge.checkProperty(LabelMeta.to.name)) {
val vId = edge.property(LabelMeta.to.name).asInstanceOf[S2Property[_]].innerValWithTs
diff --git a/s2rest_netty/src/main/scala/org/apache/s2graph/rest/netty/Server.scala b/s2rest_netty/src/main/scala/org/apache/s2graph/rest/netty/Server.scala
index a47fda7..126f193 100644
--- a/s2rest_netty/src/main/scala/org/apache/s2graph/rest/netty/Server.scala
+++ b/s2rest_netty/src/main/scala/org/apache/s2graph/rest/netty/Server.scala
@@ -46,6 +46,7 @@
import scala.concurrent.{ExecutionContext, Future}
import scala.io.Source
import scala.util.{Failure, Success, Try}
+import scala.language.existentials
class S2RestHandler(s2rest: RestHandler)(implicit ec: ExecutionContext) extends SimpleChannelInboundHandler[FullHttpRequest] {
val ApplicationJson = "application/json"
diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/Bootstrap.scala b/s2rest_play/app/org/apache/s2graph/rest/play/Bootstrap.scala
index e5fc75d..ff82c44 100644
--- a/s2rest_play/app/org/apache/s2graph/rest/play/Bootstrap.scala
+++ b/s2rest_play/app/org/apache/s2graph/rest/play/Bootstrap.scala
@@ -83,7 +83,7 @@
wallLogHandler.shutdown()
QueueActor.shutdown()
- /**
+ /*
* shutdown hbase client for flush buffers.
*/
shutdown()
diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/AdminController.scala b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/AdminController.scala
index ecea304..8eb25fd 100644
--- a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/AdminController.scala
+++ b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/AdminController.scala
@@ -100,7 +100,7 @@
case Failure(error) =>
logger.error(error.getMessage, error)
error match {
- case JsResultException(e) => bad(JsError.toFlatJson(e))
+ case JsResultException(e) => bad(JsError.toJson(e))
case _ => bad(error.getMessage)
}
}
diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala
index aed8ced..28da7fe 100644
--- a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala
+++ b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala
@@ -129,7 +129,7 @@
}.map(jsonResponse(_))
} else {
val rets = elementWithIdxs.map { case ((element, tsv), idx) =>
- if (!skipElement(element.isAsync)) QueueActor.router ! (element, tsv)
+ if (!skipElement(element.isAsync)) QueueActor.router ! ((element, tsv))
true
}
Future.successful(jsonResponse(Json.toJson(rets)))
@@ -251,7 +251,7 @@
def deleteAllInner(jsValue: JsValue, withWait: Boolean) = {
- /** logging for delete all request */
+ /* logging for delete all request */
def enqueueLogMessage(ids: Seq[JsValue], labels: Seq[Label], ts: Long, direction: String, topicOpt: Option[String]) = {
val kafkaMessages = for {
id <- ids