blob: d1cf29de0904659dc1dc411ac1e091a224b79a57 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.livy.scalaapi
import scala.concurrent.{CanAwait, ExecutionContext, Future, TimeoutException}
import scala.concurrent.duration.Duration
import scala.util.Try
import org.apache.livy.JobHandle
import org.apache.livy.JobHandle.{Listener, State}
/**
* A handle to a submitted job. Allows for monitoring and controlling of the running remote job.
*
* @constructor Creates a ScalaJobHandle.
* @param jobHandle the Java JobHandle of Livy.
*
* @define multipleCallbacks
* Multiple callbacks may be registered; there is no guarantee that they will be
* executed in a particular order.
*
* @define nonDeterministic
* Note: using this method yields nondeterministic dataflow programs.
*
* @define callbackInContext
* The provided callback always runs in the provided implicit
*` ExecutionContext`, though there is no guarantee that the
* `execute()` method on the `ExecutionContext` will be called once
* per callback or that `execute()` will be called in the current
* thread. That is, the implementation may run multiple callbacks
* in a batch within a single `execute()` and it may run
* `execute()` either immediately or asynchronously.
*/
class ScalaJobHandle[T] private[livy] (jobHandle: JobHandle[T]) extends Future[T] {
/**
* Return the current state of the job.
*/
def state: State = jobHandle.getState()
/**
* When the job is completed, either through an exception, or a value,
* apply the provided function.
*
* If the job has already been completed,
* this will either be applied immediately or be scheduled asynchronously.
*
* $multipleCallbacks
* $callbackInContext
*/
override def onComplete[U](func: (Try[T]) => U)(implicit executor: ExecutionContext): Unit = {
jobHandle.addListener(new AbstractScalaJobHandleListener[T] {
override def onJobSucceeded(job: JobHandle[T], result: T): Unit = {
val onJobSucceededTask = new Runnable {
override def run(): Unit = func(Try(result))
}
executor.execute(onJobSucceededTask)
}
override def onJobFailed(job: JobHandle[T], cause: Throwable): Unit = {
val onJobFailedTask = new Runnable {
override def run(): Unit = func(Try(getJavaFutureResult(job)))
}
executor.execute(onJobFailedTask)
}
})
}
/**
* When this job is queued, apply the provided function.
*
* $multipleCallbacks
* $callbackInContext
*/
def onJobQueued[U](func: => Unit)(implicit executor: ExecutionContext): Unit = {
jobHandle.addListener(new AbstractScalaJobHandleListener[T] {
override def onJobQueued(job: JobHandle[T]): Unit = {
val onJobQueuedTask = new Runnable {
override def run(): Unit = func
}
executor.execute(onJobQueuedTask)
}
})
}
/**
* When this job has started, apply the provided function.
*
* $multipleCallbacks
* $callbackInContext
*/
def onJobStarted[U](func: => Unit)(implicit executor: ExecutionContext): Unit = {
jobHandle.addListener(new AbstractScalaJobHandleListener[T] {
override def onJobStarted(job: JobHandle[T]): Unit = {
val onJobStartedTask = new Runnable {
override def run(): Unit = func
}
executor.execute(onJobStartedTask)
}
})
}
/**
* When this job is cancelled, apply the provided function.
*
* $multipleCallbacks
* $callbackInContext
*/
def onJobCancelled[U](func: Boolean => Unit)(implicit executor: ExecutionContext): Unit = {
jobHandle.addListener(new AbstractScalaJobHandleListener[T] {
override def onJobCancelled(job: JobHandle[T]): Unit = {
val onJobCancelledTask = new Runnable {
override def run(): Unit = func(job.cancel(false))
}
executor.execute(onJobCancelledTask)
}
})
}
/**
* Returns whether the job has already been completed with
* a value or an exception.
*
* $nonDeterministic
*
* @return `true` if the job is already completed, `false` otherwise.
*/
override def isCompleted: Boolean = jobHandle.isDone
/**
* The result value of the job.
*
* If the job is not completed the returned value will be `None`.
* If the job is completed the value will be `Some(Success(t))`.
* if it contains a valid result, or `Some(Failure(error))` if it contains
* an exception.
*/
override def value: Option[Try[T]] = {
if (isCompleted) {
Some(Try(getJavaFutureResult(jobHandle)))
} else {
None
}
}
/**
* Supports Scala's Await.result(atmost) which awaits the completion of the job and returns the
* result (of type `T`).
*
* @param atMost
* maximum wait time, which may be negative (no waiting is done),
* [[scala.concurrent.duration.Duration.Inf Duration.Inf]] for unbounded waiting,
* or a finite positive duration.
* @return the result value if job is completed within the specific maximum wait time.
* @throws Exception the underlying exception on the execution of the job.
*/
@throws(classOf[Exception])
override def result(atMost: Duration)(implicit permit: CanAwait): T =
getJavaFutureResult(jobHandle, atMost)
/**
* Supports Scala's Await.ready(atmost) which awaits the completion of the job.
*
* @param atMost
* maximum wait time, which may be negative (no waiting is done),
* [[scala.concurrent.duration.Duration.Inf Duration.Inf]] for unbounded waiting,
* or a finite positive duration.
* @return ScalaJobHandle
* @throws InterruptedException if the current thread is interrupted while waiting.
* @throws TimeoutException if after waiting for the specified time the job
* is still not ready.
*/
@throws(classOf[InterruptedException])
@throws(classOf[TimeoutException])
override def ready(atMost: Duration)(implicit permit: CanAwait): ScalaJobHandle.this.type = {
getJavaFutureResult(jobHandle, atMost)
this
}
}
private abstract class AbstractScalaJobHandleListener[T] extends Listener[T] {
override def onJobQueued(job: JobHandle[T]): Unit = {}
override def onJobCancelled(job: JobHandle[T]): Unit = {}
override def onJobSucceeded(job: JobHandle[T], result: T): Unit = {}
override def onJobStarted(job: JobHandle[T]): Unit = {}
override def onJobFailed(job: JobHandle[T], cause: Throwable): Unit = {}
}