/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.zeppelin.spark

import java.io.File
import java.net.URLClassLoader
import java.nio.file.{Files, Paths}
import java.util.Properties

import org.apache.spark.SparkConf
import org.apache.spark.repl.SparkILoop
import org.apache.spark.repl.SparkILoop._
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion
import org.apache.zeppelin.interpreter.util.InterpreterOutputStream
import org.apache.zeppelin.interpreter.{InterpreterContext, InterpreterGroup}
import org.slf4j.{Logger, LoggerFactory}

import scala.tools.nsc.Settings
import scala.tools.nsc.interpreter._

/**
 * SparkInterpreter for scala-2.10
 */
class SparkScala210Interpreter(override val conf: SparkConf,
                               override val depFiles: java.util.List[String],
                               override val properties: Properties,
                               override val interpreterGroup: InterpreterGroup,
                               override val sparkInterpreterClassLoader: URLClassLoader)
  extends BaseSparkScalaInterpreter(conf, depFiles, properties, interpreterGroup, sparkInterpreterClassLoader) {

  lazy override val LOGGER: Logger = LoggerFactory.getLogger(getClass)

  private var sparkILoop: SparkILoop = _

  override val interpreterOutput =
    new InterpreterOutputStream(LoggerFactory.getLogger(classOf[SparkScala210Interpreter]))

  override def open(): Unit = {
    super.open()
    // Redirect the output of open() to the InterpreterOutputStream, so that the user
    // gets more diagnostic info in the frontend.
    if (InterpreterContext.get() != null) {
      interpreterOutput.setInterpreterOutput(InterpreterContext.get().out)
    }
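
    // Classes compiled from user code are written to a temporary output directory
    // (spark.repl.class.outputDir) so that executors can load the generated classes.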
    val rootDir = conf.get("spark.repl.classdir", System.getProperty("java.io.tmpdir"))
    val outputDir = Files.createTempDirectory(Paths.get(rootDir), "spark").toFile
    outputDir.deleteOnExit()
    conf.set("spark.repl.class.outputDir", outputDir.getAbsolutePath)

    // Only Spark 1 requires an HTTP server to serve the REPL classes; Spark 2 removed the HttpServer class.
    startHttpServer(outputDir).foreach { case (server, uri) =>
      sparkHttpServer = server
      conf.set("spark.repl.class.uri", uri)
    }
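
    // Configure the embedded Scala compiler: use the interpreter's class loader,
    // include the Java classpath, and put the user jars on the REPL classpath.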
    val settings = new Settings()
    settings.embeddedDefaults(sparkInterpreterClassLoader)
    settings.usejavacp.value = true
    settings.classpath.value = getUserJars.mkString(File.pathSeparator)
    if (properties.getProperty("zeppelin.spark.printREPLOutput", "true").toBoolean) {
      Console.setOut(interpreterOutput)
    }
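
    // SparkILoop keeps its settings and input reader private, so the reflection helpers
    // setDeclaredField/callMethod are used to inject them and to run the REPL
    // initialization steps.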
    sparkILoop = new SparkILoop()
    setDeclaredField(sparkILoop, "settings", settings)
    callMethod(sparkILoop, "createInterpreter")
    sparkILoop.initializeSynchronous()
    callMethod(sparkILoop, "postInitialization")

    val reader = callMethod(sparkILoop,
      "org$apache$spark$repl$SparkILoop$$chooseReader",
      Array(settings.getClass), Array(settings)).asInstanceOf[InteractiveReader]
    setDeclaredField(sparkILoop, "org$apache$spark$repl$SparkILoop$$in", reader)
    this.scalaCompletion = reader.completion

    createSparkContext()
    createZeppelinContext()
  }
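
  // Code completion is delegated to the Scala REPL's completer; the candidates are
  // wrapped into Zeppelin InterpreterCompletion objects for the frontend.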
  protected def completion(buf: String,
                           cursor: Int,
                           context: InterpreterContext): java.util.List[InterpreterCompletion] = {
    val completions = scalaCompletion.completer().complete(buf.substring(0, cursor), cursor).candidates
      .map(e => new InterpreterCompletion(e, e, null))
    scala.collection.JavaConversions.seqAsJavaList(completions)
  }
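
  // Runs a snippet of Scala code in the underlying SparkILoop and returns the REPL
  // result (Success, Error, or Incomplete).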
  def scalaInterpret(code: String): scala.tools.nsc.interpreter.IR.Result =
    sparkILoop.interpret(code)
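
  // Binds a named value into the REPL so that user code can reference it;
  // beQuietDuring suppresses the REPL's own feedback while the bind runs.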
  protected def bind(name: String, tpe: String, value: Object, modifier: List[String]): Unit = {
    sparkILoop.beQuietDuring {
      sparkILoop.bind(name, tpe, value, modifier)
    }
  }
}