blob: d31d4b3371c3fa9eb428f4e61d3112bcb89f1304 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nlpcraft.probe.mgrs.conversation
import io.opencensus.trace.Span
import org.apache.nlpcraft.common._
import org.apache.nlpcraft.probe.mgrs.model.NCModelManager
import scala.collection._
/**
* Conversation manager.
*/
object NCConversationManager extends NCService {
case class Key(userId: Long, mdlId: String)
case class Value(conv: NCConversation, var tstamp: Long = 0)
private final val convs: mutable.Map[Key, Value] = mutable.HashMap.empty[Key, Value]
@volatile private var gc: Thread = _
/**
*
* @param parent Optional parent span.
* @return
*/
override def start(parent: Span = null): NCService = startScopedSpan("start", parent) { _ ⇒
gc =
U.mkThread("conversation-manager-gc") { t ⇒
while (!t.isInterrupted)
try
convs.synchronized {
val sleepTime = clearForTimeout() - System.currentTimeMillis()
if (sleepTime > 0)
convs.wait(sleepTime)
}
catch {
case _: InterruptedException// No-op.
case e: Throwable ⇒ logger.error(s"Unexpected error for: ${t.getName}", e)
}
}
gc.start()
ackStart()
}
/**
*
* @param parent Optional parent span.
*/
override def stop(parent: Span = null): Unit = startScopedSpan("stop", parent) { _ ⇒
U.stopThread(gc)
gc = null
convs.clear()
logger.info("Conversation manager GC stopped.")
ackStop()
}
/**
* Gets next clearing time.
*/
private def clearForTimeout(): Long =
startScopedSpan("clearForTimeout") { _ ⇒
require(Thread.holdsLock(convs))
val now = System.currentTimeMillis()
val delKeys = mutable.HashSet.empty[Key]
for ((key, value) ← convs) {
val del =
NCModelManager.getModelOpt(key.mdlId) match {
case Some(mdl) ⇒ value.tstamp < now - mdl.model.getConversationTimeout
case Nonetrue
}
if (del) {
value.conv.getData.clear()
delKeys += key
}
}
convs --= delKeys
if (convs.nonEmpty)
convs.values.map(v ⇒ v.tstamp + v.conv.timeoutMs).min
else
Long.MaxValue
}
/**
* Gets conversation for given key.
*
* @param usrId User ID.
* @param mdlId Model ID.
* @return New or existing conversation.
*/
def getConversation(usrId: Long, mdlId: String, parent: Span = null): NCConversation =
startScopedSpan("getConversation", parent, "usrId" → usrId, "mdlId" → mdlId) { _ ⇒
val mdl = NCModelManager.getModel(mdlId).model
convs.synchronized {
val v = convs.getOrElseUpdate(
Key(usrId, mdlId),
Value(NCConversation(usrId, mdlId, mdl.getConversationTimeout, mdl.getConversationDepth))
)
v.tstamp = U.nowUtcMs()
convs.notifyAll()
v.conv
}
}
}