blob: ca7199a99059314a10760281fcf479021f7987d4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.livy.scalaapi
import java.io.File
import java.net.URI
import java.util.concurrent.{Executors, Future => JFuture, ScheduledFuture, ThreadFactory, TimeUnit}
import scala.concurrent._
import scala.util.Try
import org.apache.livy._
/**
* A client for submitting Spark-based jobs to a Livy backend.
* @constructor Creates a Scala client.
* @param livyJavaClient the Java client of Livy.
*/
class LivyScalaClient(livyJavaClient: LivyClient) {

  // A single daemon thread used to periodically poll Java futures returned by the
  // underlying client until they complete. Daemon so it never blocks JVM shutdown.
  private val executor = Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
    override def newThread(r: Runnable): Thread = {
      val pollingThread = new Thread(r, "LivyScalaClient-PollingContainer")
      pollingThread.setDaemon(true)
      pollingThread
    }
  })

  /**
   * Submits a job for asynchronous execution.
   *
   * @param fn The job to be executed. It is a function that takes in a ScalaJobContext and
   *           returns the result of the execution of the job with that context.
   * @return A handle that can be used to monitor the job.
   */
  def submit[T](fn: ScalaJobContext => T): ScalaJobHandle[T] = {
    val javaJob = new Job[T] {
      @throws(classOf[Exception])
      override def call(jobContext: JobContext): T = fn(new ScalaJobContext(jobContext))
    }
    new ScalaJobHandle(livyJavaClient.submit(javaJob))
  }

  /**
   * Asks the remote context to run a job immediately.
   *
   * Normally, the remote context will queue jobs and execute them based on how many worker
   * threads have been configured. This method will run the submitted job in the same thread
   * processing the RPC message, so that queueing does not apply.
   *
   * It's recommended that this method only be used to run code that finishes quickly. This
   * avoids interfering with the normal operation of the context.
   *
   * @param fn The job to be executed. It is a function that takes in a ScalaJobContext and
   *           returns the result of the execution of the job with that context.
   * @return A handle that can be used to monitor the job.
   */
  def run[T](fn: ScalaJobContext => T): Future[T] = {
    val javaJob = new Job[T] {
      @throws(classOf[Exception])
      override def call(jobContext: JobContext): T = fn(new ScalaJobContext(jobContext))
    }
    toScalaFuture(livyJavaClient.run(javaJob))
  }

  /**
   * Stops the remote context.
   *
   * Any pending jobs will be cancelled, and the remote context will be torn down.
   *
   * Note that shutting down the polling executor means any still-pending futures
   * returned by this client will never complete.
   *
   * @param shutdownContext Whether to shutdown the underlying Spark context. If false, the
   *                        context will keep running and it's still possible to send commands
   *                        to it, if the backend being used supports it.
   */
  def stop(shutdownContext: Boolean): Unit = {
    executor.shutdown()
    livyJavaClient.stop(shutdownContext)
  }

  /**
   * Upload a jar to be added to the Spark application classpath.
   *
   * @param jar The local file to be uploaded.
   * @return A future that can be used to monitor this operation.
   */
  def uploadJar(jar: File): Future[_] = toScalaFuture(livyJavaClient.uploadJar(jar))

  /**
   * Adds a jar file to the running remote context.
   *
   * Note that the URL should be reachable by the Spark driver process. If running the driver
   * in cluster mode, it may reside on a different host, meaning "file:" URLs have to exist
   * on that node (and not on the client machine).
   *
   * If the provided URI has no scheme, it's considered to be relative to the default file system
   * configured in the Livy server.
   *
   * @param uri The location of the jar file.
   * @return A future that can be used to monitor the operation.
   */
  def addJar(uri: URI): Future[_] = toScalaFuture(livyJavaClient.addJar(uri))

  /**
   * Upload a file to be passed to the Spark application.
   *
   * @param file The local file to be uploaded.
   * @return A future that can be used to monitor this operation.
   */
  def uploadFile(file: File): Future[_] = toScalaFuture(livyJavaClient.uploadFile(file))

  /**
   * Adds a file to the running remote context.
   *
   * Note that the URL should be reachable by the Spark driver process. If running the driver
   * in cluster mode, it may reside on a different host, meaning "file:" URLs have to exist
   * on that node (and not on the client machine).
   *
   * If the provided URI has no scheme, it's considered to be relative to the default file system
   * configured in the Livy server.
   *
   * @param uri The location of the file.
   * @return A future that can be used to monitor the operation.
   */
  def addFile(uri: URI): Future[_] = toScalaFuture(livyJavaClient.addFile(uri))

  // Bridges a Java future into a Scala one by polling it on the shared executor.
  private def toScalaFuture[T](jFuture: JFuture[T]): Future[T] =
    new PollingContainer(jFuture).poll()

  /**
   * Adapts a Java [[JFuture]] to a Scala [[Future]]. Once `poll()` is called, this
   * runnable is scheduled at a fixed 1-second delay on the shared executor; each run
   * checks the Java future and, when it is done, fulfils the promise with its result
   * (success or failure) and cancels its own schedule.
   */
  private class PollingContainer[T] private[livy] (jFuture: JFuture[T]) extends Runnable {

    // Seconds before the first poll and between subsequent polls.
    private val pollDelaySeconds = 1L
    private val promise = Promise[T]()
    // Handle to our own periodic schedule, used for self-cancellation after completion.
    private var scheduledFuture: ScheduledFuture[_] = _

    /** Starts the periodic polling and returns the Scala future to the caller. */
    def poll(): Future[T] = {
      scheduledFuture = executor.scheduleWithFixedDelay(
        this, pollDelaySeconds, pollDelaySeconds, TimeUnit.SECONDS)
      promise.future
    }

    override def run(): Unit = {
      if (jFuture.isDone) {
        // The Java future has finished; capture success or failure without throwing,
        // hand it to the promise, then stop polling.
        val outcome = Try(getJavaFutureResult(jFuture))
        promise.complete(outcome)
        scheduledFuture.cancel(false)
      }
    }
  }
}