package org.apache.nlpcraft.probe.mgrs.dialogflow
import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}
import io.opencensus.trace.Span
import org.apache.nlpcraft.common.config.NCConfigurable
import org.apache.nlpcraft.common.{NCService, _}
import org.apache.nlpcraft.probe.mgrs.model.NCModelManager
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
/**
  * Dialog flow manager.
  *
  * Keeps, per user ID and model ID, the sequence of matched intent IDs together with the time
  * each one was recorded. A single-threaded scheduled task periodically drops entries that are
  * older than the owning model's dialog timeout, as well as flows whose model is no longer known
  * to the model manager. All access to the underlying map is guarded by `flow.synchronized`.
  */
object NCDialogFlowManager extends NCService {
    /** Dialog flow key: user ID and model ID pair. */
    case class Key(usrId: Long, mdlId: String)

    /** Dialog flow entry: matched intent ID and the `System.currentTimeMillis()` value at which it was added. */
    case class Value(intent: String, tstamp: Long)

    private object Config extends NCConfigurable {
        private final val name = "nlpcraft.probe.dialogGcTimeoutMs"

        /** Delay, in milliseconds, used both before the first GC run and between subsequent runs. */
        def timeoutMs: Long = getInt(name)

        /**
          * Validates the configuration.
          *
          * @throws NCE If the configured timeout is not strictly positive.
          */
        def check(): Unit =
            if (timeoutMs <= 0)
                // Message now agrees with the condition above: zero is rejected as well.
                throw new NCE(s"Configuration property must be > 0 [name=$name]")
    }

    @volatile private var flow: mutable.Map[Key, ArrayBuffer[Value]] = _
    @volatile private var gc: ScheduledExecutorService = _

    /**
      * Starts this manager: validates configuration, creates the empty flow storage and
      * schedules the periodic timeout cleanup.
      *
      * @param parent Parent span, if any.
      * @return Started service.
      */
    override def start(parent: Span = null): NCService = startScopedSpan("start", parent) { _ ⇒
        // Fail with a descriptive configuration error before the executor rejects a non-positive delay.
        Config.check()

        flow = mutable.HashMap.empty[Key, ArrayBuffer[Value]]

        gc = Executors.newSingleThreadScheduledExecutor

        gc.scheduleWithFixedDelay(() ⇒ clearForTimeout(), Config.timeoutMs, Config.timeoutMs, TimeUnit.MILLISECONDS)

        logger.info(s"Dialog flow manager GC started, checking every ${Config.timeoutMs}ms.")

        // NOTE(review): delegation to the base service is restored here; confirm against NCService.
        super.start()
    }

    /**
      * Stops this manager and shuts down the cleanup executor.
      *
      * @param parent Parent span, if any.
      */
    override def stop(parent: Span = null): Unit = startScopedSpan("stop", parent) { _ ⇒
        U.shutdownPools(gc)

        logger.info("Dialog flow manager GC stopped.")

        // NOTE(review): delegation to the base service is restored here; confirm against NCService.
        super.stop()
    }

    /**
      * Adds matched (winning) intent to the dialog flow for the given user and model IDs.
      *
      * @param intId Intent ID.
      * @param usrId User ID.
      * @param mdlId Model ID.
      * @param parent Parent span, if any.
      */
    def addMatchedIntent(intId: String, usrId: Long, mdlId: String, parent: Span = null): Unit = {
        startScopedSpan("addMatchedIntent", parent, "usrId" → usrId, "mdlId" → mdlId, "intId" → intId) { _ ⇒
            flow.synchronized {
                // Creates the per-user/model buffer on first use, then appends in arrival order.
                flow.getOrElseUpdate(Key(usrId, mdlId), ArrayBuffer.empty[Value]) +=
                    Value(intId, System.currentTimeMillis())

                logger.trace(s"Added to dialog flow [mdlId=$mdlId, intId=$intId, userId=$usrId]")
            }
        }
    }

    /**
      * Gets sequence of intent IDs sorted from oldest to newest (i.e. dialog flow) for given user and model IDs.
      *
      * This is a pure read: no entry is created when the user/model pair has no flow yet.
      *
      * @param usrId User ID.
      * @param mdlId Model ID.
      * @param parent Parent span, if any.
      * @return Dialog flow, empty if nothing is recorded for the given IDs.
      */
    def getDialogFlow(usrId: Long, mdlId: String, parent: Span = null): Seq[String] =
        startScopedSpan("getDialogFlow", parent, "usrId" → usrId, "mdlId" → mdlId) { _ ⇒
            flow.synchronized {
                // `map` copies into a new collection, so the caller never sees the guarded buffer itself.
                flow.get(Key(usrId, mdlId)).fold(Seq.empty[String])(_.map(_.intent))
            }
        }

    /**
      * Periodic cleanup: drops timed out entries, flows that became empty and flows of unknown models.
      */
    private def clearForTimeout(): Unit =
        startScopedSpan("clearForTimeout", "timeoutMs" → Config.timeoutMs) { _ ⇒
            flow.synchronized {
                // Keys are collected first and removed afterwards to avoid mutating the map while iterating it.
                val delKeys = ArrayBuffer.empty[Key]

                for ((key, values) ← flow)
                    NCModelManager.getModelDataOpt(key.mdlId) match {
                        case Some(data) ⇒
                            // Entries stamped before this instant are expired (timeout is applied as milliseconds).
                            val cutoff = System.currentTimeMillis() - data.model.getDialogTimeout
                            val live = values.filter(_.tstamp >= cutoff)

                            if (live.size != values.size) {
                                values.clear()
                                values ++= live
                            }

                            // Do not keep empty buffers around for users that went idle.
                            if (values.isEmpty)
                                delKeys += key

                        // Model data is no longer available - its flows are dropped entirely.
                        case None ⇒ delKeys += key
                    }

                flow --= delKeys
            }
        }

    /**
      * Clears dialog for given user and model IDs.
      *
      * @param usrId User ID.
      * @param mdlId Model ID.
      * @param parent Parent span, if any.
      */
    def clear(usrId: Long, mdlId: String, parent: Span = null): Unit =
        startScopedSpan("clear", parent, "usrId" → usrId, "mdlId" → mdlId) { _ ⇒
            flow.synchronized {
                flow -= Key(usrId, mdlId)
            }
        }

    /**
      * Clears dialog for given user, model IDs and predicate. Entries whose intent ID satisfies
      * the predicate are removed; does nothing if there is no flow for the given IDs.
      *
      * @param usrId User ID.
      * @param mdlId Model ID.
      * @param pred Intent ID predicate.
      * @param parent Parent span, if any.
      */
    def clearForPredicate(usrId: Long, mdlId: String, pred: String ⇒ Boolean, parent: Span = null): Unit =
        startScopedSpan("clearForPredicate", parent, "usrId" → usrId, "mdlId" → mdlId) { _ ⇒
            val key = Key(usrId, mdlId)

            flow.synchronized {
                // Look the flow up safely: `flow(key)` would throw for a user/model without any flow.
                for (values ← flow.get(key))
                    flow.update(key, values.filterNot(v ⇒ pred(v.intent)))
            }
        }
}