blob: d484ec992da48126f478ada065283315a5218522 [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
package org.apache.griffin.measure.utils
import java.util.concurrent._
import scala.concurrent.{Awaitable, ExecutionContext, ExecutionContextExecutor}
import scala.concurrent.duration.Duration
import scala.concurrent.forkjoin.{ForkJoinPool => SForkJoinPool, ForkJoinWorkerThread => SForkJoinWorkerThread}
import scala.util.control.NonFatal
import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder}
private[griffin] object ThreadUtils {
private val sameThreadExecutionContext =
ExecutionContext.fromExecutorService(MoreExecutors.sameThreadExecutor())
/**
* An `ExecutionContextExecutor` that runs each task in the thread that invokes `execute/submit`.
* The caller should make sure the tasks running in this `ExecutionContextExecutor` are short and
* never block.
*/
def sameThread: ExecutionContextExecutor = sameThreadExecutionContext
/**
* Create a thread factory that names threads with a prefix and also sets the threads to daemon.
*/
def namedThreadFactory(prefix: String): ThreadFactory = {
new ThreadFactoryBuilder().setDaemon(true).setNameFormat(prefix + "-%d").build()
}
/**
* Wrapper over newCachedThreadPool. Thread names are formatted as prefix-ID, where ID is a
* unique, sequentially assigned integer.
*/
def newDaemonCachedThreadPool(prefix: String): ThreadPoolExecutor = {
val threadFactory = namedThreadFactory(prefix)
Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor]
}
/**
* Create a cached thread pool whose max number of threads is `maxThreadNumber`. Thread names
* are formatted as prefix-ID, where ID is a unique, sequentially assigned integer.
*/
def newDaemonCachedThreadPool(
prefix: String, maxThreadNumber: Int, keepAliveSeconds: Int = 60): ThreadPoolExecutor = {
val threadFactory = namedThreadFactory(prefix)
val threadPool = new ThreadPoolExecutor(
maxThreadNumber, // corePoolSize: the max number of threads to create before queuing the tasks
maxThreadNumber, // maximumPoolSize: because we use LinkedBlockingDeque, this one is not used
keepAliveSeconds,
TimeUnit.SECONDS,
new LinkedBlockingQueue[Runnable],
threadFactory)
threadPool.allowCoreThreadTimeOut(true)
threadPool
}
/**
* Wrapper over newFixedThreadPool. Thread names are formatted as prefix-ID, where ID is a
* unique, sequentially assigned integer.
*/
def newDaemonFixedThreadPool(nThreads: Int, prefix: String): ThreadPoolExecutor = {
val threadFactory = namedThreadFactory(prefix)
Executors.newFixedThreadPool(nThreads, threadFactory).asInstanceOf[ThreadPoolExecutor]
}
/**
* Wrapper over newSingleThreadExecutor.
*/
def newDaemonSingleThreadExecutor(threadName: String): ExecutorService = {
val threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat(threadName).build()
Executors.newSingleThreadExecutor(threadFactory)
}
/**
* Wrapper over ScheduledThreadPoolExecutor.
*/
def newDaemonSingleThreadScheduledExecutor(threadName: String): ScheduledExecutorService = {
val threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat(threadName).build()
val executor = new ScheduledThreadPoolExecutor(1, threadFactory)
// By default, a cancelled task is not automatically removed from the work queue until its delay
// elapses. We have to enable it manually.
executor.setRemoveOnCancelPolicy(true)
executor
}
/**
* Run a piece of code in a new thread and return the result. Exception in the new thread is
* thrown in the caller thread with an adjusted stack trace that removes references to this
* method for clarity. The exception stack traces will be like the following
*
* SomeException: exception-message
* at CallerClass.body-method (sourcefile.scala)
* at ... run in separate thread using org.apache.griffin.measure.utils.ThreadUtils ... ()
* at CallerClass.caller-method (sourcefile.scala)
* ...
*/
def runInNewThread[T](
threadName: String,
isDaemon: Boolean = true)(body: => T): T = {
@volatile var exception: Option[Throwable] = None
@volatile var result: T = null.asInstanceOf[T]
val thread = new Thread(threadName) {
override def run(): Unit = {
try {
result = body
} catch {
case NonFatal(e) =>
exception = Some(e)
}
}
}
thread.setDaemon(isDaemon)
thread.start()
thread.join()
exception match {
case Some(realException) =>
// Remove the part of the stack that shows method calls into this helper method
// This means drop everything from the top until the stack element
// ThreadUtils.runInNewThread(), and then drop that as well (hence the `drop(1)`).
val baseStackTrace = Thread.currentThread().getStackTrace().dropWhile(
! _.getClassName.contains(this.getClass.getSimpleName)).drop(1)
// Remove the part of the new thread stack that shows methods call from this helper method
val extraStackTrace = realException.getStackTrace.takeWhile(
! _.getClassName.contains(this.getClass.getSimpleName))
// Combine the two stack traces, with a place holder just specifying that there
// was a helper method used, without any further details of the helper
val placeHolderStackElem = new StackTraceElement(
s"... run in separate thread using ${ThreadUtils.getClass.getName.stripSuffix("$")} ..",
" ", "", -1)
val finalStackTrace = extraStackTrace ++ Seq(placeHolderStackElem) ++ baseStackTrace
// Update the stack trace and rethrow the exception in the caller thread
realException.setStackTrace(finalStackTrace)
throw realException
case None =>
result
}
}
/**
* Construct a new Scala ForkJoinPool with a specified max parallelism and name prefix.
*/
def newForkJoinPool(prefix: String, maxThreadNumber: Int): SForkJoinPool = {
// Custom factory to set thread names
val factory = new SForkJoinPool.ForkJoinWorkerThreadFactory {
override def newThread(pool: SForkJoinPool) =
new SForkJoinWorkerThread(pool) {
setName(prefix + "-" + super.getName)
}
}
new SForkJoinPool(maxThreadNumber, factory,
null, // handler
false // asyncMode
)
}
// scalastyle:off awaitresult
/**
* Preferred alternative to `Await.result()`.
*
* This method wraps and re-throws any exceptions thrown by the underlying `Await` call, ensuring
* that this thread's stack trace appears in logs.
*
* In addition, it calls `Awaitable.result` directly to avoid using `ForkJoinPool`'s
* `BlockingContext`. Codes running in the user's thread may be in a thread of Scala ForkJoinPool.
* As concurrent executions in ForkJoinPool may see some [[ThreadLocal]] value unexpectedly, this
* method basically prevents ForkJoinPool from running other tasks in the current waiting thread.
* In general, we should use this method because it's hard to debug when [[ThreadLocal]]s leak
* to other tasks.
*/
@throws(classOf[Exception])
def awaitResult[T](awaitable: Awaitable[T], atMost: Duration): T = {
try {
// `awaitPermission` is not actually used anywhere so it's safe to pass in null here.
val awaitPermission = null.asInstanceOf[scala.concurrent.CanAwait]
awaitable.result(atMost)(awaitPermission)
} catch {
// TimeoutException is thrown in the current thread, so not need to warp the exception.
case NonFatal(t) if !t.isInstanceOf[TimeoutException] =>
throw new Exception("Exception thrown in awaitResult: ", t)
}
}
// scalastyle:on awaitresult
// scalastyle:off awaitready
/**
* Preferred alternative to `Await.ready()`.
*
* @see [[awaitResult]]
*/
@throws(classOf[Exception])
def awaitReady[T](awaitable: Awaitable[T], atMost: Duration): awaitable.type = {
try {
// `awaitPermission` is not actually used anywhere so it's safe to pass in null here.
val awaitPermission = null.asInstanceOf[scala.concurrent.CanAwait]
awaitable.ready(atMost)(awaitPermission)
} catch {
// TimeoutException is thrown in the current thread, so not need to warp the exception.
case NonFatal(t) if !t.isInstanceOf[TimeoutException] =>
throw new Exception("Exception thrown in awaitResult: ", t)
}
}
// scalastyle:on awaitready
}