[S2GRAPH-65]: Deferred produce exception.
fix type cast bug.
JIRA:
[S2GRAPH-65] https://issues.apache.org/jira/browse/S2GRAPH-65
Pull Request:
Closes #59
Authors:
DOYUNG YOON: steamshon@apache.org
diff --git a/CHANGES b/CHANGES
index c8cf738..436cca4 100644
--- a/CHANGES
+++ b/CHANGES
@@ -94,6 +94,8 @@
S2GRAPH-19: When query with duration error (Committed by DOYUNG YOON).
S2GRAPH-63: Condition on partition strong edges and weak edges on mutateEdges is wrong (Committed by DOYUNG YOON).
+
+ S2GRAPH-65: Deferred produce exception (Committed by DOYUNG YOON).
S2GRAPH-64: incrementCounts yield type case exception (Committed by DOYUNG YOON).
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
index b5d94e6..c0c369b 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
@@ -99,7 +99,7 @@
private def client(withWait: Boolean): HBaseClient = if (withWait) clientWithFlush else client
/** Future Cache to squash request */
- private val futureCache = new DeferCache[QueryResult](config)(ec)
+ private val futureCache = new DeferCache[QueryRequestWithResult](config)(ec)
/** Simple Vertex Cache */
private val vertexCache = new FutureCache[Seq[SKeyValue]](config)(ec)
@@ -277,19 +277,19 @@
isInnerCall: Boolean,
parentEdges: Seq[EdgeWithScore]): Deferred[QueryRequestWithResult] = {
- def fetchInner(hbaseRpc: AnyRef): Deferred[QueryResult] = {
+ def fetchInner(hbaseRpc: AnyRef): Deferred[QueryRequestWithResult] = {
fetchKeyValuesInner(hbaseRpc).withCallback { kvs =>
val edgeWithScores = toEdges(kvs, queryRequest.queryParam, prevStepScore, isInnerCall, parentEdges)
val resultEdgesWithScores = if (queryRequest.queryParam.sample >= 0) {
sample(queryRequest, edgeWithScores, queryRequest.queryParam.sample)
} else edgeWithScores
- QueryResult(resultEdgesWithScores, tailCursor = kvs.lastOption.map(_.key).getOrElse(Array.empty[Byte]))
-// QueryRequestWithResult(queryRequest, QueryResult(resultEdgesWithScores, tailCursor = kvs.lastOption.map(_.key).getOrElse(Array.empty)))
+// QueryResult(resultEdgesWithScores, tailCursor = kvs.lastOption.map(_.key).getOrElse(Array.empty[Byte]))
+ QueryRequestWithResult(queryRequest, QueryResult(resultEdgesWithScores, tailCursor = kvs.lastOption.map(_.key).getOrElse(Array.empty)))
} recoverWith { ex =>
logger.error(s"fetchInner failed. fallback return. $hbaseRpc}", ex)
- QueryResult(isFailure = true)
-// QueryRequestWithResult(queryRequest, QueryResult(isFailure = true))
+// QueryResult(isFailure = true)
+ QueryRequestWithResult(queryRequest, QueryResult(isFailure = true))
}
}
@@ -297,14 +297,13 @@
val cacheTTL = queryParam.cacheTTLInMillis
val request = buildRequest(queryRequest)
- val defer =
- if (cacheTTL <= 0) fetchInner(request)
- else {
- val cacheKeyBytes = Bytes.add(queryRequest.query.cacheKeyBytes, toCacheKeyBytes(request))
- val cacheKey = queryParam.toCacheKey(cacheKeyBytes)
- futureCache.getOrElseUpdate(cacheKey, cacheTTL)(fetchInner(request))
+
+ if (cacheTTL <= 0) fetchInner(request)
+ else {
+ val cacheKeyBytes = Bytes.add(queryRequest.query.cacheKeyBytes, toCacheKeyBytes(request))
+ val cacheKey = queryParam.toCacheKey(cacheKeyBytes)
+ futureCache.getOrElseUpdate(cacheKey, cacheTTL)(fetchInner(request))
}
- defer withCallback { queryResult => QueryRequestWithResult(queryRequest, queryResult)}
}