blob: c8ffc5c5c085a2d7709b2c2805e6abd10e1763e1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nlpcraft.probe.mgrs.cmd
import java.io.Serializable
import java.util.concurrent.{ExecutorService, Executors}
import com.google.gson.Gson
import io.opencensus.trace.Span
import org.apache.nlpcraft.common.nlp.NCNlpSentence
import org.apache.nlpcraft.common.util.NCUtils
import org.apache.nlpcraft.common.NCService
import org.apache.nlpcraft.model.NCToken
import org.apache.nlpcraft.probe.mgrs.NCProbeMessage
import org.apache.nlpcraft.probe.mgrs.conn.NCConnectionManager
import org.apache.nlpcraft.probe.mgrs.conversation.NCConversationManager
import org.apache.nlpcraft.probe.mgrs.dialogflow.NCDialogFlowManager
import org.apache.nlpcraft.probe.mgrs.model.NCModelManager
import org.apache.nlpcraft.probe.mgrs.nlp.NCProbeEnrichmentManager
import scala.collection.JavaConverters._
import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}
/**
* Probe commands processor.
*/
object NCCommandManager extends NCService {
private final val GSON = new Gson()
@volatile private var pool: ExecutorService = _
@volatile private var executor: ExecutionContextExecutor = _
override def start(parent: Span = null): NCService = startScopedSpan("start", parent) { _ ⇒
pool = Executors.newCachedThreadPool()
executor = ExecutionContext.fromExecutor(pool)
super.start()
}
override def stop(parent: Span = null): Unit = startScopedSpan("stop", parent) { _ ⇒
super.stop()
NCUtils.shutdownPools(pool)
executor = null
pool = null
}
/**
*
* @param msg Server message to process.
* @param parent Optional parent span.
*/
def processServerMessage(msg: NCProbeMessage, parent: Span = null): Unit =
startScopedSpan("processServerMessage", parent,
"msgType" → msg.getType,
"srvReqId" → msg.dataOpt[String]("srvReqId").getOrElse(""),
"usrId" → msg.dataOpt[Long]("userId").getOrElse(-1),
"mdlId" → msg.dataOpt[String]("mdlId").getOrElse("")) { span ⇒
if (msg.getType != "S2P_PING")
logger.trace(s"Probe server message received: $msg")
try
msg.getType match {
case "S2P_PING"()
case "S2P_CLEAR_CONV"
NCConversationManager.getConversation(
msg.data[Long]("usrId"),
msg.data[String]("mdlId"),
span
).clearTokens((_: NCToken)true)
case "S2P_CLEAR_DLG"
NCDialogFlowManager.clear(
msg.data[Long]("usrId"),
msg.data[String]("mdlId"),
span
)
case "S2P_ASK"
NCProbeEnrichmentManager.ask(
srvReqId = msg.data[String]("srvReqId"),
txt = msg.data[String]("txt"),
nlpSens = msg.data[java.util.List[NCNlpSentence]]("nlpSens").asScala,
usrId = msg.data[Long]("userId"),
senMeta = msg.data[java.util.Map[String, Serializable]]("senMeta").asScala,
mdlId = msg.data[String]("mdlId"),
logEnable = msg.data[Boolean]("logEnable"),
span
)
case "S2P_MODEL_INFO"
val mdlId = msg.data[String]("mdlId")
val mdlData = NCModelManager.getModel(mdlId)
val macros = mdlData.model.getMacros.asInstanceOf[Serializable]
val syns = mdlData.model.getElements.asScala.map(p ⇒ p.getId → p.getSynonyms).toMap.asJava.asInstanceOf[Serializable]
val samples = mdlData.samples.map(p ⇒ p._1 → p._2.asJava).asJava.asInstanceOf[Serializable]
NCConnectionManager.send(
NCProbeMessage(
"P2S_MODEL_INFO",
"reqGuid" → msg.getGuid,
"resp" → GSON.toJson(
Map(
"macros" → macros,
"synonyms" → syns,
"samples" → samples
).asJava
)
),
span
)
case _ ⇒
logger.error(s"Received unknown server message (you need to update the probe): ${msg.getType}")
}
catch {
case e: Throwable ⇒ logger.error(s"Error while processing server message (ignoring): ${msg.getType}", e)
}
}
}