Merge branch 'master' of https://github.com/apache/incubator-nlpcraft
diff --git a/nlpcraft/src/main/resources/nlpcraft.conf b/nlpcraft/src/main/resources/nlpcraft.conf
index 54923c0..418b011 100644
--- a/nlpcraft/src/main/resources/nlpcraft.conf
+++ b/nlpcraft/src/main/resources/nlpcraft.conf
@@ -281,18 +281,6 @@
         # Maximum execution result size in bytes. Default value is 1M.
         # When exceeded the request will be automatically rejected.
         resultMaxSizeBytes = 1048576
-
-        #
-        # Timeout in ms for conversation manager garbage collector.
-        # Reduce if you are experiencing a large memory utilization under the load with many concurrent users.
-        #
-        convGcTimeoutMs = 60000
-
-        #
-        # Timeout in ms for dialog flow manager garbage collector.
-        # Reduce if you are experiencing a large memory utilization under the load with many concurrent users.
-        #
-        dialogGcTimeoutMs = 60000
     }
 
     # Basic NLP toolkit to use on both server and probes. Possible values:
diff --git a/nlpcraft/src/main/scala/org/apache/nlpcraft/probe/mgrs/conversation/NCConversation.scala b/nlpcraft/src/main/scala/org/apache/nlpcraft/probe/mgrs/conversation/NCConversation.scala
index 21c17a7..365f4b4 100644
--- a/nlpcraft/src/main/scala/org/apache/nlpcraft/probe/mgrs/conversation/NCConversation.scala
+++ b/nlpcraft/src/main/scala/org/apache/nlpcraft/probe/mgrs/conversation/NCConversation.scala
@@ -18,6 +18,7 @@
 package org.apache.nlpcraft.probe.mgrs.conversation
 
 import java.util
+import java.util.concurrent.ConcurrentHashMap
 import java.util.function.Predicate
 
 import com.typesafe.scalalogging.LazyLogging
@@ -36,9 +37,11 @@
 case class NCConversation(
     usrId: Long,
     mdlId: String,
-    updateTimeoutMs: Long,
+    timeoutMs: Long,
     maxDepth: Int
 ) extends LazyLogging with NCOpenCensusTrace {
+    private final val data = new ConcurrentHashMap[String, Object]()
+
     /**
      *
      * @param token
@@ -83,7 +86,7 @@
             depth += 1
 
             // Conversation cleared by timeout or when there are too much unsuccessful requests.
-            if (now - lastUpdateTstamp > updateTimeoutMs) {
+            if (now - lastUpdateTstamp > timeoutMs) {
                 stm.clear()
 
                 logger.info(s"Conversation is reset by timeout [" +
@@ -100,7 +103,7 @@
                 s"]")
             }
             else {
-                val minUsageTime = now - updateTimeoutMs
+                val minUsageTime = now - timeoutMs
                 val toks = lastToks.flatten
 
                 for (item ← stm) {
diff --git a/nlpcraft/src/main/scala/org/apache/nlpcraft/probe/mgrs/conversation/NCConversationManager.scala b/nlpcraft/src/main/scala/org/apache/nlpcraft/probe/mgrs/conversation/NCConversationManager.scala
index cca7d5a..d31d4b3 100644
--- a/nlpcraft/src/main/scala/org/apache/nlpcraft/probe/mgrs/conversation/NCConversationManager.scala
+++ b/nlpcraft/src/main/scala/org/apache/nlpcraft/probe/mgrs/conversation/NCConversationManager.scala
@@ -17,15 +17,11 @@
 
 package org.apache.nlpcraft.probe.mgrs.conversation
 
-import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}
-
 import io.opencensus.trace.Span
 import org.apache.nlpcraft.common._
-import org.apache.nlpcraft.common.config.NCConfigurable
 import org.apache.nlpcraft.probe.mgrs.model.NCModelManager
 
 import scala.collection._
-import scala.collection.mutable.ArrayBuffer
 
 /**
   * Conversation manager.
@@ -34,20 +30,9 @@
     case class Key(userId: Long, mdlId: String)
     case class Value(conv: NCConversation, var tstamp: Long = 0)
 
-    private object Config extends NCConfigurable {
-        private final val name = "nlpcraft.probe.convGcTimeoutMs"
+    private final val convs: mutable.Map[Key, Value] = mutable.HashMap.empty[Key, Value]
 
-        def timeoutMs: Long = getInt(name)
-
-        def check(): Unit =
-            if (timeoutMs <= 0)
-                throw new NCE(s"Configuration property must be >= 0 [name=$name]")
-    }
-
-    Config.check()
-
-    @volatile private var convs: mutable.Map[Key, Value] = _
-    @volatile private var gc: ScheduledExecutorService = _
+    @volatile private var gc: Thread = _
 
     /**
      *
@@ -55,13 +40,23 @@
      * @return
      */
     override def start(parent: Span = null): NCService = startScopedSpan("start", parent) { _ ⇒
-        gc = Executors.newSingleThreadScheduledExecutor
+        gc =
+            U.mkThread("conversation-manager-gc") { t ⇒
+                while (!t.isInterrupted)
+                    try
+                        convs.synchronized { // clearForTimeout() requires the 'convs' monitor.
+                            val sleepTime = clearForTimeout() - System.currentTimeMillis()
 
-        convs = mutable.HashMap.empty[Key, Value]
+                            if (sleepTime > 0)
+                                convs.wait(sleepTime) // Releases the monitor; notifyAll() on 'convs' wakes it early.
+                        }
+                    catch {
+                        case _: InterruptedException ⇒ // No-op. NOTE(review): wait() clears the interrupt flag when it throws - confirm 't.isInterrupted' still ends the loop.
+                        case e: Throwable ⇒ logger.error(s"Unexpected error for: ${t.getName}", e)
+                    }
+            }
 
-        gc.scheduleWithFixedDelay(() ⇒ clearForTimeout(), Config.timeoutMs, Config.timeoutMs, TimeUnit.MILLISECONDS)
-
-        logger.info(s"Conversation manager GC started, checking every ${Config.timeoutMs}ms.")
+        gc.start()
 
         ackStart()
     }
@@ -71,7 +66,11 @@
      * @param parent Optional parent span.
      */
     override def stop(parent: Span = null): Unit = startScopedSpan("stop", parent) { _ ⇒
-        U.shutdownPools(gc)
+        U.stopThread(gc)
+
+        gc = null
+
+        convs.clear()
 
         logger.info("Conversation manager GC stopped.")
 
@@ -79,24 +78,35 @@
     }
 
     /**
-      *
+      * Drops expired or model-less conversations, then gets next clearing time ('Long.MaxValue' if none remain).
       */
-    private def clearForTimeout(): Unit =
-        startScopedSpan("clearForTimeout", "timeoutMs" → Config.timeoutMs) { _ ⇒
-            convs.synchronized {
-                val delKeys = ArrayBuffer.empty[Key]
+    private def clearForTimeout(): Long =
+        startScopedSpan("clearForTimeout") { _ ⇒
+            require(Thread.holdsLock(convs)) // Caller must already hold the 'convs' monitor.
 
-                for ((key, value) ← convs)
+            val now = System.currentTimeMillis()
+            val delKeys = mutable.HashSet.empty[Key]
+
+            for ((key, value) ← convs) {
+                val del = // NOTE(review): strict '<' - an entry expiring exactly at 'now' survives until the next pass.
                     NCModelManager.getModelOpt(key.mdlId) match {
-                        case Some(data) ⇒
-                            if (value.tstamp < System.currentTimeMillis() - data.model.getConversationTimeout)
-                                delKeys += key
-
-                        case None ⇒ delKeys += key
+                        case Some(mdl) ⇒ value.tstamp < now - mdl.model.getConversationTimeout
+                        case None ⇒ true // Model not found - drop its conversation.
                     }
 
-                convs --= delKeys
+                if (del) {
+                    value.conv.getData.clear() // Also empty the conversation's user data map.
+
+                    delKeys += key
+                }
             }
+
+            convs --= delKeys
+
+            if (convs.nonEmpty) // Earliest 'tstamp + timeoutMs' among the remaining conversations.
+                convs.values.map(v ⇒ v.tstamp + v.conv.timeoutMs).min
+            else // Nothing left to expire.
+                Long.MaxValue
         }
 
     /**
@@ -115,10 +125,12 @@
                     Key(usrId, mdlId),
                     Value(NCConversation(usrId, mdlId, mdl.getConversationTimeout, mdl.getConversationDepth))
                 )
-                
+
                 v.tstamp = U.nowUtcMs()
-                
-                v
-            }.conv
+
+                convs.notifyAll()
+
+                v.conv
+            }
         }
 }
diff --git a/nlpcraft/src/main/scala/org/apache/nlpcraft/probe/mgrs/dialogflow/NCDialogFlowManager.scala b/nlpcraft/src/main/scala/org/apache/nlpcraft/probe/mgrs/dialogflow/NCDialogFlowManager.scala
index 7984199..1c4f9d4 100644
--- a/nlpcraft/src/main/scala/org/apache/nlpcraft/probe/mgrs/dialogflow/NCDialogFlowManager.scala
+++ b/nlpcraft/src/main/scala/org/apache/nlpcraft/probe/mgrs/dialogflow/NCDialogFlowManager.scala
@@ -17,15 +17,11 @@
 
 package org.apache.nlpcraft.probe.mgrs.dialogflow
 
-import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}
-
 import io.opencensus.trace.Span
-import org.apache.nlpcraft.common.config.NCConfigurable
 import org.apache.nlpcraft.common.{NCService, _}
 import org.apache.nlpcraft.probe.mgrs.model.NCModelManager
 
-import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
+import scala.collection._
 
 /**
  * Dialog flow manager.
@@ -34,20 +30,9 @@
     case class Key(usrId: Long, mdlId: String)
     case class Value(intent: String, tstamp: Long)
 
-    private object Config extends NCConfigurable {
-        private final val name = "nlpcraft.probe.dialogGcTimeoutMs"
+    private final val flow = mutable.HashMap.empty[Key, mutable.ArrayBuffer[Value]]
 
-        def timeoutMs: Long = getInt(name)
-
-        def check(): Unit =
-            if (timeoutMs <= 0)
-                throw new NCE(s"Configuration property must be >= 0 [name=$name]")
-    }
-
-    Config.check()
-
-    @volatile private var flow: mutable.Map[Key, ArrayBuffer[Value]] = _
-    @volatile private var gc: ScheduledExecutorService = _
+    @volatile private var gc: Thread = _
 
     /**
      *
@@ -55,13 +40,23 @@
      * @return
      */
     override def start(parent: Span = null): NCService = startScopedSpan("start", parent) { _ ⇒
-        flow = mutable.HashMap.empty[Key, ArrayBuffer[Value]]
+        gc =
+            U.mkThread("dialog-flow-manager-gc") { t ⇒
+                while (!t.isInterrupted)
+                    try
+                        flow.synchronized { // clearForTimeout() requires the 'flow' monitor.
+                            val sleepTime = clearForTimeout() - System.currentTimeMillis()
 
-        gc = Executors.newSingleThreadScheduledExecutor
+                            if (sleepTime > 0)
+                                flow.wait(sleepTime) // Releases the monitor; notifyAll() on 'flow' wakes it early.
+                        }
+                    catch {
+                        case _: InterruptedException ⇒ // No-op. NOTE(review): wait() clears the interrupt flag when it throws - confirm 't.isInterrupted' still ends the loop.
+                        case e: Throwable ⇒ logger.error(s"Unexpected error for: ${t.getName}", e)
+                    }
+            }
 
-        gc.scheduleWithFixedDelay(() ⇒ clearForTimeout(), Config.timeoutMs, Config.timeoutMs, TimeUnit.MILLISECONDS)
-
-        logger.info(s"Dialog flow manager GC started, checking every ${Config.timeoutMs}ms.")
+        gc.start()
 
         ackStart()
     }
@@ -71,7 +66,11 @@
      * @param parent Optional parent span.
      */
     override def stop(parent: Span = null): Unit = startScopedSpan("stop", parent) { _ ⇒
-        U.shutdownPools(gc)
+        U.stopThread(gc)
+
+        gc = null
+
+        flow.clear()
 
         logger.info("Dialog flow manager GC stopped.")
 
@@ -88,9 +87,11 @@
     def addMatchedIntent(intId: String, usrId: Long, mdlId: String, parent: Span = null): Unit = {
         startScopedSpan("addMatchedIntent", parent, "usrId" → usrId, "mdlId" → mdlId, "intId" → intId) { _ ⇒
             flow.synchronized {
-                flow.getOrElseUpdate(Key(usrId, mdlId), ArrayBuffer.empty[Value]).append(
+                flow.getOrElseUpdate(Key(usrId, mdlId), mutable.ArrayBuffer.empty[Value]).append(
                     Value(intId, System.currentTimeMillis())
                 )
+
+                flow.notifyAll()
             }
 
             logger.trace(s"Added to dialog flow [mdlId=$mdlId, intId=$intId, userId=$usrId]")
@@ -107,30 +108,46 @@
     def getDialogFlow(usrId: Long, mdlId: String, parent: Span = null): Seq[String] =
         startScopedSpan("getDialogFlow", parent, "usrId" → usrId, "mdlId" → mdlId) { _ ⇒
             flow.synchronized {
-                flow.getOrElseUpdate(Key(usrId, mdlId), ArrayBuffer.empty[Value]).map(_.intent)
+                flow.getOrElseUpdate(Key(usrId, mdlId), mutable.ArrayBuffer.empty[Value]).map(_.intent) // Inserts an empty buffer on a miss.
             }
         }
 
     /**
-     *
+     * Drops expired intents and emptied or model-less entries, then gets next clearing time ('Long.MaxValue' if none remain).
      */
-    private def clearForTimeout(): Unit =
-        startScopedSpan("clearForTimeout", "timeoutMs" → Config.timeoutMs) { _ ⇒
-            flow.synchronized {
-                val delKeys = ArrayBuffer.empty[Key]
+    private def clearForTimeout(): Long =
+        startScopedSpan("clearForTimeout") { _ ⇒
+            require(Thread.holdsLock(flow)) // Caller must already hold the 'flow' monitor.
 
-                for ((key, values) ← flow)
-                    NCModelManager.getModelOpt(key.mdlId) match {
-                        case Some(data) ⇒
-                            val ms = System.currentTimeMillis() - data.model.getConversationTimeout
+            val now = System.currentTimeMillis()
+            val delKeys = mutable.HashSet.empty[Key]
+            val timeouts = mutable.HashMap.empty[String, Long] // Timeout per model ID. NOTE(review): keyed by 'getId' but read by 'key.mdlId' - confirm they always match.
 
-                            values --= values.filter(_.tstamp < ms)
+            for ((key, values) ← flow)
+                NCModelManager.getModelOpt(key.mdlId) match {
+                    case Some(mdl) ⇒
+                        val ms = now - mdl.model.getConversationTimeout
 
-                        case None ⇒ delKeys += key
-                    }
+                        values --= values.filter(_.tstamp < ms) // Drop intents older than the model's conversation timeout.
 
-                flow --= delKeys
-            }
+                        timeouts += mdl.model.getId → mdl.model.getConversationTimeout
+
+                        // https://github.com/scala/bug/issues/10151
+                        // Scala bug workaround.
+                        ()
+                    case None ⇒ delKeys += key // Model not found - drop the whole entry.
+                }
+
+            delKeys ++= flow.filter(_._2.isEmpty).keySet // Also drop entries whose intent list is now empty.
+
+            flow --= delKeys
+
+            val times = (for ((key, values) ← flow) yield values.map(v ⇒ v.tstamp + timeouts(key.mdlId))).flatten
+
+            if (times.nonEmpty) // Earliest expiry among all remaining intents.
+                times.min
+            else // Nothing left to expire.
+                Long.MaxValue
         }
     
     /**
@@ -144,6 +161,8 @@
         startScopedSpan("clear", parent, "usrId" → usrId, "mdlId" → mdlId) { _ ⇒
             flow.synchronized {
                 flow -= Key(usrId, mdlId)
+
+                flow.notifyAll()
             }
         }
     
@@ -161,6 +180,8 @@
 
             flow.synchronized {
                 flow(key) = flow(key).filterNot(v ⇒ pred(v.intent))
+
+                flow.notifyAll()
             }
         }
 }
diff --git a/nlpcraft/src/test/scala/org/apache/nlpcraft/model/conversation/NCConversationTimeoutSpec.scala b/nlpcraft/src/test/scala/org/apache/nlpcraft/model/conversation/NCConversationTimeoutSpec.scala
new file mode 100644
index 0000000..c94ba26
--- /dev/null
+++ b/nlpcraft/src/test/scala/org/apache/nlpcraft/model/conversation/NCConversationTimeoutSpec.scala
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nlpcraft.model.conversation
+
+import java.io.IOException
+import java.util
+import java.util.Collections
+
+import org.apache.nlpcraft.common.NCException
+import org.apache.nlpcraft.model.{NCElement, NCIntent, NCIntentMatch, NCModel, NCResult}
+import org.apache.nlpcraft.{NCTestContext, NCTestEnvironment}
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.Test
+
+object NCTimeoutSpecModel {
+    final val TIMEOUT = 2000 // Value returned by the test model's getConversationTimeout; the spec sleeps TIMEOUT + 1000.
+}
+
+import org.apache.nlpcraft.model.conversation.NCTimeoutSpecModel._
+
+class NCTimeoutSpecModel extends NCModel {
+    private var step = 0 // Index of the next expected request (0..3).
+    private var saveData: util.Map[String, AnyRef] = _ // Conversation data map captured at step 0.
+
+    override def getId: String = this.getClass.getSimpleName
+
+    override def getName: String = this.getClass.getSimpleName
+
+    override def getVersion: String = "1.0.0"
+
+    override def getConversationTimeout: Long = TIMEOUT
+
+    override def getElements: util.Set[NCElement] =
+        Collections.singleton(
+            new NCElement {
+                override def getId: String = "test"
+
+                override def getSynonyms: util.List[String] = Collections.singletonList("test")
+            }
+        )
+
+    @NCIntent("intent=req conv=true term={id == 'test'}")
+    def onMatch(ctx: NCIntentMatch): NCResult = {
+        val conv = ctx.getContext.getConversation
+
+        step match {
+            case 0 ⇒ // 1st request: data and dialog flow must be empty; store a value and keep the map reference.
+                assertTrue(conv.getData.isEmpty)
+                assertTrue(conv.getDialogFlow.isEmpty)
+
+                conv.getData.put("key", "value")
+
+                saveData = conv.getData
+
+            case 1 ⇒ // 2nd request: the value stored at step 0 and the dialog flow must still be there.
+                assertFalse(conv.getData.isEmpty)
+                assertEquals(saveData, conv.getData)
+                assertEquals("value", conv.getData.getOrDefault("key", "-"))
+                assertFalse(conv.getDialogFlow.isEmpty)
+
+            case 2 ⇒ // 3rd request: data (including the saved map) and dialog flow must be empty again.
+                assertTrue(conv.getData.isEmpty)
+                assertTrue(saveData.isEmpty)
+                assertTrue(conv.getDialogFlow.isEmpty)
+
+            case 3 ⇒ // 4th request: data stays empty, dialog flow is non-empty again.
+                assertTrue(conv.getData.isEmpty)
+                assertFalse(conv.getDialogFlow.isEmpty)
+
+            case _ ⇒ require(false) // Only four requests are expected.
+        }
+
+        val msg = s"Step-$step"
+
+        step = step + 1
+
+        NCResult.json(msg)
+    }
+}
+
+/** Sends two requests, sleeps past the conversation timeout, then sends two more; assertions live in the model.
+  * @see NCTimeoutSpecModel
+  */
+@NCTestEnvironment(model = classOf[NCTimeoutSpecModel], startClient = true)
+class NCConversationTimeoutSpec extends NCTestContext {
+    @Test
+    @throws[NCException]
+    @throws[IOException]
+    private[conversation] def test(): Unit = {
+        def ask(): Unit = assertTrue(getClient.ask("test").isOk)
+
+        ask() // Model step 0.
+        ask() // Model step 1.
+
+        Thread.sleep(TIMEOUT + 1000) // Wait longer than the model's conversation timeout.
+
+        ask() // Model step 2.
+        ask() // Model step 3.
+    }
+}