/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark
import java.util.Collections
import java.util.concurrent.TimeUnit
import scala.concurrent._
import scala.concurrent.duration.Duration
import scala.util.Try
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.api.java.JavaFutureAction
import org.apache.spark.rdd.RDD
import org.apache.spark.scheduler.JobWaiter
import org.apache.spark.util.ThreadUtils
/**
 * A future for the result of an action that supports cancellation. This extends the Scala
 * Future interface with the ability to cancel the action that produces the result.
 */
trait FutureAction[T] extends Future[T] {
  // Note that we redefine methods of the Future trait here explicitly so we can provide
  // different documentation (with reference to the word "action").
/**
* Cancels the execution of this action.
*/
def cancel(): Unit
/**
* Blocks until this action completes.
*
* @param atMost maximum wait time, which may be negative (no waiting is done), Duration.Inf
* for unbounded waiting, or a finite positive duration
* @return this FutureAction
*/
override def ready(atMost: Duration)(implicit permit: CanAwait): FutureAction.this.type
/**
* Awaits and returns the result (of type T) of this action.
*
* @param atMost maximum wait time, which may be negative (no waiting is done), Duration.Inf
* for unbounded waiting, or a finite positive duration
* @throws Exception exception during action execution
   * @return the result value if the action is completed within the specified maximum wait time
*/
@throws(classOf[Exception])
override def result(atMost: Duration)(implicit permit: CanAwait): T
  /**
   * When this action completes, whether with a value or with an exception, applies the provided
   * function.
   */
def onComplete[U](func: (Try[T]) => U)(implicit executor: ExecutionContext): Unit
/**
* Returns whether the action has already been completed with a value or an exception.
*/
override def isCompleted: Boolean
/**
* Returns whether the action has been cancelled.
*/
def isCancelled: Boolean
/**
* The value of this Future.
*
* If the future is not completed the returned value will be None. If the future is completed
* the value will be Some(Success(t)) if it contains a valid result, or Some(Failure(error)) if
* it contains an exception.
*/
override def value: Option[Try[T]]
  // These two methods must be implemented in Scala 2.12. They are stubbed out here (throwing
  // UnsupportedOperationException) and overridden with real implementations in the two
  // subclasses below. The stubs exist so that those implementations can declare "override",
  // which is necessary in 2.12, while still working in 2.11, where the methods don't exist in
  // the superclass.
  // After 2.11 support goes away, remove these two:
def transform[S](f: (Try[T]) => Try[S])(implicit executor: ExecutionContext): Future[S] =
throw new UnsupportedOperationException()
def transformWith[S](f: (Try[T]) => Future[S])(implicit executor: ExecutionContext): Future[S] =
throw new UnsupportedOperationException()
/**
* Blocks and returns the result of this job.
*/
@throws(classOf[SparkException])
def get(): T = ThreadUtils.awaitResult(this, Duration.Inf)
/**
* Returns the job IDs run by the underlying async operation.
*
* This returns the current snapshot of the job list. Certain operations may run multiple
* jobs, so multiple calls to this method may return different lists.
*/
def jobIds: Seq[Int]
}
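
// A minimal usage sketch (kept in comments; not part of the API). FutureAction instances are
// normally obtained from the asynchronous RDD actions such as countAsync or collectAsync in
// org.apache.spark.rdd.AsyncRDDActions, rather than constructed directly. `sc` is assumed to be
// a live SparkContext.
//
//   import scala.concurrent.ExecutionContext.Implicits.global
//   import scala.util.{Failure, Success}
//
//   val action: FutureAction[Long] = sc.parallelize(1 to 1000000).countAsync()
//   action.onComplete {
//     case Success(count) => println(s"counted $count elements")
//     case Failure(error) => println(s"job failed: $error")
//   }
//   // The action can be cancelled at any point before it completes:
//   if (!action.isCompleted) action.cancel()
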
/**
 * Scala 2.12 defines the two new transform/transformWith methods mentioned above. Implementing
 * them for 2.12 in the Spark classes here requires delegating to the same methods on an
 * underlying Future object, which exist only in 2.12; since these methods are themselves only
 * called in 2.12, that is safe. The helper shims below access those methods on a Future via
 * reflection.
 */
private[spark] object FutureAction {
private val transformTryMethod =
try {
classOf[Future[_]].getMethod("transform", classOf[(_) => _], classOf[ExecutionContext])
} catch {
case _: NoSuchMethodException => null // Would fail later in 2.11, but not called in 2.11
}
private val transformWithTryMethod =
try {
classOf[Future[_]].getMethod("transformWith", classOf[(_) => _], classOf[ExecutionContext])
} catch {
case _: NoSuchMethodException => null // Would fail later in 2.11, but not called in 2.11
}
private[spark] def transform[T, S](
future: Future[T],
f: (Try[T]) => Try[S],
executor: ExecutionContext): Future[S] =
transformTryMethod.invoke(future, f, executor).asInstanceOf[Future[S]]
private[spark] def transformWith[T, S](
future: Future[T],
f: (Try[T]) => Future[S],
executor: ExecutionContext): Future[S] =
transformWithTryMethod.invoke(future, f, executor).asInstanceOf[Future[S]]
}
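
// Illustrative sketch of what the reflective shims above amount to when running on Scala 2.12
// (`fut`, `f`, `g` and `ec` are placeholders, not real identifiers in this file):
//
//   FutureAction.transform(fut, f, ec)      // behaves like fut.transform(f)(ec)
//   FutureAction.transformWith(fut, g, ec)  // behaves like fut.transformWith(g)(ec)
//
// On 2.11 the reflective lookups above yield null, but since the shims are never called on 2.11
// this is harmless in practice.
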
/**
* A [[FutureAction]] holding the result of an action that triggers a single job. Examples include
* count, collect, reduce.
*/
@DeveloperApi
class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc: => T)
extends FutureAction[T] {
@volatile private var _cancelled: Boolean = false
override def cancel() {
_cancelled = true
jobWaiter.cancel()
}
override def ready(atMost: Duration)(implicit permit: CanAwait): SimpleFutureAction.this.type = {
jobWaiter.completionFuture.ready(atMost)
this
}
@throws(classOf[Exception])
override def result(atMost: Duration)(implicit permit: CanAwait): T = {
jobWaiter.completionFuture.ready(atMost)
assert(value.isDefined, "Future has not completed properly")
value.get.get
}
override def onComplete[U](func: (Try[T]) => U)(implicit executor: ExecutionContext) {
jobWaiter.completionFuture onComplete {_ => func(value.get)}
}
override def isCompleted: Boolean = jobWaiter.jobFinished
override def isCancelled: Boolean = _cancelled
override def value: Option[Try[T]] =
jobWaiter.completionFuture.value.map {res => res.map(_ => resultFunc)}
def jobIds: Seq[Int] = Seq(jobWaiter.jobId)
override def transform[S](f: (Try[T]) => Try[S])(implicit e: ExecutionContext): Future[S] =
FutureAction.transform(
jobWaiter.completionFuture,
(u: Try[Unit]) => f(u.map(_ => resultFunc)),
e)
override def transformWith[S](f: (Try[T]) => Future[S])(implicit e: ExecutionContext): Future[S] =
FutureAction.transformWith(
jobWaiter.completionFuture,
(u: Try[Unit]) => f(u.map(_ => resultFunc)),
e)
}
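
// A minimal sketch of how a SimpleFutureAction is obtained (its constructor is private[spark],
// so user code gets one from SparkContext.submitJob, or indirectly from single-job async actions
// such as countAsync). `sc` is assumed to be a live SparkContext; the per-partition sum below is
// illustrative only.
//
//   val rdd = sc.parallelize(1 to 100, 4)
//   val partialSums = new Array[Int](rdd.partitions.length)
//   val action: SimpleFutureAction[Int] = sc.submitJob(
//     rdd,
//     (iter: Iterator[Int]) => iter.sum,                   // processPartition, runs on executors
//     0 until rdd.partitions.length,                       // partitions to compute
//     (index: Int, sum: Int) => partialSums(index) = sum,  // resultHandler, runs on the driver
//     partialSums.sum)                                     // resultFunc, evaluated on completion
//   println(action.jobIds)  // a single job ID, since one job backs the whole action
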
/**
* Handle via which a "run" function passed to a [[ComplexFutureAction]]
* can submit jobs for execution.
*/
@DeveloperApi
trait JobSubmitter {
/**
* Submit a job for execution and return a FutureAction holding the result.
* This is a wrapper around the same functionality provided by SparkContext
* to enable cancellation.
*/
def submitJob[T, U, R](
rdd: RDD[T],
processPartition: Iterator[T] => U,
partitions: Seq[Int],
resultHandler: (Int, U) => Unit,
resultFunc: => R): FutureAction[R]
}
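
// A minimal sketch (assuming an RDD[Int] named `rdd` is in scope) of the shape of a "run"
// function that receives this submitter from a ComplexFutureAction; see the sketch after
// ComplexFutureAction below for how it is driven.
//
//   def run(submitter: JobSubmitter): Future[Seq[Int]] = {
//     val results = scala.collection.mutable.ArrayBuffer.empty[Int]
//     submitter.submitJob[Int, Array[Int], Seq[Int]](
//       rdd,
//       _.toArray,                      // processPartition: materialize one partition
//       Seq(0),                         // run only partition 0 in this job
//       (_, data) => results ++= data,  // resultHandler: gather results on the driver
//       results.toSeq)                  // resultFunc: the value of the whole action
//   }
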
/**
* A [[FutureAction]] for actions that could trigger multiple Spark jobs. Examples include take,
* takeSample. Cancellation works by setting the cancelled flag to true and cancelling any pending
* jobs.
*/
@DeveloperApi
class ComplexFutureAction[T](run: JobSubmitter => Future[T])
extends FutureAction[T] { self =>
@volatile private var _cancelled = false
@volatile private var subActions: List[FutureAction[_]] = Nil
// A promise used to signal the future.
private val p = Promise[T]().tryCompleteWith(run(jobSubmitter))
override def cancel(): Unit = synchronized {
_cancelled = true
p.tryFailure(new SparkException("Action has been cancelled"))
subActions.foreach(_.cancel())
}
private def jobSubmitter = new JobSubmitter {
def submitJob[T, U, R](
rdd: RDD[T],
processPartition: Iterator[T] => U,
partitions: Seq[Int],
resultHandler: (Int, U) => Unit,
resultFunc: => R): FutureAction[R] = self.synchronized {
// If the action hasn't been cancelled yet, submit the job. The check and the submitJob
// command need to be in an atomic block.
if (!isCancelled) {
val job = rdd.context.submitJob(
rdd,
processPartition,
partitions,
resultHandler,
resultFunc)
subActions = job :: subActions
job
} else {
throw new SparkException("Action has been cancelled")
}
}
}
override def isCancelled: Boolean = _cancelled
@throws(classOf[InterruptedException])
@throws(classOf[scala.concurrent.TimeoutException])
override def ready(atMost: Duration)(implicit permit: CanAwait): this.type = {
p.future.ready(atMost)(permit)
this
}
@throws(classOf[Exception])
override def result(atMost: Duration)(implicit permit: CanAwait): T = {
p.future.result(atMost)(permit)
}
override def onComplete[U](func: (Try[T]) => U)(implicit executor: ExecutionContext): Unit = {
p.future.onComplete(func)(executor)
}
override def isCompleted: Boolean = p.isCompleted
override def value: Option[Try[T]] = p.future.value
def jobIds: Seq[Int] = subActions.flatMap(_.jobIds)
override def transform[S](f: (Try[T]) => Try[S])(implicit e: ExecutionContext): Future[S] =
FutureAction.transform(p.future, f, e)
override def transformWith[S](f: (Try[T]) => Future[S])(implicit e: ExecutionContext): Future[S] =
FutureAction.transformWith(p.future, f, e)
}
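
// A minimal sketch of driving the class above with a run function like the one sketched after
// JobSubmitter (illustrative only). Spark's own takeAsync is built on ComplexFutureAction in
// this fashion.
//
//   val action = new ComplexFutureAction[Seq[Int]](run)
//   // Cancellation marks the action cancelled, fails its promise, and cancels every job that
//   // `run` has submitted so far; later submitJob calls will be rejected.
//   action.cancel()
//   println(action.jobIds)  // job IDs of all jobs submitted by `run` up to this point
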
private[spark]
class JavaFutureActionWrapper[S, T](futureAction: FutureAction[S], converter: S => T)
extends JavaFutureAction[T] {
import scala.collection.JavaConverters._
override def isCancelled: Boolean = futureAction.isCancelled
override def isDone: Boolean = {
    // According to java.util.Future's Javadoc, this returns true if the task completed,
    // whether that completion was due to successful execution, an exception, or cancellation.
futureAction.isCancelled || futureAction.isCompleted
}
override def jobIds(): java.util.List[java.lang.Integer] = {
Collections.unmodifiableList(futureAction.jobIds.map(Integer.valueOf).asJava)
}
private def getImpl(timeout: Duration): T = {
// This will throw TimeoutException on timeout:
ThreadUtils.awaitReady(futureAction, timeout)
futureAction.value.get match {
case scala.util.Success(value) => converter(value)
case scala.util.Failure(exception) =>
if (isCancelled) {
throw new CancellationException("Job cancelled").initCause(exception)
} else {
// java.util.Future.get() wraps exceptions in ExecutionException
throw new ExecutionException("Exception thrown by job", exception)
}
}
}
override def get(): T = getImpl(Duration.Inf)
override def get(timeout: Long, unit: TimeUnit): T =
getImpl(Duration.fromNanos(unit.toNanos(timeout)))
override def cancel(mayInterruptIfRunning: Boolean): Boolean = synchronized {
if (isDone) {
// According to java.util.Future's Javadoc, this should return false if the task is completed.
false
} else {
      // We're limited in the semantics we can provide here: our cancellation is asynchronous,
      // and we have no way to honor mayInterruptIfRunning = false by letting an in-progress
      // job run to completion, so the job is cancelled regardless.
futureAction.cancel()
true
}
}
}
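
// A minimal sketch of how this wrapper is typically used inside Spark's Java API: a Scala
// FutureAction is paired with a converter that boxes the result for Java callers.
// `javaCountAsync` is a hypothetical helper name, not part of the real API.
//
//   def javaCountAsync[T: scala.reflect.ClassTag](rdd: RDD[T]): JavaFutureAction[java.lang.Long] =
//     new JavaFutureActionWrapper[Long, java.lang.Long](
//       rdd.countAsync(),                                // the underlying Scala FutureAction[Long]
//       (count: Long) => java.lang.Long.valueOf(count))  // converter applied to the result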