/*
* Copyright 2014 IBM Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.ibm.spark.boot.layer

import java.util.concurrent.ConcurrentHashMap
import akka.actor.ActorRef
import com.ibm.spark.comm.{CommManager, CommRegistrar, CommStorage, KernelCommManager}
import com.ibm.spark.dependencies.{DependencyDownloader, IvyDependencyDownloader}
import com.ibm.spark.global
import com.ibm.spark.interpreter._
import com.ibm.spark.kernel.api.Kernel
import com.ibm.spark.kernel.protocol.v5.KMBuilder
import com.ibm.spark.kernel.protocol.v5.kernel.ActorLoader
import com.ibm.spark.kernel.protocol.v5.stream.KernelOutputStream
import com.ibm.spark.magic.MagicLoader
import com.ibm.spark.magic.builtin.BuiltinLoader
import com.ibm.spark.magic.dependencies.DependencyMap
import com.ibm.spark.utils.{KeyValuePairUtils, LogLike, TaskManager}
import com.typesafe.config.Config
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.JavaConverters._
import scala.util.Try

/**
* Represents the component initialization. All component-related pieces of the
* kernel (non-actors) should be created here. Limited items should be exposed.
*/
trait ComponentInitialization {
/**
* Initializes and registers all components (not needed by bare init).
*
* @param config The config used for initialization
* @param appName The name of the "application" for Spark
* @param actorLoader The actor loader to use for some initialization
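*
* @return The initialized components, from Comm storage through the shared
*         response map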
*/
def initializeComponents(
config: Config, appName: String, actorLoader: ActorLoader
): (CommStorage, CommRegistrar, CommManager, Interpreter,
Kernel, SparkContext, DependencyDownloader, MagicLoader,
collection.mutable.Map[String, ActorRef])
}

/**
* Represents the standard implementation of ComponentInitialization.
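*
* A minimal usage sketch (hypothetical; `ExampleBootstrap` and
* `startComponents` are illustrative names, not part of this project):
*
* {{{
* class ExampleBootstrap extends StandardComponentInitialization with LogLike {
*   def startComponents(config: Config, actorLoader: ActorLoader) =
*     initializeComponents(config, "ExampleKernel", actorLoader)
* }
* }}}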
*/
trait StandardComponentInitialization extends ComponentInitialization {
this: LogLike =>

/**
* Initializes and registers all components (not needed by bare init).
*
* @param config The config used for initialization
* @param appName The name of the "application" for Spark
* @param actorLoader The actor loader to use for some initialization
*/
def initializeComponents(
config: Config, appName: String, actorLoader: ActorLoader
) = {
val (commStorage, commRegistrar, commManager) =
initializeCommObjects(actorLoader)
val interpreter = initializeInterpreter(config)
val sparkContext = initializeSparkContext(
config, appName, actorLoader, interpreter)
val dependencyDownloader = initializeDependencyDownloader(config)
val magicLoader = initializeMagicLoader(
config, interpreter, sparkContext, dependencyDownloader)
val kernel = initializeKernel(
actorLoader, interpreter, commManager, magicLoader)
val responseMap = initializeResponseMap()
(commStorage, commRegistrar, commManager, interpreter, kernel,
sparkContext, dependencyDownloader, magicLoader, responseMap)
}
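
/**
* Constructs the Comm storage, registrar, and kernel-level Comm manager.
*
* @param actorLoader The actor loader provided to the Comm manager
*
* @return A tuple of the storage, registrar, and manager
*/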
private def initializeCommObjects(actorLoader: ActorLoader) = {
logger.debug("Constructing Comm storage")
val commStorage = new CommStorage()
logger.debug("Constructing Comm registrar")
val commRegistrar = new CommRegistrar(commStorage)
logger.debug("Constructing Comm manager")
val commManager = new KernelCommManager(
actorLoader, KMBuilder(), commRegistrar)
(commStorage, commRegistrar, commManager)
}
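
/**
* Constructs the Ivy-based dependency downloader, resolving against Maven
* Central and using the local Ivy directory named by the "ivy_local"
* config key.
*
* @param config The config providing the local Ivy directory
*
* @return The new dependency downloader
*/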
private def initializeDependencyDownloader(config: Config) = {
val dependencyDownloader = new IvyDependencyDownloader(
"http://repo1.maven.org/maven2/", config.getString("ivy_local")
)
dependencyDownloader
}
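
/**
* Constructs and starts the Scala interpreter, capping its task manager at
* the configured maximum number of interpreter threads.
*
* @param config The config providing "interpreter_args" and
*               "max_interpreter_threads"
*
* @return The started interpreter
*/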
protected def initializeInterpreter(config: Config) = {
val interpreterArgs = config.getStringList("interpreter_args").asScala.toList
val maxInterpreterThreads = config.getInt("max_interpreter_threads")
logger.info(
s"Constructing interpreter with $maxInterpreterThreads threads and " +
"with arguments: " + interpreterArgs.mkString(" "))
val interpreter = new ScalaInterpreter(interpreterArgs, Console.out)
with StandardSparkIMainProducer
with TaskManagerProducerLike
with StandardSettingsProducer {
override def newTaskManager(): TaskManager =
new TaskManager(maximumWorkers = maxInterpreterThreads)
}
logger.debug("Starting interpreter")
interpreter.start()
interpreter
}

// TODO: Think of a better way to test without exposing this
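/**
* Builds the SparkConf from the application config (master, application
* name, extra "spark_configuration" pairs, and the REPL class server uri),
* constructs the SparkContext, and binds it into the interpreter.
*
* @param config The config providing the Spark settings
* @param appName The name of the "application" for Spark
* @param actorLoader The actor loader backing the kernel output stream
* @param interpreter The interpreter that receives the new context
*
* @return The new SparkContext
*/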
protected[layer] def initializeSparkContext(
config: Config, appName: String, actorLoader: ActorLoader,
interpreter: Interpreter
) = {
logger.debug("Creating Spark Configuration")
val conf = new SparkConf()
val master = config.getString("spark.master")
logger.info("Using " + master + " as Spark Master")
conf.setMaster(master)
logger.info("Using " + appName + " as Spark application name")
conf.setAppName(appName)
KeyValuePairUtils.stringToKeyValuePairSeq(
config.getString("spark_configuration")
).foreach { keyValuePair =>
logger.info(s"Setting ${keyValuePair.key} to ${keyValuePair.value}")
Try(conf.set(keyValuePair.key, keyValuePair.value))
}
// TODO: Move SparkIMain to private and insert in a different way
logger.warn("Locked to Scala interpreter with SparkIMain until decoupled!")
// TODO: Construct class server outside of SparkIMain
logger.warn("Unable to control initialization of REPL class server!")
logger.info("REPL Class Server Uri: " + interpreter.classServerURI)
conf.set("spark.repl.class.uri", interpreter.classServerURI)
val sparkContext = reallyInitializeSparkContext(
actorLoader, KMBuilder(), conf)
updateInterpreterWithSparkContext(
config, sparkContext, interpreter)
sparkContext
}

// TODO: Think of a better way to test without exposing this
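/**
* Constructs the SparkContext itself, redirecting the global stream state
* to a KernelOutputStream so that output produced during construction (and
* by Spark threadpool threads) flows back through the kernel.
*
* @param actorLoader The actor loader backing the kernel output stream
* @param kmBuilder The kernel message builder (currently unused; a fresh
*                  KMBuilder is constructed for the stream)
* @param sparkConf The configuration for the new context
*
* @return The new SparkContext
*/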
protected[layer] def reallyInitializeSparkContext(
actorLoader: ActorLoader, kmBuilder: KMBuilder, sparkConf: SparkConf
): SparkContext = {
logger.debug("Constructing new Spark Context")
// TODO: Inject stream redirect headers in Spark dynamically
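// Assigned inside the stream-redirected block below, hence the var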
var sparkContext: SparkContext = null
val outStream = new KernelOutputStream(
actorLoader, KMBuilder(), global.ScheduledTaskManager.instance)
// Update global stream state and use it to set the Console local variables
// for threads in the Spark threadpool
global.StreamState.setStreams(System.in, outStream, outStream)
global.StreamState.withStreams {
sparkContext = new SparkContext(sparkConf)
}
sparkContext
}

// TODO: Think of a better way to test without exposing this
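/**
* Binds the SparkContext into the interpreter as "sc", forces cluster
* initialization by collecting an empty RDD, and, when not running against
* a local master, ships the kernel's own jars to the cluster.
*
* @param config The config providing the Spark master
* @param sparkContext The context to bind and update
* @param interpreter The interpreter receiving the context
*/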
protected[layer] def updateInterpreterWithSparkContext(
config: Config, sparkContext: SparkContext, interpreter: Interpreter
) = {
interpreter.doQuietly {
logger.debug("Binding context into interpreter")
interpreter.bind(
"sc", "org.apache.spark.SparkContext",
sparkContext, List( """@transient"""))
// NOTE: This is needed because the interpreter blows up if dependencies
// are added to the SparkContext and Interpreter before the
// cluster has been used... not exactly sure why this is the case
// TODO: Investigate why the cluster has to be initialized in the kernel
// to avoid the kernel's interpreter blowing up (must be done
// inside the interpreter)
logger.debug("Initializing Spark cluster in interpreter")
interpreter.doQuietly {
interpreter.interpret("""
| val $toBeNulled = {
|   var $toBeNulled = sc.emptyRDD.collect()
|   $toBeNulled = null
| }
|
|""".stripMargin)
}
}
// Add ourselves as a dependency
// TODO: Provide ability to point to library as commandline argument
// TODO: Provide better method to determine if can add ourselves
// TODO: Avoid requesting the master twice (initializeSparkContext
// also reads it)
val master = config.getString("spark.master")
// If in local mode, we do not need to add our jars as dependencies
if (!master.toLowerCase.startsWith("local")) {
@inline def getJarPathFor(klass: Class[_]): String =
klass.getProtectionDomain.getCodeSource.getLocation.getPath
// TODO: Provide less hard-coded solution in case additional dependencies
// are added or classes are refactored to different projects
val jarPaths = Seq(
// Macro project
getJarPathFor(classOf[com.ibm.spark.annotations.Experimental]),
// Protocol project
getJarPathFor(classOf[com.ibm.spark.kernel.protocol.v5.KernelMessage]),
// Kernel-api project
getJarPathFor(classOf[com.ibm.spark.kernel.api.KernelLike]),
// Kernel project
getJarPathFor(classOf[com.ibm.spark.boot.KernelBootstrap])
)
logger.info("Adding kernel jars to cluster:\n- " +
jarPaths.mkString("\n- "))
jarPaths.foreach(sparkContext.addJar)
} else {
logger.info("Running in local mode! Not adding self as dependency!")
}
}
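
/**
* Constructs the shared response map, backed by a ConcurrentHashMap so it
* can be safely read and mutated from multiple threads.
*
* @return The new, empty map of string identifiers to actor references
*/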
protected def initializeResponseMap(): collection.mutable.Map[String, ActorRef] =
new ConcurrentHashMap[String, ActorRef]().asScala
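
/**
* Constructs the kernel API object, binds it into the interpreter as an
* implicit "kernel" value, and exposes it to magics through the magic
* loader's dependency map.
*
* @return The new kernel
*/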
private def initializeKernel(
actorLoader: ActorLoader,
interpreter: Interpreter,
commManager: CommManager,
magicLoader: MagicLoader
) = {
val kernel = new Kernel(actorLoader, interpreter, commManager, magicLoader)
interpreter.doQuietly {
interpreter.bind(
"kernel", "com.ibm.spark.kernel.api.Kernel",
kernel, List( """@transient implicit""")
)
}
magicLoader.dependencyMap.setKernel(kernel)
kernel
}
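
/**
* Constructs the magic loader, wiring the interpreter, Spark context, and
* dependency downloader into its dependency map and layering any external
* magic urls from the config on top of the builtin loader.
*
* @return The new magic loader
*/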
private def initializeMagicLoader(
config: Config, interpreter: Interpreter, sparkContext: SparkContext,
dependencyDownloader: DependencyDownloader
) = {
logger.debug("Constructing magic loader")
logger.debug("Building dependency map")
val dependencyMap = new DependencyMap()
.setInterpreter(interpreter)
.setKernelInterpreter(interpreter) // This is deprecated
.setSparkContext(sparkContext)
.setDependencyDownloader(dependencyDownloader)
logger.debug("Creating BuiltinLoader")
val builtinLoader = new BuiltinLoader()
val magicUrlArray = config.getStringList("magic_urls").asScala
.map(s => new java.net.URL(s)).toArray
if (magicUrlArray.isEmpty)
logger.warn("No external magics provided to MagicLoader!")
else
logger.info("Using magics from the following locations: " +
magicUrlArray.map(_.getPath).mkString(","))
logger.debug("Creating MagicLoader")
val magicLoader = new MagicLoader(
dependencyMap = dependencyMap,
urls = magicUrlArray,
parentLoader = builtinLoader
)
magicLoader.dependencyMap.setMagicLoader(magicLoader)
magicLoader
}
}