Merge branch 'master' of https://github.com/apache/incubator-nlpcraft
diff --git a/nlpcraft/src/main/resources/nlpcraft.conf b/nlpcraft/src/main/resources/nlpcraft.conf
index 54923c0..418b011 100644
--- a/nlpcraft/src/main/resources/nlpcraft.conf
+++ b/nlpcraft/src/main/resources/nlpcraft.conf
@@ -281,18 +281,6 @@
# Maximum execution result size in bytes. Default value is 1M.
# When exceeded the request will be automatically rejected.
resultMaxSizeBytes = 1048576
-
- #
- # Timeout in ms for conversation manager garbage collector.
- # Reduce if you are experiencing a large memory utilization under the load with many concurrent users.
- #
- convGcTimeoutMs = 60000
-
- #
- # Timeout in ms for dialog flow manager garbage collector.
- # Reduce if you are experiencing a large memory utilization under the load with many concurrent users.
- #
- dialogGcTimeoutMs = 60000
}
# Basic NLP toolkit to use on both server and probes. Possible values:
diff --git a/nlpcraft/src/main/scala/org/apache/nlpcraft/probe/mgrs/conversation/NCConversation.scala b/nlpcraft/src/main/scala/org/apache/nlpcraft/probe/mgrs/conversation/NCConversation.scala
index 21c17a7..365f4b4 100644
--- a/nlpcraft/src/main/scala/org/apache/nlpcraft/probe/mgrs/conversation/NCConversation.scala
+++ b/nlpcraft/src/main/scala/org/apache/nlpcraft/probe/mgrs/conversation/NCConversation.scala
@@ -18,6 +18,7 @@
package org.apache.nlpcraft.probe.mgrs.conversation
import java.util
+import java.util.concurrent.ConcurrentHashMap
import java.util.function.Predicate
import com.typesafe.scalalogging.LazyLogging
@@ -36,9 +37,11 @@
case class NCConversation(
usrId: Long,
mdlId: String,
- updateTimeoutMs: Long,
+ timeoutMs: Long,
maxDepth: Int
) extends LazyLogging with NCOpenCensusTrace {
+ private final val data = new ConcurrentHashMap[String, Object]()
+
/**
*
* @param token
@@ -83,7 +86,7 @@
depth += 1
// Conversation cleared by timeout or when there are too much unsuccessful requests.
- if (now - lastUpdateTstamp > updateTimeoutMs) {
+ if (now - lastUpdateTstamp > timeoutMs) {
stm.clear()
logger.info(s"Conversation is reset by timeout [" +
@@ -100,7 +103,7 @@
s"]")
}
else {
- val minUsageTime = now - updateTimeoutMs
+ val minUsageTime = now - timeoutMs
val toks = lastToks.flatten
for (item ← stm) {
diff --git a/nlpcraft/src/main/scala/org/apache/nlpcraft/probe/mgrs/conversation/NCConversationManager.scala b/nlpcraft/src/main/scala/org/apache/nlpcraft/probe/mgrs/conversation/NCConversationManager.scala
index cca7d5a..d31d4b3 100644
--- a/nlpcraft/src/main/scala/org/apache/nlpcraft/probe/mgrs/conversation/NCConversationManager.scala
+++ b/nlpcraft/src/main/scala/org/apache/nlpcraft/probe/mgrs/conversation/NCConversationManager.scala
@@ -17,15 +17,11 @@
package org.apache.nlpcraft.probe.mgrs.conversation
-import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}
-
import io.opencensus.trace.Span
import org.apache.nlpcraft.common._
-import org.apache.nlpcraft.common.config.NCConfigurable
import org.apache.nlpcraft.probe.mgrs.model.NCModelManager
import scala.collection._
-import scala.collection.mutable.ArrayBuffer
/**
* Conversation manager.
@@ -34,20 +30,9 @@
case class Key(userId: Long, mdlId: String)
case class Value(conv: NCConversation, var tstamp: Long = 0)
- private object Config extends NCConfigurable {
- private final val name = "nlpcraft.probe.convGcTimeoutMs"
+ private final val convs: mutable.Map[Key, Value] = mutable.HashMap.empty[Key, Value]
- def timeoutMs: Long = getInt(name)
-
- def check(): Unit =
- if (timeoutMs <= 0)
- throw new NCE(s"Configuration property must be >= 0 [name=$name]")
- }
-
- Config.check()
-
- @volatile private var convs: mutable.Map[Key, Value] = _
- @volatile private var gc: ScheduledExecutorService = _
+ @volatile private var gc: Thread = _
/**
*
@@ -55,13 +40,23 @@
* @return
*/
override def start(parent: Span = null): NCService = startScopedSpan("start", parent) { _ ⇒
- gc = Executors.newSingleThreadScheduledExecutor
+ gc =
+ U.mkThread("conversation-manager-gc") { t ⇒
+ while (!t.isInterrupted)
+ try
+ convs.synchronized {
+ val sleepTime = clearForTimeout() - System.currentTimeMillis()
- convs = mutable.HashMap.empty[Key, Value]
+ if (sleepTime > 0)
+ convs.wait(sleepTime)
+ }
+ catch {
+ case _: InterruptedException ⇒ // No-op.
+ case e: Throwable ⇒ logger.error(s"Unexpected error for: ${t.getName}", e)
+ }
+ }
- gc.scheduleWithFixedDelay(() ⇒ clearForTimeout(), Config.timeoutMs, Config.timeoutMs, TimeUnit.MILLISECONDS)
-
- logger.info(s"Conversation manager GC started, checking every ${Config.timeoutMs}ms.")
+ gc.start()
ackStart()
}
@@ -71,7 +66,11 @@
* @param parent Optional parent span.
*/
override def stop(parent: Span = null): Unit = startScopedSpan("stop", parent) { _ ⇒
- U.shutdownPools(gc)
+ U.stopThread(gc)
+
+ gc = null
+
+ convs.clear()
logger.info("Conversation manager GC stopped.")
@@ -79,24 +78,35 @@
}
/**
- *
+ * Gets next clearing time.
*/
- private def clearForTimeout(): Unit =
- startScopedSpan("clearForTimeout", "timeoutMs" → Config.timeoutMs) { _ ⇒
- convs.synchronized {
- val delKeys = ArrayBuffer.empty[Key]
+ private def clearForTimeout(): Long =
+ startScopedSpan("clearForTimeout") { _ ⇒
+ require(Thread.holdsLock(convs))
- for ((key, value) ← convs)
+ val now = System.currentTimeMillis()
+ val delKeys = mutable.HashSet.empty[Key]
+
+ for ((key, value) ← convs) {
+ val del =
NCModelManager.getModelOpt(key.mdlId) match {
- case Some(data) ⇒
- if (value.tstamp < System.currentTimeMillis() - data.model.getConversationTimeout)
- delKeys += key
-
- case None ⇒ delKeys += key
+ case Some(mdl) ⇒ value.tstamp < now - mdl.model.getConversationTimeout
+ case None ⇒ true
}
- convs --= delKeys
+ if (del) {
+ value.conv.getData.clear()
+
+ delKeys += key
+ }
}
+
+ convs --= delKeys
+
+ if (convs.nonEmpty)
+ convs.values.map(v ⇒ v.tstamp + v.conv.timeoutMs).min
+ else
+ Long.MaxValue
}
/**
@@ -115,10 +125,12 @@
Key(usrId, mdlId),
Value(NCConversation(usrId, mdlId, mdl.getConversationTimeout, mdl.getConversationDepth))
)
-
+
v.tstamp = U.nowUtcMs()
-
- v
- }.conv
+
+ convs.notifyAll()
+
+ v.conv
+ }
}
}
diff --git a/nlpcraft/src/main/scala/org/apache/nlpcraft/probe/mgrs/dialogflow/NCDialogFlowManager.scala b/nlpcraft/src/main/scala/org/apache/nlpcraft/probe/mgrs/dialogflow/NCDialogFlowManager.scala
index 7984199..1c4f9d4 100644
--- a/nlpcraft/src/main/scala/org/apache/nlpcraft/probe/mgrs/dialogflow/NCDialogFlowManager.scala
+++ b/nlpcraft/src/main/scala/org/apache/nlpcraft/probe/mgrs/dialogflow/NCDialogFlowManager.scala
@@ -17,15 +17,11 @@
package org.apache.nlpcraft.probe.mgrs.dialogflow
-import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}
-
import io.opencensus.trace.Span
-import org.apache.nlpcraft.common.config.NCConfigurable
import org.apache.nlpcraft.common.{NCService, _}
import org.apache.nlpcraft.probe.mgrs.model.NCModelManager
-import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
+import scala.collection._
/**
* Dialog flow manager.
@@ -34,20 +30,9 @@
case class Key(usrId: Long, mdlId: String)
case class Value(intent: String, tstamp: Long)
- private object Config extends NCConfigurable {
- private final val name = "nlpcraft.probe.dialogGcTimeoutMs"
+ private final val flow = mutable.HashMap.empty[Key, mutable.ArrayBuffer[Value]]
- def timeoutMs: Long = getInt(name)
-
- def check(): Unit =
- if (timeoutMs <= 0)
- throw new NCE(s"Configuration property must be >= 0 [name=$name]")
- }
-
- Config.check()
-
- @volatile private var flow: mutable.Map[Key, ArrayBuffer[Value]] = _
- @volatile private var gc: ScheduledExecutorService = _
+ @volatile private var gc: Thread = _
/**
*
@@ -55,13 +40,23 @@
* @return
*/
override def start(parent: Span = null): NCService = startScopedSpan("start", parent) { _ ⇒
- flow = mutable.HashMap.empty[Key, ArrayBuffer[Value]]
+ gc =
+ U.mkThread("dialog-flow-manager-gc") { t ⇒
+ while (!t.isInterrupted)
+ try
+ flow.synchronized {
+ val sleepTime = clearForTimeout() - System.currentTimeMillis()
- gc = Executors.newSingleThreadScheduledExecutor
+ if (sleepTime > 0)
+ flow.wait(sleepTime)
+ }
+ catch {
+ case _: InterruptedException ⇒ // No-op.
+ case e: Throwable ⇒ logger.error(s"Unexpected error for: ${t.getName}", e)
+ }
+ }
- gc.scheduleWithFixedDelay(() ⇒ clearForTimeout(), Config.timeoutMs, Config.timeoutMs, TimeUnit.MILLISECONDS)
-
- logger.info(s"Dialog flow manager GC started, checking every ${Config.timeoutMs}ms.")
+ gc.start()
ackStart()
}
@@ -71,7 +66,11 @@
* @param parent Optional parent span.
*/
override def stop(parent: Span = null): Unit = startScopedSpan("stop", parent) { _ ⇒
- U.shutdownPools(gc)
+ U.stopThread(gc)
+
+ gc = null
+
+ flow.clear()
logger.info("Dialog flow manager GC stopped.")
@@ -88,9 +87,11 @@
def addMatchedIntent(intId: String, usrId: Long, mdlId: String, parent: Span = null): Unit = {
startScopedSpan("addMatchedIntent", parent, "usrId" → usrId, "mdlId" → mdlId, "intId" → intId) { _ ⇒
flow.synchronized {
- flow.getOrElseUpdate(Key(usrId, mdlId), ArrayBuffer.empty[Value]).append(
+ flow.getOrElseUpdate(Key(usrId, mdlId), mutable.ArrayBuffer.empty[Value]).append(
Value(intId, System.currentTimeMillis())
)
+
+ flow.notifyAll()
}
logger.trace(s"Added to dialog flow [mdlId=$mdlId, intId=$intId, userId=$usrId]")
@@ -107,30 +108,46 @@
def getDialogFlow(usrId: Long, mdlId: String, parent: Span = null): Seq[String] =
startScopedSpan("getDialogFlow", parent, "usrId" → usrId, "mdlId" → mdlId) { _ ⇒
flow.synchronized {
- flow.getOrElseUpdate(Key(usrId, mdlId), ArrayBuffer.empty[Value]).map(_.intent)
+ flow.getOrElseUpdate(Key(usrId, mdlId), mutable.ArrayBuffer.empty[Value]).map(_.intent)
}
}
/**
- *
+ * Gets next clearing time.
*/
- private def clearForTimeout(): Unit =
- startScopedSpan("clearForTimeout", "timeoutMs" → Config.timeoutMs) { _ ⇒
- flow.synchronized {
- val delKeys = ArrayBuffer.empty[Key]
+ private def clearForTimeout(): Long =
+ startScopedSpan("clearForTimeout") { _ ⇒
+ require(Thread.holdsLock(flow))
- for ((key, values) ← flow)
- NCModelManager.getModelOpt(key.mdlId) match {
- case Some(data) ⇒
- val ms = System.currentTimeMillis() - data.model.getConversationTimeout
+ val now = System.currentTimeMillis()
+ val delKeys = mutable.HashSet.empty[Key]
+ val timeouts = mutable.HashMap.empty[String, Long]
- values --= values.filter(_.tstamp < ms)
+ for ((key, values) ← flow)
+ NCModelManager.getModelOpt(key.mdlId) match {
+ case Some(mdl) ⇒
+ val ms = now - mdl.model.getConversationTimeout
- case None ⇒ delKeys += key
- }
+ values --= values.filter(_.tstamp < ms)
- flow --= delKeys
- }
+ timeouts += mdl.model.getId → mdl.model.getConversationTimeout
+
+ // https://github.com/scala/bug/issues/10151
+ // Scala bug workaround.
+ ()
+ case None ⇒ delKeys += key
+ }
+
+ delKeys ++= flow.filter(_._2.isEmpty).keySet
+
+ flow --= delKeys
+
+ val times = (for ((key, values) ← flow) yield values.map(v ⇒ v.tstamp + timeouts(key.mdlId))).flatten
+
+ if (times.nonEmpty)
+ times.min
+ else
+ Long.MaxValue
}
/**
@@ -144,6 +161,8 @@
startScopedSpan("clear", parent, "usrId" → usrId, "mdlId" → mdlId) { _ ⇒
flow.synchronized {
flow -= Key(usrId, mdlId)
+
+ flow.notifyAll()
}
}
@@ -161,6 +180,8 @@
flow.synchronized {
flow(key) = flow(key).filterNot(v ⇒ pred(v.intent))
+
+ flow.notifyAll()
}
}
}
diff --git a/nlpcraft/src/test/scala/org/apache/nlpcraft/model/conversation/NCConversationTimeoutSpec.scala b/nlpcraft/src/test/scala/org/apache/nlpcraft/model/conversation/NCConversationTimeoutSpec.scala
new file mode 100644
index 0000000..c94ba26
--- /dev/null
+++ b/nlpcraft/src/test/scala/org/apache/nlpcraft/model/conversation/NCConversationTimeoutSpec.scala
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nlpcraft.model.conversation
+
+import java.io.IOException
+import java.util
+import java.util.Collections
+
+import org.apache.nlpcraft.common.NCException
+import org.apache.nlpcraft.model.{NCElement, NCIntent, NCIntentMatch, NCModel, NCResult}
+import org.apache.nlpcraft.{NCTestContext, NCTestEnvironment}
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.Test
+
+object NCTimeoutSpecModel {
+    /** Conversation timeout in milliseconds, returned by the model's `getConversationTimeout`. */
+    final val TIMEOUT = 2000
+}
+
+import org.apache.nlpcraft.model.conversation.NCTimeoutSpecModel._
+
+/**
+ * Test model verifying that conversation data and dialog flow are reset once the
+ * conversation timeout expires.
+ *
+ * Each matched request advances `step`; the sequence is driven by
+ * `NCConversationTimeoutSpec`:
+ *  - step 0: fresh conversation, a value is stored in the conversation data;
+ *  - step 1: the stored value and a non-empty dialog flow are still present;
+ *  - (the test then sleeps longer than `TIMEOUT`)
+ *  - step 2: conversation data (including the map captured at step 0) and dialog flow are empty;
+ *  - step 3: conversation data is still empty, dialog flow is non-empty again.
+ */
+class NCTimeoutSpecModel extends NCModel {
+    // NOTE(review): callbacks for consecutive requests may run on different probe
+    // threads, so state carried from one request to the next is marked volatile.
+    @volatile private var step = 0
+    @volatile private var saveData: util.Map[String, AnyRef] = _
+
+    override def getId: String = this.getClass.getSimpleName
+
+    override def getName: String = this.getClass.getSimpleName
+
+    override def getVersion: String = "1.0.0"
+
+    override def getConversationTimeout: Long = TIMEOUT
+
+    override def getElements: util.Set[NCElement] =
+        Collections.singleton(
+            new NCElement {
+                override def getId: String = "test"
+
+                override def getSynonyms: util.List[String] = Collections.singletonList("test")
+            }
+        )
+
+    /**
+     * Checks the conversation state expected at the current step, then advances the step.
+     *
+     * @param ctx Intent match context.
+     * @return JSON result `Step-N` for the step just processed.
+     */
+    @NCIntent("intent=req conv=true term={id == 'test'}")
+    def onMatch(ctx: NCIntentMatch): NCResult = {
+        val conv = ctx.getContext.getConversation
+
+        step match {
+            case 0 ⇒
+                assertTrue(conv.getData.isEmpty)
+                assertTrue(conv.getDialogFlow.isEmpty)
+
+                conv.getData.put("key", "value")
+
+                // Keep a reference to the data map so step 2 can check that it was cleared.
+                saveData = conv.getData
+
+            case 1 ⇒
+                assertFalse(conv.getData.isEmpty)
+                assertEquals(saveData, conv.getData)
+                assertEquals("value", conv.getData.getOrDefault("key", "-"))
+                assertFalse(conv.getDialogFlow.isEmpty)
+
+            case 2 ⇒
+                assertTrue(conv.getData.isEmpty)
+                assertTrue(saveData.isEmpty)
+                assertTrue(conv.getDialogFlow.isEmpty)
+
+            case 3 ⇒
+                assertTrue(conv.getData.isEmpty)
+                assertFalse(conv.getDialogFlow.isEmpty)
+
+            case _ ⇒ fail(s"Unexpected step: $step")
+        }
+
+        val msg = s"Step-$step"
+
+        step = step + 1
+
+        NCResult.json(msg)
+    }
+}
+
+/**
+ * Verifies that conversation state is reset after the model's conversation timeout.
+ *
+ * @see NCTimeoutSpecModel
+ */
+@NCTestEnvironment(model = classOf[NCTimeoutSpecModel], startClient = true)
+class NCConversationTimeoutSpec extends NCTestContext {
+    @Test
+    @throws[NCException]
+    @throws[IOException]
+    private[conversation] def test(): Unit = {
+        def ask(): Unit = assertTrue(getClient.ask("test").isOk)
+
+        // Steps 0 and 1: both requests fall within the conversation timeout.
+        ask()
+        ask()
+
+        // Wait past the timeout (with a 1s margin) so the conversation state expires.
+        Thread.sleep(TIMEOUT + 1000)
+
+        // Steps 2 and 3: the conversation starts from scratch.
+        ask()
+        ask()
+    }
+}