blob: caba0bba28160af864a0b6cefb2af6ef86c7eb9f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nlpcraft.probe
import java.util.concurrent.CompletableFuture
import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory}
import com.typesafe.scalalogging.LazyLogging
import org.apache.nlpcraft.common.ascii.NCAsciiTable
import org.apache.nlpcraft.common.config.NCConfigurable
import org.apache.nlpcraft.common.nlp.core.NCNlpCoreManager
import org.apache.nlpcraft.common.nlp.dict.NCDictionaryManager
import org.apache.nlpcraft.common.nlp.numeric.NCNumericManager
import org.apache.nlpcraft.common.opencensus.NCOpenCensusTrace
import org.apache.nlpcraft.common.version.NCVersion
import org.apache.nlpcraft.common.{NCE, NCException, U}
import org.apache.nlpcraft.model.NCModel
import org.apache.nlpcraft.probe.mgrs.cmd.NCCommandManager
import org.apache.nlpcraft.probe.mgrs.conn.NCConnectionManager
import org.apache.nlpcraft.probe.mgrs.conversation.NCConversationManager
import org.apache.nlpcraft.probe.mgrs.deploy.NCDeployManager
import org.apache.nlpcraft.probe.mgrs.dialogflow.NCDialogFlowManager
import org.apache.nlpcraft.probe.mgrs.lifecycle.NCLifecycleManager
import org.apache.nlpcraft.probe.mgrs.model.NCModelManager
import org.apache.nlpcraft.probe.mgrs.nlp.NCProbeEnrichmentManager
import org.apache.nlpcraft.probe.mgrs.nlp.enrichers.dictionary.NCDictionaryEnricher
import org.apache.nlpcraft.probe.mgrs.nlp.enrichers.limit.NCLimitEnricher
import org.apache.nlpcraft.probe.mgrs.nlp.enrichers.model.NCModelEnricher
import org.apache.nlpcraft.probe.mgrs.nlp.enrichers.post.NCPostEnrichProcessor
import org.apache.nlpcraft.probe.mgrs.nlp.enrichers.relation.NCRelationEnricher
import org.apache.nlpcraft.probe.mgrs.nlp.enrichers.sort.NCSortEnricher
import org.apache.nlpcraft.probe.mgrs.nlp.enrichers.stopword.NCStopWordEnricher
import org.apache.nlpcraft.probe.mgrs.nlp.enrichers.suspicious.NCSuspiciousNounsEnricher
import org.apache.nlpcraft.probe.mgrs.nlp.validate.NCValidateManager
import scala.collection.JavaConverters._
import scala.compat.Platform.currentTime
import scala.util.control.Exception.{catching, ignoring}
/**
* Probe loader.
*/
private [probe] object NCProbeBoot extends LazyLogging with NCOpenCensusTrace {
private final val executionStart = System.currentTimeMillis()
@volatile private var started = false
@volatile private var shutdownHook: Thread = _
@volatile private var probeThread: Thread = _
// This container designed only for internal usage (transfer common data between methods).
private case class ProbeConfig(
var id: String,
var token: String,
var upLink: (String, Integer),
var downLink: (String, Integer),
var jarsFolder: Option[String],
var models: Seq[String],
var lifecycle: Seq[String]
) {
def upLinkString = s"${upLink._1}:${upLink._2}"
def downLinkString = s"${downLink._1}:${downLink._2}"
}
private def mkDefault(): Config = {
import ConfigValueFactory._
val prefix = "nlpcraft.probe"
ConfigFactory.empty().
withValue(s"$prefix.id", fromAnyRef("dflt.probe")).
// This is the default token as in default company.
// Note that this token must match the probe token from the company this probe
// associated with. If changed from default, this token must be kept secure.
withValue(s"$prefix.token", fromAnyRef("3141592653589793")).
withValue(s"$prefix.upLink", fromAnyRef("localhost:8201")).
withValue(s"$prefix.downLink", fromAnyRef("localhost:8202")).
withValue(s"$prefix.models", fromIterable(Seq().asJava)).
withValue(s"$prefix.lifecycle", fromIterable(Seq().asJava)).
withValue(s"$prefix.resultMaxSizeBytes", fromAnyRef(1048576)).
withValue("nlpcraft.nlpEngine", fromAnyRef("opennlp")).
withValue("nlpcraft.versionCheckDisable", fromAnyRef(false))
// Following properties are 'null' by default:
// -------------------------------------------
// nlpcraft.probe.modelFactory
// nlpcraft.probe.modelFactory
// nlpcraft.probe.jarsFolder
// nlpcraft.probe.modelFactory
// nlpcraft.probe.jaegerThriftUrl
// nlpcraft.probe.prometheusLink
// nlpcraft.probe.stackdriverGoogleProjectId
// nlpcraft.probe.zipkinV2Url
}
/**
*
* @param args
* @param overrideCfg
* @return
*/
@throws[NCE]
private def initializeConfig(args: Array[String], overrideCfg: Option[Config]): ProbeConfig = {
NCConfigurable.initialize(
overrideCfg,
args.find(_.startsWith("-config=")) match {
case Some(s)
val fileName = s.substring("-config=".length)
val f = new java.io.File(fileName)
if (!(f.exists && f.canRead && f.isFile))
throw new NCE(s"Specified probe configuration file does not exist or cannot be read: $fileName")
Some(fileName)
case NoneSome("nlpcraft.conf")
},
Some(mkDefault()),
(cfg: Config) ⇒ cfg.hasPath("nlpcraft.probe")
)
object Cfg extends NCConfigurable {
private final val prefix = "nlpcraft.probe"
val id: String = getString(s"$prefix.id")
val token: String = getString(s"$prefix.token")
val upLink: (String, Integer) = getHostPort(s"$prefix.upLink")
val downLink: (String, Integer) = getHostPort(s"$prefix.downLink")
val jarsFolder: Option[String] = getStringOpt(s"$prefix.jarsFolder")
val models: Seq[String] = getStringList(s"$prefix.models")
val lifecycle: Seq[String] = getStringList(s"$prefix.lifecycle")
}
ProbeConfig(
Cfg.id,
Cfg.token,
Cfg.upLink,
Cfg.downLink,
Cfg.jarsFolder,
Cfg.models,
Cfg.lifecycle
)
}
/**
*
* @param cfg Probe configuration.
* @param fut
*/
private def start0(cfg: ProbeConfig, fut: CompletableFuture[Void]): Unit = {
probeThread = Thread.currentThread()
asciiLogo()
ackConfig(cfg)
catching(classOf[Throwable]) either startManagers(cfg) match {
case Left(e)// Exception.
e match {
case x: NCException
logger.error(s"Failed to start probe.", x)
stopManagers()
logger.info("Managers stopped.")
case x: Throwable ⇒ logger.error("Failed to start probe due to unexpected error.", x)
}
fut.complete(null)
case _ ⇒ // Managers started OK.
shutdownHook = new Thread() {
override def run(): Unit = stop0()
}
Runtime.getRuntime.addShutdownHook(shutdownHook)
ackStart()
started = true
fut.complete(null)
// Wait indefinitely.
while (started)
try
Thread.currentThread().join()
catch {
case _: InterruptedException()
}
}
logger.info("Embedded probe thread stopped OK.")
}
/**
*
*/
private def stop0(): Unit = {
ignoring(classOf[Throwable]) {
if (shutdownHook != null)
Runtime.getRuntime.removeShutdownHook(shutdownHook)
stopManagers()
}
started = false
if (probeThread != null) {
probeThread.interrupt()
probeThread.join()
}
logger.info("Embedded probe shutdown OK.")
}
/**
*
*/
private def checkStarted(): Unit =
if (started)
throw new NCException(s"Embedded probe has already been started (only one probe per JVM is allowed).")
/**
*
* @param cfgFile
* @param fut
*/
private [probe] def start(cfgFile: String, fut: CompletableFuture[Void]): Unit = {
checkStarted()
val cfg = initializeConfig(Array(s"-config=$cfgFile"), None)
new Thread() {
override def run(): Unit = start0(cfg, fut)
}.start()
}
/**
*
* @param mdlClasses
* @param fut
*/
private [probe] def start(
mdlClasses: Array[java.lang.Class[_ <: NCModel]],
fut: CompletableFuture[Void]): Unit = {
checkStarted()
import ConfigValueFactory._
val cfg = initializeConfig(
Array.empty,
Some(
ConfigFactory.empty()
.withValue("nlpcraft.probe.models", fromIterable(mdlClasses.map(_.getName).toSeq.asJava))
)
)
new Thread() {
override def run(): Unit = start0(cfg, fut)
}.start()
}
/**
*
* @param probeId Probe ID.
* @param tok
* @param upLinkStr
* @param dnLinkStr
* @param mdlClasses
* @param fut
*/
private [probe] def start(
probeId: String,
tok: String,
upLinkStr: String,
dnLinkStr: String,
mdlClasses: Array[java.lang.Class[_ <: NCModel]],
fut: CompletableFuture[Void]): Unit = {
checkStarted()
object Cfg extends NCConfigurable {
private final val prefix = "nlpcraft.probe"
val id: String = probeId
val token: String = tok
val upLink: (String, Integer) = getHostPort(upLinkStr)
val dnLink: (String, Integer) = getHostPort(dnLinkStr)
val jarsFolder: Option[String] = getStringOpt(s"$prefix.jarsFolder")
val models: Seq[String] = mdlClasses.map(_.getName).toSeq
val lifecycle: Seq[String] = getStringList(s"$prefix.lifecycle")
}
new Thread() {
override def run(): Unit =
start0(ProbeConfig(
Cfg.id,
Cfg.token,
Cfg.upLink,
Cfg.dnLink,
Cfg.jarsFolder,
Cfg.models,
Cfg.lifecycle),
fut
)
}
.start()
}
/**
*
*/
private [probe] def stop(): Unit =
if (started)
stop0()
/**
*
* @param args
* @param fut
*/
private [probe] def start(args: Array[String], fut: CompletableFuture[Void]): Unit =
start0(initializeConfig(args, None), fut)
/**
* Prints ASCII-logo.
*/
private def asciiLogo() {
val NL = System getProperty "line.separator"
val ver = NCVersion.getCurrent
val s = NL +
raw" _ ____ ______ ______ $NL" +
raw" / | / / /___ / ____/________ _/ __/ /_ $NL" +
raw" / |/ / / __ \/ / / ___/ __ `/ /_/ __/ $NL" +
raw" / /| / / /_/ / /___/ / / /_/ / __/ /_ $NL" +
raw"/_/ |_/_/ .___/\____/_/ \__,_/_/ \__/ $NL" +
raw" /_/ $NL$NL" +
s"Embedded Data Probe$NL" +
s"Version: ${ver.version}$NL" +
raw"${NCVersion.copyright}$NL"
println(s)
}
/**
*
*/
private def ackConfig(cfg: ProbeConfig): Unit = {
val tbl = NCAsciiTable()
val ver = NCVersion.getCurrent
tbl += ("Probe ID", cfg.id)
tbl += ("Probe Token", cfg.token)
tbl += ("API Version", ver.version + ", " + ver.date.toString)
tbl += ("Down-Link", cfg.downLinkString)
tbl += ("Up-Link", cfg.upLinkString)
tbl += ("Models", cfg.models)
tbl += ("Lifecycle", cfg.lifecycle)
tbl += ("JARs Folder", cfg.jarsFolder.getOrElse(""))
tbl.info(logger, Some("Probe Configuration:"))
}
/**
* Asks server start.
*/
private def ackStart() {
val dur = s"[${U.format((currentTime - executionStart) / 1000.0, 2)} sec]"
val tbl = NCAsciiTable()
tbl.margin(top = 1)
tbl += s"Probe started $dur"
tbl.info(logger)
}
/**
*
* @return
*/
private def startManagers(cfg: ProbeConfig): Unit = {
// Lifecycle callback.
NCLifecycleManager.start()
NCLifecycleManager.onInit()
startScopedSpan("startManagers") { span ⇒
val ver = NCVersion.getCurrent
addTags(
span,
"id" → cfg.id,
"token" → cfg.token,
"uplink" → cfg.upLinkString,
"downlink" → cfg.downLinkString,
"relVer" → ver.version,
"relDate" → ver.date.toString,
"models" → cfg.models.mkString(","),
"lifecycle" → cfg.lifecycle.mkString(","),
"jarFolder" → cfg.jarsFolder
)
NCNlpCoreManager.start(span)
NCNumericManager.start(span)
NCDeployManager.start(span)
NCModelManager.start(span)
NCCommandManager.start(span)
NCDictionaryManager.start(span)
NCStopWordEnricher.start(span)
NCModelEnricher.start(span)
NCLimitEnricher.start(span)
NCSortEnricher.start(span)
NCRelationEnricher.start(span)
NCSuspiciousNounsEnricher.start(span)
NCValidateManager.start(span)
NCPostEnrichProcessor.start(span)
NCDictionaryEnricher.start(span)
NCConversationManager.start(span)
NCProbeEnrichmentManager.start(span)
NCConnectionManager.start(span)
NCDialogFlowManager.start(span)
}
}
/**
*
*/
private def stopManagers(): Unit = {
startScopedSpan("stopManagers") { span ⇒
// Order is important!
NCDialogFlowManager.stop(span)
NCConnectionManager.stop(span)
NCProbeEnrichmentManager.stop(span)
NCConversationManager.stop(span)
NCDictionaryEnricher.stop(span)
NCPostEnrichProcessor.stop(span)
NCValidateManager.stop(span)
NCSuspiciousNounsEnricher.stop(span)
NCRelationEnricher.stop(span)
NCSortEnricher.stop(span)
NCLimitEnricher.stop(span)
NCModelEnricher.stop(span)
NCStopWordEnricher.stop(span)
NCDictionaryManager.stop(span)
NCCommandManager.stop(span)
NCModelManager.stop(span)
NCDeployManager.stop(span)
NCNumericManager.stop(span)
NCNlpCoreManager.stop(span)
}
// Lifecycle callback.
NCLifecycleManager.onDiscard()
NCLifecycleManager.stop()
}
}