blob: 3051158a1cfa2f41c993a578aa96e65bf57dc73f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nlpcraft.probe
import java.util.concurrent.CompletableFuture
import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory}
import com.typesafe.scalalogging.LazyLogging
import io.opencensus.trace.Span
import org.apache.nlpcraft.common.ascii.NCAsciiTable
import org.apache.nlpcraft.common.config.NCConfigurable
import org.apache.nlpcraft.common.nlp.core.NCNlpCoreManager
import org.apache.nlpcraft.common.nlp.dict.NCDictionaryManager
import org.apache.nlpcraft.common.nlp.numeric.NCNumericManager
import org.apache.nlpcraft.common.opencensus.NCOpenCensusTrace
import org.apache.nlpcraft.common.extcfg.NCExternalConfigManager
import org.apache.nlpcraft.common.version.NCVersion
import org.apache.nlpcraft.common.{NCE, NCException, NCService, U}
import org.apache.nlpcraft.model.NCModel
import org.apache.nlpcraft.probe.mgrs.cmd.NCCommandManager
import org.apache.nlpcraft.probe.mgrs.conn.NCConnectionManager
import org.apache.nlpcraft.probe.mgrs.conversation.NCConversationManager
import org.apache.nlpcraft.probe.mgrs.deploy.NCDeployManager
import org.apache.nlpcraft.probe.mgrs.dialogflow.NCDialogFlowManager
import org.apache.nlpcraft.probe.mgrs.lifecycle.NCLifecycleManager
import org.apache.nlpcraft.probe.mgrs.model.NCModelManager
import org.apache.nlpcraft.probe.mgrs.nlp.NCProbeEnrichmentManager
import org.apache.nlpcraft.probe.mgrs.nlp.enrichers.dictionary.NCDictionaryEnricher
import org.apache.nlpcraft.probe.mgrs.nlp.enrichers.limit.NCLimitEnricher
import org.apache.nlpcraft.probe.mgrs.nlp.enrichers.model.NCModelEnricher
import org.apache.nlpcraft.probe.mgrs.nlp.enrichers.relation.NCRelationEnricher
import org.apache.nlpcraft.probe.mgrs.nlp.enrichers.sort.NCSortEnricher
import org.apache.nlpcraft.probe.mgrs.nlp.enrichers.stopword.NCStopWordEnricher
import org.apache.nlpcraft.probe.mgrs.nlp.enrichers.suspicious.NCSuspiciousNounsEnricher
import org.apache.nlpcraft.probe.mgrs.nlp.validate.NCValidateManager
import org.apache.nlpcraft.common.ansi.NCAnsiColor._
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.compat.Platform.currentTime
import scala.util.control.Exception.{catching, ignoring}
/**
* Probe loader.
*/
private [probe] object NCProbeBoot extends LazyLogging with NCOpenCensusTrace {
private final val executionStart = System.currentTimeMillis()
private val startedMgrs = mutable.Buffer.empty[NCService]
@volatile private var started = false
@volatile private var shutdownHook: Thread = _
@volatile private var probeThread: Thread = _
// This container designed only for internal usage (transfer common data between methods).
private case class ProbeConfig(
var id: String,
var token: String,
var upLink: (String, Integer),
var downLink: (String, Integer),
var jarsFolder: Option[String],
var models: String,
var lifecycle: Seq[String]
) {
lazy val upLinkString = s"${upLink._1}:${upLink._2}"
lazy val downLinkString = s"${downLink._1}:${downLink._2}"
lazy val modelsSeq: Seq[String] = models.split(",").map(_.trim)
}
private def mkDefault(): Config = {
import ConfigValueFactory._
val prefix = "nlpcraft.probe"
ConfigFactory.empty().
withValue(s"$prefix.id", fromAnyRef("dflt.probe")).
// This is the default token as in default company.
// Note that this token must match the probe token from the company this probe
// associated with. If changed from default, this token must be kept secure.
withValue(s"$prefix.token", fromAnyRef("3141592653589793")).
withValue(s"$prefix.upLink", fromAnyRef("localhost:8201")).
withValue(s"$prefix.downLink", fromAnyRef("localhost:8202")).
withValue(s"$prefix.model", fromAnyRef(null)).
withValue(s"$prefix.models", fromAnyRef("")).
withValue(s"$prefix.lifecycle", fromIterable(Seq().asJava)).
withValue(s"$prefix.resultMaxSizeBytes", fromAnyRef(1048576)).
withValue("nlpcraft.nlpEngine", fromAnyRef("opennlp"))
// Following properties are 'null' by default:
// -------------------------------------------
// nlpcraft.probe.modelFactory
// nlpcraft.probe.modelFactory
// nlpcraft.probe.jarsFolder
// nlpcraft.probe.modelFactory
// nlpcraft.probe.jaegerThriftUrl
// nlpcraft.probe.prometheusLink
// nlpcraft.probe.stackdriverGoogleProjectId
// nlpcraft.probe.zipkinV2Url
}
/**
*
* @param args
* @param overrideCfg
* @return
*/
@throws[NCE]
private def initializeConfig(args: Array[String], overrideCfg: Option[Config]): ProbeConfig = {
NCConfigurable.initialize(
overrideCfg,
args.find(_.startsWith("-config=")) match {
case Some(s)
val fileName = s.substring("-config=".length)
val f = new java.io.File(fileName)
if (!(f.exists && f.canRead && f.isFile))
throw new NCE(s"Specified probe configuration file does not exist or cannot be read: $fileName")
Some(fileName)
case NoneSome("nlpcraft.conf")
},
Some(mkDefault()),
(cfg: Config) ⇒ cfg.hasPath("nlpcraft.probe")
)
object Cfg extends NCConfigurable {
private final val prefix = "nlpcraft.probe"
val id: String = getString(s"$prefix.id")
val token: String = getString(s"$prefix.token")
val upLink: (String, Integer) = getHostPort(s"$prefix.upLink")
val downLink: (String, Integer) = getHostPort(s"$prefix.downLink")
val jarsFolder: Option[String] = getStringOpt(s"$prefix.jarsFolder")
val models: String = getString(s"$prefix.models")
val lifecycle: Seq[String] = getStringList(s"$prefix.lifecycle")
}
ProbeConfig(
Cfg.id,
Cfg.token,
Cfg.upLink,
Cfg.downLink,
Cfg.jarsFolder,
Cfg.models,
Cfg.lifecycle
)
}
/**
*
* @param cfg Probe configuration.
* @param fut
*/
private def start0(cfg: ProbeConfig, fut: CompletableFuture[Integer]): Unit = {
probeThread = Thread.currentThread()
asciiLogo()
ackConfig(cfg)
catching(classOf[Throwable]) either startManagers(cfg) match {
case Left(e)// Exception.
U.prettyError(logger, "Failed to start probe:", e)
stop0()
fut.complete(1)
case _ ⇒ // Managers started OK.
shutdownHook = new Thread() {
override def run(): Unit = {
logger.info("Executing shutdown hook...")
stop0()
}
}
Runtime.getRuntime.addShutdownHook(shutdownHook)
ackStart()
started = true
fut.complete(0)
// Wait indefinitely.
while (started)
try
Thread.currentThread().join()
catch {
case _: InterruptedException()
}
}
logger.trace("Probe thread stopped OK.")
}
/**
*
*/
private def stop0(): Unit = {
ignoring(classOf[Throwable]) {
if (shutdownHook != null)
Runtime.getRuntime.removeShutdownHook(shutdownHook)
stopManagers()
}
started = false
U.interruptThread(probeThread)
logger.info("Probe shutdown OK.")
}
/**
*
*/
private def checkStarted(): Unit =
if (started)
throw new NCException(s"Probe has already been started (only one probe per JVM is allowed).")
/**
*
* @param cfgFile
* @param fut
*/
private [probe] def start(cfgFile: String, fut: CompletableFuture[Integer]): Unit = {
checkStarted()
val cfg = initializeConfig(Array(s"-config=$cfgFile"), None)
new Thread() {
override def run(): Unit = start0(cfg, fut)
}.start()
}
/**
*
* @param mdlClasses
* @param fut
*/
private [probe] def start(
mdlClasses: Array[java.lang.Class[_ <: NCModel]],
fut: CompletableFuture[Integer]): Unit = {
checkStarted()
import ConfigValueFactory._
val cfg = initializeConfig(
Array.empty,
Some(
ConfigFactory.empty().withValue(
"nlpcraft.probe.models",
fromAnyRef(mdlClasses.map(_.getName).mkString(","))
)
)
)
new Thread() {
override def run(): Unit = start0(cfg, fut)
}.start()
}
/**
*
* @param probeId Probe ID.
* @param tok
* @param upLinkStr
* @param dnLinkStr
* @param mdlClasses
* @param fut
*/
private [probe] def start(
probeId: String,
tok: String,
upLinkStr: String,
dnLinkStr: String,
mdlClasses: Array[java.lang.Class[_ <: NCModel]],
fut: CompletableFuture[Integer]): Unit = {
checkStarted()
object Cfg extends NCConfigurable {
private final val prefix = "nlpcraft.probe"
val id: String = probeId
val token: String = tok
val upLink: (String, Integer) = getHostPort(upLinkStr)
val dnLink: (String, Integer) = getHostPort(dnLinkStr)
val jarsFolder: Option[String] = getStringOpt(s"$prefix.jarsFolder")
val models: String = mdlClasses.map(_.getName).mkString(",")
val lifecycle: Seq[String] = getStringList(s"$prefix.lifecycle")
}
new Thread() {
override def run(): Unit =
start0(ProbeConfig(
Cfg.id,
Cfg.token,
Cfg.upLink,
Cfg.dnLink,
Cfg.jarsFolder,
Cfg.models,
Cfg.lifecycle),
fut
)
}
.start()
}
/**
*
*/
private [probe] def stop(): Unit =
if (started)
stop0()
/**
*
* @param args
* @param fut
*/
private [probe] def start(args: Array[String], fut: CompletableFuture[Integer]): Unit =
start0(initializeConfig(args, None), fut)
/**
* Prints ASCII-logo.
*/
private def asciiLogo() {
val ver = NCVersion.getCurrent
println(
U.NL +
U.asciiLogo() +
s"${U.NL}" +
s"Embedded Data Probe${U.NL}" +
s"Version: $ansiBold${ver.version}${U.NL}$ansiReset" +
s"${NCVersion.copyright}${U.NL}"
)
}
/**
*
*/
private def ackConfig(cfg: ProbeConfig): Unit = {
val tbl = NCAsciiTable()
val ver = NCVersion.getCurrent
tbl += (s"${ansiBlueFg}Probe ID$ansiReset", s"$ansiBold${cfg.id}$ansiReset")
tbl += (s"${ansiBlueFg}Probe Token$ansiReset", cfg.token)
tbl += (s"${ansiBlueFg}API Version$ansiReset", ver.version + ", " + ver.date.toString)
tbl += (s"${ansiBlueFg}Down-Link$ansiReset", cfg.downLinkString)
tbl += (s"${ansiBlueFg}Up-Link$ansiReset", cfg.upLinkString)
tbl += (s"${ansiBlueFg}Lifecycle$ansiReset", cfg.lifecycle)
tbl += (s"${ansiBlueFg}Models (${cfg.modelsSeq.size})$ansiReset" , cfg.modelsSeq)
tbl += (s"${ansiBlueFg}JARs Folder$ansiReset", cfg.jarsFolder.getOrElse(""))
tbl.info(logger, Some("Probe Configuration:"))
}
/**
* Asks server start.
*/
private def ackStart() {
val dur = s"[${U.format((currentTime - executionStart) / 1000.0, 2)} sec]"
val tbl = NCAsciiTable()
tbl.margin(top = 1)
tbl += s"Probe started $ansiBlueFg$dur$ansiReset"
tbl.info(logger)
}
/**
*
* @return
*/
private def startManagers(cfg: ProbeConfig): Unit = {
// Lifecycle callback outside of tracing span.
NCLifecycleManager.start()
NCLifecycleManager.onInit()
startScopedSpan("startManagers") { span ⇒
val ver = NCVersion.getCurrent
addTags(
span,
"id" → cfg.id,
"token" → cfg.token,
"uplink" → cfg.upLinkString,
"downlink" → cfg.downLinkString,
"relVer" → ver.version,
"relDate" → ver.date.toString,
"models" → cfg.models,
"lifecycle" → cfg.lifecycle.mkString(","),
"jarFolder" → cfg.jarsFolder
)
startedMgrs += NCExternalConfigManager.start(span)
startedMgrs += NCNlpCoreManager.start(span)
startedMgrs += NCNumericManager.start(span)
startedMgrs += NCDeployManager.start(span)
startedMgrs += NCModelManager.start(span)
startedMgrs += NCCommandManager.start(span)
startedMgrs += NCDictionaryManager.start(span)
startedMgrs += NCStopWordEnricher.start(span)
startedMgrs += NCModelEnricher.start(span)
startedMgrs += NCLimitEnricher.start(span)
startedMgrs += NCSortEnricher.start(span)
startedMgrs += NCRelationEnricher.start(span)
startedMgrs += NCSuspiciousNounsEnricher.start(span)
startedMgrs += NCValidateManager.start(span)
startedMgrs += NCDictionaryEnricher.start(span)
startedMgrs += NCConversationManager.start(span)
startedMgrs += NCProbeEnrichmentManager.start(span)
startedMgrs += NCConnectionManager.start(span)
startedMgrs += NCDialogFlowManager.start(span)
}
}
private def stopManager(mgr: NCService, span: Span): Unit =
try
mgr.stop(span)
catch {
case e: Throwable ⇒ U.prettyError(logger, s"Failed to stop manager: ${mgr.name}", e)
}
/**
*
*/
private def stopManagers(): Unit = {
startScopedSpan("stopManagers") { span ⇒
startedMgrs.synchronized {
try
startedMgrs.reverseIterator.foreach(stopManager(_, span))
finally
startedMgrs.clear()
}
// Lifecycle callback outside of tracing span.
NCLifecycleManager.onDiscard()
stopManager(NCLifecycleManager, span)
}
}
}