blob: 6f40dbe3e6604a8b029b4fcf02ca7624ebbdf6de [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License
*/
package org.apache.toree.kernel.protocol.v5.client
import akka.actor.ActorSystem
import akka.pattern.ask
import akka.util.Timeout
import org.apache.toree.comm._
import org.apache.toree.kernel.protocol.v5._
import org.apache.toree.kernel.protocol.v5.client.execution.{DeferredExecution, ExecuteRequestTuple}
import org.apache.toree.kernel.protocol.v5.client.socket.HeartbeatMessage
import org.apache.toree.kernel.protocol.v5.client.socket.StdinClient.{ResponseFunctionMessage, ResponseFunction}
import org.apache.toree.kernel.protocol.v5.content.ExecuteRequest
import org.apache.toree.utils.LogLike
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}
/**
* Client API for Spark Kernel.
*
* Note: This takes a moment to initialize an actor system, so take appropriate action
* if an API is to be used immediately after initialization.
*
* The actorSystem parameter allows shutdown of this client's ActorSystem.
*/
class SparkKernelClient(
private val actorLoader: ActorLoader,
private val actorSystem: ActorSystem,
private val commRegistrar: CommRegistrar
) extends LogLike {
implicit val timeout = Timeout(21474835.seconds)
/**
* Executes code on the Spark Kernel.
* Gives a <code>DeferredExecution</code> used to handle results from
* code execution. Specifically it can be used to register
* callbacks that handle stream results, the code execution result, or code
* execution errors.
*
* Code that prints to stdout is considered streaming and will be sent to all
* callbacks registered with the onStream() method. For example:
* <code>
* client.execute("println(1)").onStream(someFunction)
* </code>
* someFunction will receive a
* `org.apache.toree.kernel.protocol.v5.content.StreamContent` message:
* <code>
* {
* "name" : "stdout",
* "text" : "1"
* }
* </code>
*
* Code that produces a result will cause invocation of the callbacks
* registered with the onResult() method. The callbacks will be invoked with
* the result of the executed code. For example:
* <code>
* client.execute("1+1").onResult(someFunction)
* </code>
* someFunction will receive a
* [[org.apache.toree.kernel.protocol.v5.content.ExecuteResult]] message:
* <code>
* {
* "execution_count" : 1,
* "data" : {
* "text/plain" : "2"
* },
* "metadata" : {}
* }
* </code>
*
* Code that produces an error will be sent to all callbacks registered
* with the onResult() method. For example:
* <code>
* client.execute("1+1").onResult(someFunction)
* </code>
* someFunction will be invoked with an
* [[org.apache.toree.kernel.protocol.v5.content.ExecuteReply]] message
* containing the error.
*
* @param code Scala code
* @return The DeferredExecution associated with the code execution.
*/
def execute(code: String): DeferredExecution = {
val request = ExecuteRequest(code, false, true, UserExpressions(), true)
val de = new DeferredExecution
actorLoader.load(MessageType.Incoming.ExecuteRequest) ! ExecuteRequestTuple(request, de)
de
}
/**
* Sets the response function used when input is requested by the kernel.
* @param responseFunc The response function to use
*/
def setResponseFunction(responseFunc: ResponseFunction): Unit = {
actorLoader.load(SocketType.StdInClient) !
ResponseFunctionMessage(responseFunc)
}
/**
* Represents the exposed interface for Comm communication with the kernel.
*/
val comm = new ClientCommManager(
actorLoader = actorLoader,
kmBuilder = KMBuilder(),
commRegistrar = commRegistrar
)
// TODO: hide this? just heartbeat to see if kernel is reachable?
def heartbeat(failure: () => Unit): Unit = {
val future = actorLoader.load(SocketType.Heartbeat) ? HeartbeatMessage
future.onComplete {
case Success(_) =>
logger.info("Client received heartbeat.")
case Failure(_) =>
failure()
logger.info("There was an error receiving heartbeat from kernel.")
}
}
def shutdown() = {
logger.info("Shutting down client")
actorSystem.terminate()
}
}