| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.nlpcraft.probe |
| |
| import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory} |
| import com.typesafe.scalalogging.LazyLogging |
| import io.opencensus.trace.Span |
| import org.apache.commons.lang3.SystemUtils |
| import org.apache.nlpcraft.common.ascii.NCAsciiTable |
| import org.apache.nlpcraft.common.config.NCConfigurable |
| import org.apache.nlpcraft.common.extcfg.NCExternalConfigManager |
| import org.apache.nlpcraft.common.module.NCModule |
| import org.apache.nlpcraft.common.nlp.core.NCNlpCoreManager |
| import org.apache.nlpcraft.common.nlp.dict.NCDictionaryManager |
| import org.apache.nlpcraft.common.nlp.numeric.NCNumericManager |
| import org.apache.nlpcraft.common.opencensus.NCOpenCensusTrace |
| import org.apache.nlpcraft.common.pool.NCThreadPoolManager |
| import org.apache.nlpcraft.common.version.NCVersion |
| import org.apache.nlpcraft.common.{NCE, NCService, U, _} |
| import org.apache.nlpcraft.model.tools.cmdline.NCCliProbeBeacon |
| import org.apache.nlpcraft.probe.mgrs.cmd.NCCommandManager |
| import org.apache.nlpcraft.probe.mgrs.conn.NCConnectionManager |
| import org.apache.nlpcraft.probe.mgrs.conversation.NCConversationManager |
| import org.apache.nlpcraft.probe.mgrs.deploy.NCDeployManager |
| import org.apache.nlpcraft.probe.mgrs.dialogflow.NCDialogFlowManager |
| import org.apache.nlpcraft.probe.mgrs.lifecycle.NCLifecycleManager |
| import org.apache.nlpcraft.probe.mgrs.model.NCModelManager |
| import org.apache.nlpcraft.probe.mgrs.nlp.NCProbeEnrichmentManager |
| import org.apache.nlpcraft.probe.mgrs.nlp.enrichers.dictionary.NCDictionaryEnricher |
| import org.apache.nlpcraft.probe.mgrs.nlp.enrichers.limit.NCLimitEnricher |
| import org.apache.nlpcraft.probe.mgrs.nlp.enrichers.model.NCModelEnricher |
| import org.apache.nlpcraft.probe.mgrs.nlp.enrichers.relation.NCRelationEnricher |
| import org.apache.nlpcraft.probe.mgrs.nlp.enrichers.sort.NCSortEnricher |
| import org.apache.nlpcraft.probe.mgrs.nlp.enrichers.stopword.NCStopWordEnricher |
| import org.apache.nlpcraft.probe.mgrs.nlp.enrichers.suspicious.NCSuspiciousNounsEnricher |
| import org.apache.nlpcraft.probe.mgrs.nlp.validate.NCValidateManager |
| import resource.managed |
| |
| import java.io._ |
| import java.util.concurrent.CompletableFuture |
| import scala.collection.JavaConverters._ |
| import scala.collection.mutable |
| import scala.compat.Platform.currentTime |
| import scala.util.control.Exception.{catching, ignoring} |
| |
| /** |
| * Probe loader. |
| */ |
| private [probe] object NCProbeBoot extends LazyLogging with NCOpenCensusTrace { |
| private final val BEACON_PATH = ".nlpcraft/probe_beacon" |
| |
| private final val execStart = System.currentTimeMillis() |
| |
| private val startedMgrs = mutable.Buffer.empty[NCService] |
| |
| @volatile private var started = false |
| @volatile private var shutdownHook: Thread = _ |
| @volatile private var probeThread: Thread = _ |
| |
| // This container designed only for internal usage (transfer common data between methods). |
| private case class ProbeConfig( |
| var id: String, |
| var token: String, |
| var upLink: (String, Integer), |
| var downLink: (String, Integer), |
| var jarsFolder: Option[String], |
| var models: String, |
| var lifecycle: Seq[String] |
| ) { |
| lazy val upLinkString = s"${upLink._1}:${upLink._2}" |
| lazy val downLinkString = s"${downLink._1}:${downLink._2}" |
| lazy val modelsSeq: Seq[String] = U.splitTrimFilter(models,",") |
| } |
| |
| private def mkDefault(): Config = { |
| import ConfigValueFactory._ |
| |
| val prefix = "nlpcraft.probe" |
| |
| ConfigFactory.empty(). |
| withValue(s"$prefix.id", fromAnyRef("dflt.probe")). |
| // This is the default token as in default company. |
| // Note that this token must match the probe token from the company this probe |
| // associated with. If changed from default, this token must be kept secure. |
| withValue(s"$prefix.token", fromAnyRef("3141592653589793")). |
| withValue(s"$prefix.upLink", fromAnyRef("localhost:8201")). |
| withValue(s"$prefix.downLink", fromAnyRef("localhost:8202")). |
| withValue(s"$prefix.models", fromAnyRef("")). |
| withValue(s"$prefix.lifecycle", fromIterable(Seq().asJava)). |
| withValue(s"$prefix.resultMaxSizeBytes", fromAnyRef(1048576)). |
| withValue("nlpcraft.nlpEngine", fromAnyRef("opennlp")) |
| |
| // Following properties are 'null' by default: |
| // ------------------------------------------- |
| // nlpcraft.probe.modelFactory |
| // nlpcraft.probe.modelFactory |
| // nlpcraft.probe.jarsFolder |
| // nlpcraft.probe.modelFactory |
| // nlpcraft.probe.jaegerThriftUrl |
| // nlpcraft.probe.prometheusLink |
| // nlpcraft.probe.stackdriverGoogleProjectId |
| // nlpcraft.probe.zipkinV2Url |
| } |
| |
| /** |
| * |
| * @param args |
| * @param overrideCfg |
| * @return |
| */ |
| @throws[NCE] |
| private def initializeConfig(args: Array[String], overrideCfg: Option[Config]): ProbeConfig = { |
| NCConfigurable.initialize( |
| overrideCfg, |
| args.find(_.startsWith("-config=")) match { |
| case Some(s) ⇒ |
| val fileName = s.substring("-config=".length) |
| |
| val f = new java.io.File(fileName) |
| |
| if (!(f.exists && f.canRead && f.isFile)) |
| throw new NCE(s"Specified probe configuration file does not exist or cannot be read: $fileName") |
| |
| Some(fileName) |
| |
| case None ⇒ Some("nlpcraft.conf") |
| }, |
| Some(mkDefault()), |
| (cfg: Config) ⇒ cfg.hasPath("nlpcraft.probe") |
| ) |
| |
| object Cfg extends NCConfigurable { |
| private final val prefix = "nlpcraft.probe" |
| |
| val id: String = getString(s"$prefix.id") |
| val token: String = getString(s"$prefix.token") |
| val upLink: (String, Integer) = getHostPort(s"$prefix.upLink") |
| val downLink: (String, Integer) = getHostPort(s"$prefix.downLink") |
| val jarsFolder: Option[String] = getStringOpt(s"$prefix.jarsFolder") |
| val models: String = getString(s"$prefix.models") |
| val lifecycle: Seq[String] = getStringList(s"$prefix.lifecycle") |
| } |
| |
| ProbeConfig( |
| Cfg.id, |
| Cfg.token, |
| Cfg.upLink, |
| Cfg.downLink, |
| Cfg.jarsFolder, |
| Cfg.models, |
| Cfg.lifecycle |
| ) |
| } |
| |
| /** |
| * |
| * @param args |
| * @param fut |
| */ |
| private [probe] def start(args: Array[String], fut: CompletableFuture[Integer]): Unit = { |
| checkStarted() |
| |
| val cfg = initializeConfig(args, None) |
| |
| new Thread() { |
| override def run(): Unit = start0(cfg, fut) |
| }.start() |
| } |
| |
| /** |
| * |
| * @param cfg Probe configuration. |
| * @param fut |
| */ |
| private def start0(cfg: ProbeConfig, fut: CompletableFuture[Integer]): Unit = { |
| NCModule.setModule(NCModule.PROBE) |
| |
| // Record an anonymous screenview. |
| new Thread() { |
| override def run(): Unit = U.gaScreenView("probe") |
| }.start() |
| |
| probeThread = Thread.currentThread() |
| |
| asciiLogo() |
| ackConfig(cfg) |
| |
| catching(classOf[Throwable]) either startManagers(cfg) match { |
| case Left(e) ⇒ // Exception. |
| U.prettyError(logger, "Failed to start probe:", e) |
| |
| stop0() |
| |
| fut.complete(1) |
| |
| case _ ⇒ // Managers started OK. |
| // Store beacon file once all managers started OK. |
| storeBeacon(cfg) |
| |
| shutdownHook = new Thread() { |
| override def run(): Unit = { |
| logger.info("Executing shutdown hook...") |
| |
| stop0() |
| } |
| } |
| |
| Runtime.getRuntime.addShutdownHook(shutdownHook) |
| |
| ackStart() |
| |
| started = true |
| |
| fut.complete(0) |
| |
| // Wait indefinitely. |
| while (started) |
| try |
| Thread.currentThread().join() |
| catch { |
| case _: InterruptedException ⇒ () |
| } |
| } |
| |
| logger.trace("Probe thread stopped OK.") |
| } |
| |
| /** |
| * |
| * @param cfg |
| */ |
| private def storeBeacon(cfg: ProbeConfig): Unit = { |
| val path = new File(SystemUtils.getUserHome, BEACON_PATH) |
| |
| /** |
| * |
| */ |
| def save(): Unit = { |
| try { |
| managed(new ObjectOutputStream(new FileOutputStream(path))) acquireAndGet { stream ⇒ |
| stream.writeObject(NCCliProbeBeacon( |
| pid = ProcessHandle.current().pid(), |
| id = cfg.id, |
| token = cfg.token, |
| upLink = s"${cfg.upLink._1}:${cfg.upLink._2}", |
| downLink = s"${cfg.downLink._1}:${cfg.downLink._2}", |
| jarsFolder = cfg.jarsFolder.orNull, |
| models = cfg.models, |
| beaconPath = path.getAbsolutePath, |
| startMs = currentTime |
| )) |
| |
| stream.flush() |
| } |
| |
| // Make sure beacon is deleted when server process exits. |
| path.deleteOnExit() |
| } |
| catch { |
| case e: IOException ⇒ U.prettyError(logger, "Failed to save probe beacon.", e) |
| } |
| } |
| |
| if (path.exists()) |
| catching(classOf[IOException]) either { |
| managed(new ObjectInputStream(new FileInputStream(path))) acquireAndGet { _.readObject() } |
| } match { |
| case Left(e) ⇒ |
| logger.trace(s"Failed to read existing probe beacon: ${path.getAbsolutePath}", e) |
| logger.trace(s"Overriding corrupted probe beacon: ${path.getAbsolutePath}") |
| |
| save() |
| |
| case Right(rawObj) ⇒ |
| val beacon = rawObj.asInstanceOf[NCCliProbeBeacon] |
| |
| if (ProcessHandle.of(beacon.pid).isPresent) |
| logger.warn(s"Another local probe detected (safely ignored) [id=${beacon.id}, pid=${beacon.pid}]") |
| else { // Leave the existing probe beacon as is... |
| logger.trace(s"Overriding probe beacon for a phantom process [pid=${beacon.pid}]") |
| |
| save() |
| } |
| } |
| else |
| // No existing beacon file detected. |
| save() |
| } |
| |
| /** |
| * |
| */ |
| private def stop0(): Unit = { |
| ignoring(classOf[Throwable]) { |
| stopManagers() |
| } |
| |
| started = false |
| |
| U.interruptThread(probeThread) |
| |
| logger.info("Probe shutdown OK.") |
| } |
| |
| /** |
| * |
| */ |
| private def checkStarted(): Unit = |
| if (started) |
| throw new NCE(s"Probe has already been started (only one probe per JVM is allowed).") |
| |
| /** |
| * Starts the embedded probe with optional configuration file and provided overrides. |
| * |
| * @param cfgFile Optional configuration file to use. If `null` - the default configuration will be used. |
| * @param mdlClasses Optional overrides for 'nlpcraft.probe.models' configuration property. If `null` - |
| * the models configured in the configuration (default or provided) will be used. |
| * @param fut |
| */ |
| private [probe] def startEmbedded( |
| cfgFile: String, |
| mdlClasses: java.util.Collection[String], |
| fut: CompletableFuture[Integer]): Unit = { |
| checkStarted() |
| |
| import ConfigValueFactory._ |
| |
| val cfg = initializeConfig( |
| if (cfgFile == null) Array.empty else Array(s"-config=$cfgFile"), |
| if (mdlClasses == null) |
| None |
| else |
| Some( |
| ConfigFactory.empty().withValue( |
| "nlpcraft.probe.models", |
| fromAnyRef(mdlClasses.asScala.mkString(",")) |
| ) |
| ) |
| ) |
| |
| new Thread() { |
| override def run(): Unit = start0(cfg, fut) |
| }.start() |
| } |
| |
| /** |
| * Starts the embedded probe with specified configuration values. |
| * |
| * @param probeId Probe ID. |
| * @param tok |
| * @param upLinkStr |
| * @param dnLinkStr |
| * @param mdlClasses |
| * @param fut |
| */ |
| private [probe] def startEmbedded( |
| probeId: String, |
| tok: String, |
| upLinkStr: String, |
| dnLinkStr: String, |
| mdlClasses: Array[String], |
| fut: CompletableFuture[Integer]): Unit = { |
| checkStarted() |
| |
| object Cfg extends NCConfigurable { |
| private final val prefix = "nlpcraft.probe" |
| |
| val id: String = probeId |
| val token: String = tok |
| val upLink: (String, Integer) = getHostPort(upLinkStr) |
| val dnLink: (String, Integer) = getHostPort(dnLinkStr) |
| val jarsFolder: Option[String] = getStringOpt(s"$prefix.jarsFolder") |
| val models: String = mdlClasses.mkString(",") |
| val lifecycle: Seq[String] = getStringList(s"$prefix.lifecycle") |
| } |
| |
| new Thread() { |
| override def run(): Unit = |
| start0(ProbeConfig( |
| Cfg.id, |
| Cfg.token, |
| Cfg.upLink, |
| Cfg.dnLink, |
| Cfg.jarsFolder, |
| Cfg.models, |
| Cfg.lifecycle), |
| fut |
| ) |
| } |
| .start() |
| } |
| |
| /** |
| * |
| */ |
| private [probe] def stop(): Unit = { |
| if (shutdownHook != null) |
| Runtime.getRuntime.removeShutdownHook(shutdownHook) |
| |
| if (started) |
| stop0() |
| } |
| |
| /** |
| * Prints ASCII-logo. |
| */ |
| private def asciiLogo() { |
| val ver = NCVersion.getCurrent |
| |
| println( |
| U.NL + |
| U.asciiLogo() + |
| s"${U.NL}" + |
| s"Embedded Data Probe${U.NL}" + |
| s"Version: ${bo(ver.version)}${U.NL}" + |
| s"${NCVersion.copyright}${U.NL}" |
| ) |
| } |
| |
| /** |
| * |
| */ |
| private def ackConfig(cfg: ProbeConfig): Unit = { |
| val tbl = NCAsciiTable() |
| |
| val ver = NCVersion.getCurrent |
| |
| tbl += (s"${B}Probe ID$RST", s"$BO${cfg.id}$RST") |
| tbl += (s"${B}Probe Token$RST", cfg.token) |
| tbl += (s"${B}API Version$RST", ver.version + ", " + ver.date.toString) |
| tbl += (s"${B}Down-Link$RST", cfg.downLinkString) |
| tbl += (s"${B}Up-Link$RST", cfg.upLinkString) |
| tbl += (s"${B}Lifecycle$RST", cfg.lifecycle) |
| tbl += (s"${B}Models (${cfg.modelsSeq.size})$RST", cfg.modelsSeq) |
| tbl += (s"${B}JARs Folder$RST", cfg.jarsFolder.getOrElse("")) |
| |
| tbl.info(logger, Some("Probe Configuration:")) |
| } |
| |
| /** |
| * Asks server start. |
| */ |
| private def ackStart() { |
| val dur = s"[${U.format((currentTime - execStart) / 1000.0, 2)} sec]" |
| |
| val tbl = NCAsciiTable() |
| |
| tbl.margin(bottom = 1, top = 1) |
| |
| tbl += s"Probe started ${b(dur)}" |
| |
| tbl.info(logger) |
| } |
| |
| /** |
| * |
| * @return |
| */ |
| private def startManagers(cfg: ProbeConfig): Unit = { |
| // Lifecycle callback outside of tracing span. |
| NCLifecycleManager.start() |
| NCLifecycleManager.onInit() |
| |
| startScopedSpan("startManagers") { span ⇒ |
| val ver = NCVersion.getCurrent |
| |
| addTags( |
| span, |
| "id" → cfg.id, |
| "token" → cfg.token, |
| "uplink" → cfg.upLinkString, |
| "downlink" → cfg.downLinkString, |
| "relVer" → ver.version, |
| "relDate" → ver.date.toString, |
| "models" → cfg.models, |
| "lifecycle" → cfg.lifecycle.mkString(","), |
| "jarFolder" → cfg.jarsFolder |
| ) |
| |
| startedMgrs += NCThreadPoolManager.start(span) |
| startedMgrs += NCExternalConfigManager.start(span) |
| startedMgrs += NCNlpCoreManager.start(span) |
| startedMgrs += NCNumericManager.start(span) |
| startedMgrs += NCDeployManager.start(span) |
| startedMgrs += NCModelManager.start(span) |
| startedMgrs += NCCommandManager.start(span) |
| startedMgrs += NCDictionaryManager.start(span) |
| startedMgrs += NCStopWordEnricher.start(span) |
| startedMgrs += NCModelEnricher.start(span) |
| startedMgrs += NCLimitEnricher.start(span) |
| startedMgrs += NCSortEnricher.start(span) |
| startedMgrs += NCRelationEnricher.start(span) |
| startedMgrs += NCSuspiciousNounsEnricher.start(span) |
| startedMgrs += NCValidateManager.start(span) |
| startedMgrs += NCDictionaryEnricher.start(span) |
| startedMgrs += NCConversationManager.start(span) |
| startedMgrs += NCProbeEnrichmentManager.start(span) |
| startedMgrs += NCConnectionManager.start(span) |
| startedMgrs += NCDialogFlowManager.start(span) |
| } |
| } |
| |
| private def stopManager(mgr: NCService, span: Span): Unit = |
| try |
| mgr.stop(span) |
| catch { |
| case e: Throwable ⇒ U.prettyError(logger, s"Failed to stop manager: ${mgr.name}", e) |
| } |
| |
| /** |
| * |
| */ |
| private def stopManagers(): Unit = { |
| startScopedSpan("stopManagers") { span ⇒ |
| startedMgrs.synchronized { |
| try |
| startedMgrs.reverseIterator.foreach(stopManager(_, span)) |
| finally |
| startedMgrs.clear() |
| } |
| |
| // Lifecycle callback outside of tracing span. |
| NCLifecycleManager.onDiscard() |
| stopManager(NCLifecycleManager, span) |
| } |
| } |
| } |