blob: 3ff3a52d0d473cdf24ef7e8d10de8e8320396c7e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License
*/
package org.apache.toree.kernel.interpreter.pyspark
import java.net.URL
import org.apache.toree.interpreter.Results.Result
import org.apache.toree.interpreter._
import org.apache.toree.kernel.api.KernelLike
import org.slf4j.LoggerFactory
import py4j.GatewayServer
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.tools.nsc.interpreter.{InputStream, OutputStream}
/**
 * Represents an interpreter interface to PySpark. Requires a properly-set
 * SPARK_HOME, PYTHONPATH pointing to Spark's Python source, and py4j installed
 * where it is accessible to the Spark Kernel. Optionally specify PYTHON_EXEC
 * to override the default python executable "python".
 */
class PySparkInterpreter(
) extends Interpreter {
  /** Name of the environment variable used to override the python executable. */
  private val PythonExecEnv = "PYTHON_EXEC"

  /** Python executable used to launch the PySpark process; defaults to "python". */
  private lazy val pythonExecutable =
    Option(System.getenv(PythonExecEnv)).getOrElse("python")

  private val logger = LoggerFactory.getLogger(this.getClass)

  /** Kernel reference; injected via init() before any lazy service is touched. */
  private var _kernel: KernelLike = _

  // TODO: Replace hard-coded maximum queue count
  /** Represents the state used by this interpreter's Python instance. */
  private lazy val pySparkState = new PySparkState(500)

  /** Represents the bridge used by this interpreter's Python interface. */
  private lazy val pySparkBridge = PySparkBridge(
    pySparkState,
    _kernel
  )

  /** Represents the interface for Python to talk to JVM Spark components.
   *  Port 0 lets py4j pick any free port. */
  private lazy val gatewayServer = new GatewayServer(pySparkBridge, 0)

  /** Represents the process handler used for the PySpark process. */
  private lazy val pySparkProcessHandler: PySparkProcessHandler =
    new PySparkProcessHandler(
      pySparkBridge,
      restartOnFailure = true,
      restartOnCompletion = true
    )

  /** Service coordinating the gateway, bridge, and child Python process. */
  private lazy val pySparkService = new PySparkService(
    pythonExecutable,
    gatewayServer,
    pySparkBridge,
    pySparkProcessHandler
  )

  /** Converts raw Python execution results into interpreter results. */
  private lazy val pySparkTransformer = new PySparkTransformer

  /**
   * Initializes the interpreter.
   *
   * @param kernel The kernel
   * @return The newly initialized interpreter
   */
  override def init(kernel: KernelLike): Interpreter = {
    _kernel = kernel
    this
  }

  /**
   * Executes the provided code with the option to silence output.
   *
   * @param code The code to execute
   * @param silent Whether or not to execute the code silently (no output)
   * @param output Optional stream to receive the execution's output
   * @return The success/failure of the interpretation and the output from the
   *         execution or the failure
   */
  override def interpret(code: String, silent: Boolean, output: Option[OutputStream]):
    (Result, Either[ExecuteOutput, ExecuteFailure]) = {
    // Lazily (re)start the backing service on first use or after a stop
    if (!pySparkService.isRunning) pySparkService.start()

    val futureResult = pySparkTransformer.transformToInterpreterResult(
      pySparkService.submitCode(code, output)
    )

    // Block indefinitely: cell execution has no inherent upper time bound
    Await.result(futureResult, Duration.Inf)
  }

  /**
   * Starts the interpreter, initializing any internal state.
   *
   * @return A reference to the interpreter
   */
  override def start(): Interpreter = {
    pySparkService.start()

    this
  }

  /**
   * Stops the interpreter, removing any previous internal state.
   *
   * @return A reference to the interpreter
   */
  override def stop(): Interpreter = {
    pySparkService.stop()

    this
  }

  /**
   * Returns the class loader used by this interpreter.
   *
   * @return The runtime class loader used by this interpreter
   */
  override def classLoader: ClassLoader = this.getClass.getClassLoader

  // Unsupported (but can be invoked)
  override def lastExecutionVariableName: Option[String] = None

  // Unsupported (but can be invoked)
  override def read(variableName: String): Option[AnyRef] = None

  // Unsupported
  override def updatePrintStreams(in: InputStream, out: OutputStream, err: OutputStream): Unit = ???

  // Unsupported
  override def interrupt(): Interpreter = ???

  // Unsupported
  override def bind(variableName: String, typeName: String, value: Any, modifiers: List[String]): Unit = ???

  // Unsupported
  override def addJars(jars: URL*): Unit = ???

  // Unsupported
  override def doQuietly[T](body: => T): T = ???

  /**
   * Describes the interpreted language for Jupyter kernel_info replies.
   *
   * @return Language metadata including the actual python version in use
   */
  override def languageInfo: LanguageInfo = {
    import scala.sys.process._

    // Issue a subprocess call to grab the python version. This is better than
    // polling a child process. `!!` captures stdout including the trailing
    // newline emitted by print(), so trim it to get a clean version string.
    val version = Seq(
      pythonExecutable,
      "-c",
      "import sys; print('{s.major}.{s.minor}.{s.micro}'.format(s=sys.version_info))"
    ).!!.trim

    LanguageInfo(
      "python",
      version = version,
      fileExtension = Some(".py"),
      pygmentsLexer = Some("python"),
      mimeType = Some("text/x-ipython"),
      codemirrorMode = Some("text/x-ipython"))
  }
}