blob: 6250483191e3f0ded04548bdca0eb59df57309fe [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.openwhisk.utils
import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.Promise
import scala.concurrent.duration.FiniteDuration
import scala.util.control.NonFatal
import akka.actor.ActorSystem
import akka.actor.Cancellable
import akka.actor.Scheduler
object ExecutionContextFactory {
private type CancellableFuture[T] = (Cancellable, Future[T])
/**
* akka.pattern.after has a memory drag issue: it opaquely
* schedules an actor which consequently results in drag for the
* timeout duration
*
*/
def expire[T](duration: FiniteDuration, using: Scheduler)(value: => Future[T])(
implicit ec: ExecutionContext): CancellableFuture[T] = {
val p = Promise[T]()
val cancellable = using.scheduleOnce(duration) {
p completeWith {
try value
catch { case NonFatal(t) => Future.failed(t) }
}
}
(cancellable, p.future)
}
/**
* Return the first of the two given futures to complete; if f1
* finishes first, we will cancel f2
*
*/
def firstCompletedOf2[T](f1: Future[T], f2Cancellable: CancellableFuture[T])(
implicit executor: ExecutionContext): Future[T] = {
val p = Promise[T]()
val (f2Killswitch, f2) = f2Cancellable
f1.onComplete { result =>
p.tryComplete(result)
f2Killswitch.cancel()
}
f2.onComplete(p.tryComplete)
p.future
}
implicit class FutureExtensions[T](f: Future[T]) {
def withTimeout(timeout: FiniteDuration, msg: => Throwable)(implicit system: ActorSystem): Future[T] = {
implicit val ec = system.dispatcher
firstCompletedOf2(f, expire(timeout, system.scheduler)(Future.failed(msg)))
}
def withAlternativeAfterTimeout(timeout: FiniteDuration, alt: => Future[T])(
implicit system: ActorSystem): Future[T] = {
implicit val ec = system.dispatcher
firstCompletedOf2(f, expire(timeout, system.scheduler)(alt))
}
}
/**
* Makes an execution context for Futures using Executors.newCachedThreadPool. From the javadoc:
*
* Creates a thread pool that creates new threads as needed, but will reuse previously constructed threads
* when they are available. These pools will typically improve the performance of programs that execute many
* short-lived asynchronous tasks. Calls to execute will reuse previously constructed threads if available.
* If no existing thread is available, a new thread will be created and added to the pool. Threads that have
* not been used for sixty seconds are terminated and removed from the cache. Thus, a pool that remains idle
* for long enough will not consume any resources. Note that pools with similar properties but different details
* (for example, timeout parameters) may be created using ThreadPoolExecutor constructors.
*/
def makeCachedThreadPoolExecutionContext(): ExecutionContext = {
ExecutionContext.fromExecutor(Executors.newCachedThreadPool())
}
}