TOREE-390: Lazily initialize spark sessions
This updates the kernel so that it lazily initializes
Spark sessions. This allows the user to configure and
start a session, or to wait until spark is first
referenced to start a session. This shortens the time
to the first interaction with Toree.
It also adds a notification that Toree
is waiting for a Spark session to start when the
getOrCreate method takes more than 100ms.
Closes #109.
diff --git a/kernel-api/src/main/scala/org/apache/toree/kernel/api/KernelLike.scala b/kernel-api/src/main/scala/org/apache/toree/kernel/api/KernelLike.scala
index fcb5271..4a19d4b 100644
--- a/kernel-api/src/main/scala/org/apache/toree/kernel/api/KernelLike.scala
+++ b/kernel-api/src/main/scala/org/apache/toree/kernel/api/KernelLike.scala
@@ -17,22 +17,18 @@
package org.apache.toree.kernel.api
-import java.io.{InputStream, OutputStream, PrintStream}
-
+import java.io.{InputStream, PrintStream}
+import java.net.URI
import com.typesafe.config.Config
import org.apache.spark.api.java.JavaSparkContext
-import org.apache.spark.{SparkConf, SparkContext}
-import org.apache.spark.sql.{SQLContext, SparkSession}
+import org.apache.spark.{SparkContext, SparkConf}
+import org.apache.spark.sql.SparkSession
/**
* Interface for the kernel API. This does not include exposed variables.
*/
trait KernelLike {
- def createSparkContext(conf: SparkConf): SparkContext
-
- def createSparkContext(master: String): SparkContext
-
/**
* Executes a block of code represented as a string and returns the result.
*
@@ -105,6 +101,8 @@
def config: Config
+ def addJars(uris: URI*)
+
def sparkContext: SparkContext
def sparkConf: SparkConf
diff --git a/kernel-api/src/main/scala/org/apache/toree/magic/dependencies/IncludeSparkContext.scala b/kernel-api/src/main/scala/org/apache/toree/magic/dependencies/IncludeSparkContext.scala
deleted file mode 100644
index 92dfe05..0000000
--- a/kernel-api/src/main/scala/org/apache/toree/magic/dependencies/IncludeSparkContext.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License
- */
-
-package org.apache.toree.magic.dependencies
-
-import org.apache.spark.SparkContext
-import org.apache.toree.magic.Magic
-import org.apache.toree.plugins.Plugin
-import org.apache.toree.plugins.annotations.{Event, Init}
-
-trait IncludeSparkContext extends Plugin {
- this: Magic =>
-
- @Event(name = "sparkReady") protected def sparkReady(
- newSparkContext: SparkContext
- ) = _sparkContext = newSparkContext
-
- private var _sparkContext: SparkContext = _
- def sparkContext: SparkContext = _sparkContext
-}
diff --git a/kernel-api/src/main/scala/org/apache/toree/magic/dependencies/IncludeSparkSession.scala b/kernel-api/src/main/scala/org/apache/toree/magic/dependencies/IncludeSparkSession.scala
deleted file mode 100644
index 9bd92c2..0000000
--- a/kernel-api/src/main/scala/org/apache/toree/magic/dependencies/IncludeSparkSession.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License
- */
-
-package org.apache.toree.magic.dependencies
-
-import org.apache.spark.sql.{SQLContext, SparkSession}
-import org.apache.toree.magic.Magic
-import org.apache.toree.plugins.Plugin
-import org.apache.toree.plugins.annotations.{Event, Init}
-
-trait IncludeSparkSession extends Plugin {
- this: Magic =>
-
- @Event(name = "sparkReady") protected def sparkReady(
- newSparkSession: SparkSession
- ) = _spark = newSparkSession
-
- private var _spark: SparkSession = _
- def spark: SparkSession = _spark
-}
diff --git a/kernel/src/main/scala/org/apache/toree/boot/layer/ComponentInitialization.scala b/kernel/src/main/scala/org/apache/toree/boot/layer/ComponentInitialization.scala
index 63a358d..42999d7 100644
--- a/kernel/src/main/scala/org/apache/toree/boot/layer/ComponentInitialization.scala
+++ b/kernel/src/main/scala/org/apache/toree/boot/layer/ComponentInitialization.scala
@@ -45,7 +45,6 @@
* Initializes and registers all components (not needed by bare init).
*
* @param config The config used for initialization
- * @param appName The name of the "application" for Spark
* @param actorLoader The actor loader to use for some initialization
*/
def initializeComponents(
@@ -83,8 +82,6 @@
initializePlugins(config, pluginManager)
- initializeSparkContext(config, kernel)
-
interpreterManager.initializeInterpreters(kernel)
pluginManager.fireEvent(AllInterpretersReady)
@@ -97,13 +94,6 @@
}
-
- def initializeSparkContext(config:Config, kernel:Kernel) = {
- if(!config.getBoolean("nosparkcontext")) {
- kernel.createSparkContext(config.getString("spark.master"))
- }
- }
-
private def initializeCommObjects(actorLoader: ActorLoader) = {
logger.debug("Constructing Comm storage")
val commStorage = new CommStorage()
diff --git a/kernel/src/main/scala/org/apache/toree/kernel/api/Kernel.scala b/kernel/src/main/scala/org/apache/toree/kernel/api/Kernel.scala
index 417983b..285db1f 100644
--- a/kernel/src/main/scala/org/apache/toree/kernel/api/Kernel.scala
+++ b/kernel/src/main/scala/org/apache/toree/kernel/api/Kernel.scala
@@ -18,7 +18,8 @@
package org.apache.toree.kernel.api
import java.io.{InputStream, PrintStream}
-import java.util.concurrent.ConcurrentHashMap
+import java.net.URI
+import java.util.concurrent.{ConcurrentHashMap, TimeUnit, TimeoutException}
import scala.collection.mutable
import com.typesafe.config.Config
import org.apache.spark.api.java.JavaSparkContext
@@ -35,14 +36,15 @@
import org.apache.toree.kernel.protocol.v5.kernel.ActorLoader
import org.apache.toree.kernel.protocol.v5.magic.MagicParser
import org.apache.toree.kernel.protocol.v5.stream.KernelOutputStream
-import org.apache.toree.kernel.protocol.v5.{KMBuilder, KernelMessage}
+import org.apache.toree.kernel.protocol.v5.{KMBuilder, KernelMessage, MIMEType}
import org.apache.toree.magic.MagicManager
import org.apache.toree.plugins.PluginManager
-import org.apache.toree.utils.{KeyValuePairUtils, LogLike}
+import org.apache.toree.utils.LogLike
import scala.language.dynamics
import scala.reflect.runtime.universe._
-import scala.util.{DynamicVariable, Try}
-import org.apache.toree.plugins.SparkReady
+import scala.util.DynamicVariable
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Future, Await}
/**
* Represents the main kernel API to be used for interaction.
@@ -62,6 +64,23 @@
) extends KernelLike with LogLike {
/**
+ * Jars that have been added to the kernel
+ */
+ private val jars = new mutable.ArrayBuffer[URI]()
+
+ override def addJars(uris: URI*): Unit = {
+ uris.foreach { uri =>
+ if (uri.getScheme != "file") {
+ throw new RuntimeException("Cannot add non-local jar: " + uri)
+ }
+ }
+
+ jars ++= uris
+ interpreter.addJars(uris.map(_.toURL):_*)
+ uris.foreach(uri => sparkContext.addJar(uri.getPath))
+ }
+
+ /**
* Represents the current input stream used by the kernel for the specific
* thread.
*/
@@ -339,30 +358,6 @@
someKernelMessage.get
}
- override def createSparkContext(conf: SparkConf): SparkContext = {
- val sconf = createSparkConf(conf)
- val _sparkSession = SparkSession.builder.config(sconf).getOrCreate()
-
- val sparkMaster = sconf.getOption("spark.master").getOrElse("not_set")
- logger.info( s"Connecting to spark.master $sparkMaster")
-
- // TODO: Convert to events
- pluginManager.dependencyManager.add(_sparkSession.sparkContext.getConf)
- pluginManager.dependencyManager.add(_sparkSession)
- pluginManager.dependencyManager.add(_sparkSession.sparkContext)
- pluginManager.dependencyManager.add(javaSparkContext(_sparkSession))
-
- pluginManager.fireEvent(SparkReady)
-
- _sparkSession.sparkContext
- }
-
- override def createSparkContext(
- master: String
- ): SparkContext = {
- createSparkContext(new SparkConf().setMaster(master))
- }
-
// TODO: Think of a better way to test without exposing this
protected[toree] def createSparkConf(conf: SparkConf) = {
@@ -401,7 +396,36 @@
interpreterManager.interpreters.get(name)
}
- override def sparkSession: SparkSession = SparkSession.builder.getOrCreate
+ private lazy val defaultSparkConf: SparkConf = createSparkConf(new SparkConf())
+
+ override def sparkSession: SparkSession = {
+ defaultSparkConf.getOption("spark.master") match {
+ case Some(master) if !master.contains("local") =>
+ // when connecting to a remote cluster, the first call to getOrCreate
+ // may create a session and take a long time, so this starts a future
+ // to get the session. if it take longer than 100 ms, then print a
+ // message to the user that Spark is starting.
+ import scala.concurrent.ExecutionContext.Implicits.global
+ val sessionFuture = Future {
+ SparkSession.builder.config(defaultSparkConf).getOrCreate
+ }
+
+ try {
+ Await.result(sessionFuture, Duration(100, TimeUnit.MILLISECONDS))
+ } catch {
+ case timeout: TimeoutException =>
+ // getting the session is taking a long time, so assume that Spark
+ // is starting and print a message
+ display.content(
+ MIMEType.PlainText, "Waiting for a Spark session to start...")
+ Await.result(sessionFuture, Duration.Inf)
+ }
+
+ case _ =>
+ SparkSession.builder.config(defaultSparkConf).getOrCreate
+ }
+ }
+
override def sparkContext: SparkContext = sparkSession.sparkContext
override def sparkConf: SparkConf = sparkSession.sparkContext.getConf
override def javaSparkContext: JavaSparkContext = javaSparkContext(sparkSession)
diff --git a/kernel/src/main/scala/org/apache/toree/magic/builtin/AddDeps.scala b/kernel/src/main/scala/org/apache/toree/magic/builtin/AddDeps.scala
index 2f4d812..9ef9359 100644
--- a/kernel/src/main/scala/org/apache/toree/magic/builtin/AddDeps.scala
+++ b/kernel/src/main/scala/org/apache/toree/magic/builtin/AddDeps.scala
@@ -30,7 +30,7 @@
class AddDeps extends LineMagic with IncludeInterpreter
- with IncludeOutputStream with IncludeSparkContext with ArgumentParsingSupport
+ with IncludeOutputStream with ArgumentParsingSupport
with IncludeDependencyDownloader with IncludeKernel
{
@@ -78,7 +78,7 @@
if (nonOptionArgs.size == 3) {
// get the jars and hold onto the paths at which they reside
- val urls = dependencyDownloader.retrieve(
+ val uris = dependencyDownloader.retrieve(
groupId = nonOptionArgs.head,
artifactId = nonOptionArgs(1),
version = nonOptionArgs(2),
@@ -87,11 +87,10 @@
extraRepositories = repositoriesWithCreds,
verbose = _verbose,
trace = _trace
- ).map(_.toURL)
+ )
- // add the jars to the interpreter and spark context
- interpreter.addJars(urls:_*)
- urls.foreach(url => sparkContext.addJar(url.getPath))
+ // pass the new Jars to the kernel
+ kernel.addJars(uris:_*)
} else {
printHelp(printStream, """%AddDeps my.company artifact-id version""")
}
diff --git a/kernel/src/main/scala/org/apache/toree/magic/builtin/AddJar.scala b/kernel/src/main/scala/org/apache/toree/magic/builtin/AddJar.scala
index 4893077..48a8124 100644
--- a/kernel/src/main/scala/org/apache/toree/magic/builtin/AddJar.scala
+++ b/kernel/src/main/scala/org/apache/toree/magic/builtin/AddJar.scala
@@ -47,7 +47,7 @@
}
class AddJar
- extends LineMagic with IncludeInterpreter with IncludeSparkContext
+ extends LineMagic with IncludeInterpreter
with IncludeOutputStream with DownloadSupport with ArgumentParsingSupport
with IncludeKernel with IncludePluginManager with IncludeConfig with LogLike
{
@@ -137,8 +137,7 @@
val plugins = pluginManager.loadPlugins(fileDownloadLocation)
pluginManager.initializePlugins(plugins)
} else {
- interpreter.addJars(fileDownloadLocation.toURI.toURL)
- sparkContext.addJar(fileDownloadLocation.getCanonicalPath)
+ kernel.addJars(fileDownloadLocation.toURI)
}
}
}
diff --git a/kernel/src/test/scala/org/apache/toree/magic/builtin/AddDepsSpec.scala b/kernel/src/test/scala/org/apache/toree/magic/builtin/AddDepsSpec.scala
index 659af42..421d12b 100644
--- a/kernel/src/test/scala/org/apache/toree/magic/builtin/AddDepsSpec.scala
+++ b/kernel/src/test/scala/org/apache/toree/magic/builtin/AddDepsSpec.scala
@@ -19,11 +19,9 @@
import java.io.{ByteArrayOutputStream, OutputStream}
import java.net.{URI, URL}
-
import org.apache.toree.dependencies.{Credentials, DependencyDownloader}
-import org.apache.toree.interpreter.Interpreter
import org.apache.toree.utils.ArgumentParsingSupport
-import org.apache.spark.SparkContext
+import org.apache.toree.kernel.api.KernelLike
import org.scalatest.mock.MockitoSugar
import org.scalatest.{FunSpec, GivenWhenThen, Matchers}
import org.mockito.Mockito._
@@ -38,20 +36,17 @@
describe("#execute") {
it("should print out the help message if the input is invalid") {
val byteArrayOutputStream = new ByteArrayOutputStream()
- val mockIntp = mock[Interpreter]
- val mockSC = mock[SparkContext]
+ val mockKernel = mock[KernelLike]
val mockDownloader = mock[DependencyDownloader]
var printHelpWasRun = false
val addDepsMagic = new AddDeps
- with IncludeSparkContext
- with IncludeInterpreter
+ with IncludeKernel
with IncludeOutputStream
with IncludeDependencyDownloader
with ArgumentParsingSupport
{
- override val sparkContext: SparkContext = mockSC
- override val interpreter: Interpreter = mockIntp
+ override val kernel: KernelLike = mockKernel
override val dependencyDownloader: DependencyDownloader =
mockDownloader
override val outputStream: OutputStream = byteArrayOutputStream
@@ -65,9 +60,7 @@
val actual = addDepsMagic.execute("notvalid")
printHelpWasRun should be (true)
- verify(mockIntp, times(0)).addJars(any())
- verify(mockIntp, times(0)).bind(any(), any(), any(), any())
- verify(mockSC, times(0)).addJar(any())
+ verify(mockKernel, times(0)).addJars(any())
verify(mockDownloader, times(0)).retrieve(
anyString(), anyString(), anyString(), anyBoolean(), anyBoolean(),
anyBoolean(), any[Seq[(URL, Option[Credentials])]], anyBoolean(), anyBoolean()
@@ -83,14 +76,12 @@
)
val addDepsMagic = new AddDeps
- with IncludeSparkContext
- with IncludeInterpreter
+ with IncludeKernel
with IncludeOutputStream
with IncludeDependencyDownloader
with ArgumentParsingSupport
{
- override val sparkContext: SparkContext = mock[SparkContext]
- override val interpreter: Interpreter = mock[Interpreter]
+ override val kernel: KernelLike = mock[KernelLike]
override val dependencyDownloader: DependencyDownloader =
mockDependencyDownloader
override val outputStream: OutputStream = mock[OutputStream]
@@ -111,14 +102,12 @@
)
val addDepsMagic = new AddDeps
- with IncludeSparkContext
- with IncludeInterpreter
+ with IncludeKernel
with IncludeOutputStream
with IncludeDependencyDownloader
with ArgumentParsingSupport
{
- override val sparkContext: SparkContext = mock[SparkContext]
- override val interpreter: Interpreter = mock[Interpreter]
+ override val kernel: KernelLike = mock[KernelLike]
override val dependencyDownloader: DependencyDownloader =
mockDependencyDownloader
override val outputStream: OutputStream = mock[OutputStream]
@@ -131,23 +120,21 @@
expected(0), expected(1), expected(2), false)
}
- it("should add retrieved artifacts to the interpreter") {
+ it("should add retrieved artifacts to the kernel") {
val mockDependencyDownloader = mock[DependencyDownloader]
doReturn(Nil).when(mockDependencyDownloader).retrieve(
anyString(), anyString(), anyString(), anyBoolean(), anyBoolean(),
anyBoolean(), any[Seq[(URL, Option[Credentials])]], anyBoolean(), anyBoolean()
)
- val mockInterpreter = mock[Interpreter]
+ val mockKernel = mock[KernelLike]
val addDepsMagic = new AddDeps
- with IncludeSparkContext
- with IncludeInterpreter
+ with IncludeKernel
with IncludeOutputStream
with IncludeDependencyDownloader
with ArgumentParsingSupport
{
- override val sparkContext: SparkContext = mock[SparkContext]
- override val interpreter: Interpreter = mockInterpreter
+ override val kernel: KernelLike = mockKernel
override val dependencyDownloader: DependencyDownloader =
mockDependencyDownloader
override val outputStream: OutputStream = mock[OutputStream]
@@ -156,37 +143,7 @@
val expected = "org.apache.toree" :: "kernel" :: "1.0" :: Nil
addDepsMagic.execute(expected.mkString(" "))
- verify(mockInterpreter).addJars(any[URL])
- }
-
- it("should add retrieved artifacts to the spark context") {
- val mockDependencyDownloader = mock[DependencyDownloader]
- val fakeUri = new URI("file:/foo")
- doReturn(fakeUri :: fakeUri :: fakeUri :: Nil)
- .when(mockDependencyDownloader).retrieve(
- anyString(), anyString(), anyString(), anyBoolean(), anyBoolean(),
- anyBoolean(), any[Seq[(URL, Option[Credentials])]], anyBoolean(), anyBoolean()
- )
- val mockSparkContext = mock[SparkContext]
-
- val addDepsMagic = new AddDeps
- with IncludeSparkContext
- with IncludeInterpreter
- with IncludeOutputStream
- with IncludeDependencyDownloader
- with ArgumentParsingSupport
- {
- override val sparkContext: SparkContext = mockSparkContext
- override val interpreter: Interpreter = mock[Interpreter]
- override val dependencyDownloader: DependencyDownloader =
- mockDependencyDownloader
- override val outputStream: OutputStream = mock[OutputStream]
- }
-
- val expected = "org.apache.toree" :: "kernel" :: "1.0" :: Nil
- addDepsMagic.execute(expected.mkString(" "))
-
- verify(mockSparkContext, times(3)).addJar(anyString())
+ verify(mockKernel).addJars(any[URI])
}
}
}
diff --git a/kernel/src/test/scala/org/apache/toree/magic/builtin/AddJarSpec.scala b/kernel/src/test/scala/org/apache/toree/magic/builtin/AddJarSpec.scala
index 169fb60..1c7b3fc 100644
--- a/kernel/src/test/scala/org/apache/toree/magic/builtin/AddJarSpec.scala
+++ b/kernel/src/test/scala/org/apache/toree/magic/builtin/AddJarSpec.scala
@@ -18,15 +18,15 @@
package org.apache.toree.magic.builtin
import java.io.OutputStream
-import java.net.URL
-import java.nio.file.{FileSystems, Files}
-
+import java.net.{URI, URL}
+import java.nio.file.{Files, FileSystems}
import org.apache.toree.interpreter.Interpreter
-import org.apache.toree.magic.dependencies.{IncludeConfig, IncludeOutputStream, IncludeInterpreter, IncludeSparkContext}
+import org.apache.toree.magic.dependencies.{IncludeConfig, IncludeInterpreter, IncludeKernel, IncludeOutputStream}
import com.typesafe.config.ConfigFactory
import org.apache.spark.SparkContext
+import org.apache.toree.kernel.api.KernelLike
import org.apache.toree.plugins.PluginManager
-import org.scalatest.{Matchers, FunSpec}
+import org.scalatest.{FunSpec, Matchers}
import org.scalatest.mock.MockitoSugar
import org.mockito.Mockito._
import org.mockito.Matchers._
@@ -34,22 +34,18 @@
class AddJarSpec extends FunSpec with Matchers with MockitoSugar {
describe("AddJar"){
describe("#execute") {
- it("should call addJar on the provided SparkContext and addJars on the " +
- "provided interpreter") {
- val mockSparkContext = mock[SparkContext]
- val mockInterpreter = mock[Interpreter]
+ it("should call addJar on the provided kernel") {
+ val mockKernel = mock[KernelLike]
val mockOutputStream = mock[OutputStream]
val mockPluginManager = mock[PluginManager]
val testConfig = ConfigFactory.load()
val addJarMagic = new AddJar
- with IncludeSparkContext
- with IncludeInterpreter
with IncludeOutputStream
with IncludeConfig
+ with IncludeKernel
{
- override val sparkContext: SparkContext = mockSparkContext
- override val interpreter: Interpreter = mockInterpreter
+ override val kernel: KernelLike = mockKernel
override val outputStream: OutputStream = mockOutputStream
override lazy val pluginManager: PluginManager = mockPluginManager
override val config = testConfig
@@ -59,8 +55,7 @@
addJarMagic.execute("""http://www.example.com/someJar.jar""")
- verify(mockSparkContext).addJar(anyString())
- verify(mockInterpreter).addJars(any[URL])
+ verify(mockKernel).addJars(any[URI])
verify(mockPluginManager, times(0)).loadPlugins(any())
}
@@ -104,21 +99,18 @@
}
it("should use a cached jar if the force option is not provided") {
- val mockSparkContext = mock[SparkContext]
- val mockInterpreter = mock[Interpreter]
+ val mockKernel = mock[KernelLike]
val mockOutputStream = mock[OutputStream]
var downloadFileCalled = false // Used to verify that downloadFile
// was or was not called in this test
val testConfig = ConfigFactory.load()
val addJarMagic = new AddJar
- with IncludeSparkContext
- with IncludeInterpreter
with IncludeOutputStream
with IncludeConfig
+ with IncludeKernel
{
- override val sparkContext: SparkContext = mockSparkContext
- override val interpreter: Interpreter = mockInterpreter
+ override val kernel: KernelLike = mockKernel
override val outputStream: OutputStream = mockOutputStream
override val config = testConfig
override def downloadFile(fileUrl: URL, destinationUrl: URL): URL = {
@@ -140,26 +132,22 @@
tmpFilePath.toFile.delete()
downloadFileCalled should be (false)
- verify(mockSparkContext).addJar(anyString())
- verify(mockInterpreter).addJars(any[URL])
+ verify(mockKernel).addJars(any[URI])
}
it("should not use a cached jar if the force option is provided") {
- val mockSparkContext = mock[SparkContext]
- val mockInterpreter = mock[Interpreter]
+ val mockKernel = mock[KernelLike]
val mockOutputStream = mock[OutputStream]
var downloadFileCalled = false // Used to verify that downloadFile
// was or was not called in this test
val testConfig = ConfigFactory.load()
val addJarMagic = new AddJar
- with IncludeSparkContext
- with IncludeInterpreter
with IncludeOutputStream
with IncludeConfig
+ with IncludeKernel
{
- override val sparkContext: SparkContext = mockSparkContext
- override val interpreter: Interpreter = mockInterpreter
+ override val kernel: KernelLike = mockKernel
override val outputStream: OutputStream = mockOutputStream
override val config = testConfig
override def downloadFile(fileUrl: URL, destinationUrl: URL): URL = {
@@ -181,8 +169,7 @@
tmpFilePath.toFile.delete()
downloadFileCalled should be (true)
- verify(mockSparkContext).addJar(anyString())
- verify(mockInterpreter).addJars(any[URL])
+ verify(mockKernel).addJars(any[URI])
}
it("should add magic jar to magicloader and not to interpreter and spark"+
@@ -194,12 +181,10 @@
val testConfig = ConfigFactory.load()
val addJarMagic = new AddJar
- with IncludeSparkContext
with IncludeInterpreter
with IncludeOutputStream
with IncludeConfig
{
- override val sparkContext: SparkContext = mockSparkContext
override val interpreter: Interpreter = mockInterpreter
override val outputStream: OutputStream = mockOutputStream
override lazy val pluginManager: PluginManager = mockPluginManager
diff --git a/kernel/src/test/scala/org/apache/toree/magic/builtin/LSMagicSpec.scala b/kernel/src/test/scala/org/apache/toree/magic/builtin/LSMagicSpec.scala
index f68db5e..edd8979 100644
--- a/kernel/src/test/scala/org/apache/toree/magic/builtin/LSMagicSpec.scala
+++ b/kernel/src/test/scala/org/apache/toree/magic/builtin/LSMagicSpec.scala
@@ -21,7 +21,7 @@
import java.net.URL
import org.apache.toree.interpreter.Interpreter
-import org.apache.toree.magic.dependencies.{IncludeOutputStream, IncludeInterpreter, IncludeSparkContext}
+import org.apache.toree.magic.dependencies.{IncludeOutputStream, IncludeInterpreter}
import org.apache.toree.magic.{CellMagic, LineMagic}
import org.apache.spark.SparkContext
import org.scalatest.{Matchers, FunSpec}
@@ -32,11 +32,9 @@
class TestLSMagic(sc: SparkContext, intp: Interpreter, os: OutputStream)
extends LSMagic
- with IncludeSparkContext
with IncludeInterpreter
with IncludeOutputStream
{
- override val sparkContext: SparkContext = sc
override val interpreter: Interpreter = intp
override val outputStream: OutputStream = os
}