Merge branch 'master' into S2GRAPH-232
* master:
[S2GRAPH-230] ResourceManager onEvict cause segmentation fault with AnnoyModelFetcher
[S2GRAPH-231] Change the GraphQL type name to a valid string.
reTest ci
add background task on ResourceManager onEvict.
diff --git a/CHANGES b/CHANGES
index da32f18..734f498 100644
--- a/CHANGES
+++ b/CHANGES
@@ -83,6 +83,8 @@
* [S2GRAPH-214] - Add REAME for movielens examples
* [S2GRAPH-216] - Provide a transform directive in the GraphQL query result.
* [S2GRAPH-221] - Unify configurations for bulk and mutate in S2GraphSink.
+ * [S2GRAPH-230] - ResourceManager onEvict cause segmentation fault with AnnoyModelFetcher
+ * [S2GRAPH-231] - Change the GraphQL type name to a valid string.
** New Feature
* [S2GRAPH-123] - Support different index on out/in direction.
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/ResourceManager.scala b/s2core/src/main/scala/org/apache/s2graph/core/ResourceManager.scala
index 051ca9f..14ff767 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/ResourceManager.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/ResourceManager.scala
@@ -19,6 +19,8 @@
package org.apache.s2graph.core
+import java.util.concurrent.{Executors, TimeUnit}
+
import com.typesafe.config.impl.ConfigImpl
import com.typesafe.config._
import org.apache.s2graph.core.schema.{Label, ServiceColumn}
@@ -47,7 +49,7 @@
val EdgeMutatorKey = classOf[EdgeMutator].getName
val VertexMutatorKey = classOf[VertexMutator].getName
- val DefaultMaxSize = 1000
+ val DefaultMaxSize = 10
val DefaultCacheTTL = -1
val DefaultConfig = ConfigFactory.parseMap(Map(MaxSizeKey -> DefaultMaxSize, TtlKey -> DefaultCacheTTL).asJava)
}
@@ -59,6 +61,13 @@
import scala.collection.JavaConverters._
+ def shutdown(): Unit = {
+ cache.asMap().asScala.foreach { case (_, (obj, _, _)) =>
+ onEvict(obj)
+ }
+ }
+ val scheduler = Executors.newScheduledThreadPool(1)
+ val waitForEvictionInSeconds = 10
val maxSize = Try(_config.getInt(ResourceManager.MaxSizeKey)).getOrElse(DefaultMaxSize)
val cacheTTL = Try(_config.getInt(ResourceManager.CacheTTL)).getOrElse(DefaultCacheTTL)
@@ -72,21 +81,28 @@
cache.asMap().asScala.toSeq.collect { case (_, (obj: EdgeFetcher, _, _)) => obj }
}
+
def onEvict(oldValue: AnyRef): Unit = {
oldValue match {
case o: Option[_] => o.foreach { case v: AutoCloseable =>
- v.close()
- logger.info(s"[${oldValue.getClass.getName}]: $oldValue evicted.")
+ scheduler.schedule(newCloseTask(v), waitForEvictionInSeconds, TimeUnit.SECONDS)
}
case v: AutoCloseable =>
- v.close()
- logger.info(s"[${oldValue.getClass.getName}]: $oldValue evicted.")
-
+ scheduler.schedule(newCloseTask(v), waitForEvictionInSeconds, TimeUnit.SECONDS)
case _ => logger.info(s"Class does't have close() method ${oldValue.getClass.getName}")
}
}
+ private def newCloseTask(v: AutoCloseable) = {
+ new Runnable {
+ override def run(): Unit = {
+ v.close()
+ logger.info(s"[${v.getClass.getName}]: $v evicted.")
+ }
+ }
+ }
+
def getOrElseUpdateVertexFetcher(column: ServiceColumn,
cacheTTLInSecs: Option[Int] = None): Option[VertexFetcher] = {
val cacheKey = VertexFetcherKey + "_" + column.service.serviceName + "_" + column.columnName
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
index 7d6e20b..9657e10 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
@@ -295,6 +295,7 @@
if (running.compareAndSet(true, false)) {
flushStorage()
Schema.shutdown(modelDataDelete)
+ resourceManager.shutdown()
defaultStorage.shutdown()
localLongId.set(0l)
}
@@ -353,9 +354,10 @@
mutateVertices(getStorage(service))(service.cluster, vertexGroup.map(_._1), withWait).map(_.zip(vertexGroup.map(_._2)))
}
- indexProvider.mutateVerticesAsync(vertices)
- Future.sequence(futures).map{ ls =>
- ls.flatten.toSeq.sortBy(_._2).map(_._1)
+ Future.sequence(futures).flatMap { ls =>
+ indexProvider.mutateVerticesAsync(vertices).map { _ =>
+ ls.flatten.toSeq.sortBy(_._2).map(_._1)
+ }
}
}
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala b/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala
index 755b8d0..bf1f22c 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala
@@ -161,8 +161,8 @@
put(key, cachedVal, false)
logger.error(s"withCache update failed: $cacheKey", ex)
case Success(newValue) =>
- put(key, newValue, broadcast = (broadcast && newValue != cachedVal))
+ put(key, newValue, broadcast = (broadcast && newValue != cachedVal))
onEvict(cachedVal)
cachedVal match {
diff --git a/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/ManagementType.scala b/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/ManagementType.scala
index dd1ee87..deb7664 100644
--- a/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/ManagementType.scala
+++ b/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/ManagementType.scala
@@ -17,6 +17,7 @@
* under the License.
*/
+
package org.apache.s2graph.graphql.types
import org.apache.s2graph.core.schema._