/*
* Copyright 2014 IBM Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.ibm.spark.boot

import akka.actor.{ActorRef, ActorSystem}
import com.ibm.spark.boot.layer._
import com.ibm.spark.interpreter.Interpreter
import com.ibm.spark.kernel.protocol.v5.KernelStatusType._
import com.ibm.spark.kernel.protocol.v5._
import com.ibm.spark.kernel.protocol.v5.kernel.ActorLoader
import com.ibm.spark.security.KernelSecurityManager
import com.ibm.spark.utils.LogLike
import com.typesafe.config.Config
import org.apache.spark.SparkContext
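
/**
 * Represents the main entry point for bootstrapping the kernel. Concrete
 * initialization logic is supplied by the layer traits required by the
 * self-type annotation below. A minimal usage sketch (the Standard* trait
 * names below are illustrative implementations of those layers and are not
 * defined in this file):
 *
 * {{{
 * val kernelBootstrap =
 *   new KernelBootstrap(config)
 *     with StandardBareInitialization
 *     with StandardComponentInitialization
 *     with StandardHandlerInitialization
 *     with StandardHookInitialization
 *
 * kernelBootstrap.initialize()
 * kernelBootstrap.waitForTermination()
 * kernelBootstrap.shutdown()
 * }}}
 *
 * @param config the configuration used to initialize the kernel's components
 */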
class KernelBootstrap(config: Config) extends LogLike {
this: BareInitialization with ComponentInitialization
with HandlerInitialization with HookInitialization =>
private val DefaultAppName = SparkKernelInfo.banner
private val DefaultActorSystemName = "spark-kernel-actor-system"
private var actorSystem: ActorSystem = _
private var actorLoader: ActorLoader = _
private var kernelMessageRelayActor: ActorRef = _
private var statusDispatch: ActorRef = _
private var sparkContext: SparkContext = _
private var interpreters: Seq[Interpreter] = Nil

  /**
   * Initializes all kernel systems.
   */
def initialize() = {
    // TODO: Investigate initializing System out/err/in here to capture the
    //       Console DynamicVariable initialization (since it takes its values
    //       from the System fields) and redirect output to a workable location
    //       (such as an actor) with the current thread's information attached
    //
    //       E.g. after System.setOut(customPrintStream), all newly created
    //       threads will have customPrintStream as their initial Console.out
    //       value
    //
displayVersionInfo()
// Initialize the bare minimum to report a starting message
val (actorSystem, actorLoader, kernelMessageRelayActor, statusDispatch) =
initializeBare(
config = config,
actorSystemName = DefaultActorSystemName
)
this.actorSystem = actorSystem
this.actorLoader = actorLoader
this.kernelMessageRelayActor = kernelMessageRelayActor
this.statusDispatch = statusDispatch
// Indicate that the kernel is now starting
publishStatus(KernelStatusType.Starting)
// Initialize components needed elsewhere
val (commStorage, commRegistrar, commManager, interpreter,
kernel, sparkContext, dependencyDownloader,
magicLoader, responseMap) =
initializeComponents(
config = config,
appName = DefaultAppName,
actorLoader = actorLoader
)
this.sparkContext = sparkContext
this.interpreters ++= Seq(interpreter)
// Initialize our handlers that take care of processing messages
initializeHandlers(
actorSystem = actorSystem,
actorLoader = actorLoader,
interpreter = interpreter,
commRegistrar = commRegistrar,
commStorage = commStorage,
magicLoader = magicLoader,
responseMap = responseMap
)
// Initialize our hooks that handle various JVM events
initializeHooks(
interpreter = interpreter
)
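    // Install the kernel's security manager to restrict what code evaluated
    // through the interpreter is allowed to do (such as calling System.exit)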
logger.debug("Initializing security manager")
System.setSecurityManager(new KernelSecurityManager)
logger.info("Marking relay as ready for receiving messages")
kernelMessageRelayActor ! true
this
}

  /**
   * Shuts down all kernel systems.
   */
def shutdown() = {
logger.info("Shutting down Spark Context")
sparkContext.stop()
logger.info("Shutting down interpreters")
interpreters.foreach(_.stop())
logger.info("Shutting down actor system")
actorSystem.shutdown()
this
}

  /**
   * Waits for the main actor system to terminate.
   */
def waitForTermination() = {
logger.debug("Waiting for actor system to terminate")
actorSystem.awaitTermination()
this
}
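
  /**
   * Publishes a kernel status update to the status dispatch actor, optionally
   * tying the update to the parent header of the message that triggered it.
   *
   * @param status the new status of the kernel
   * @param parentHeader the parent header of the triggering message, if any
   */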
private def publishStatus(
status: KernelStatusType,
parentHeader: Option[ParentHeader] = None
): Unit = {
parentHeader match {
case Some(header) => statusDispatch ! ((status, header))
case None => statusDispatch ! status
}
}
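
  /**
   * Logs the kernel and Scala version information at startup.
   */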
@inline private def displayVersionInfo() = {
logger.info("Kernel version: " + SparkKernelInfo.implementationVersion)
logger.info("Scala version: " + SparkKernelInfo.languageVersion)
}
}