blob: 8008f91e70fa3788e9eb9959569f4daa1a80478b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.distributed.internal;
import static java.lang.Integer.getInteger;
import static java.lang.Long.getLong;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.geode.distributed.internal.OperationExecutors.FUNCTION_EXECUTION_PROCESSOR_THREAD_PREFIX;
import static org.apache.geode.util.internal.GeodeGlossary.GEMFIRE_PREFIX;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.logging.log4j.Logger;
import org.apache.geode.SystemFailure;
import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.annotations.internal.MakeNotStatic;
import org.apache.geode.internal.monitoring.ThreadsMonitoring;
import org.apache.geode.logging.internal.log4j.api.LogService;
/**
* A ThreadPoolExecutor with stat support. This executor also has a buffer that rejected executions
* spill over into.
*
* <p>
* This executor has also been modified to handle rejected execution in one of three ways: If the
* executor is for function execution we ignore size caps on the thread pool<br>
* If the executor has a SynchronousQueue or SynchronousQueueNoSpin then rejected executions are run
* in the current thread.<br>
* Otherwise a thread is started in the pool that buffers the rejected tasks and puts them back into
* the executor.
*/
public class FunctionExecutionPooledExecutor extends ThreadPoolExecutor {
private static final Logger logger = LogService.getLogger();
private static final int DEFAULT_RETRY_INTERVAL = 5000;
private static final int DEFAULT_IDLE_THREAD_TIMEOUT_MILLIS = 30000 * 60;
private static final int OFFER_TIME_MILLIS =
getInteger(GEMFIRE_PREFIX + "RETRY_INTERVAL", DEFAULT_RETRY_INTERVAL);
/**
* Identifier for function execution threads and any of their children
*/
@MakeNotStatic()
private static final InheritableThreadLocal<Boolean> isFunctionExecutionThread =
new InheritableThreadLocal<Boolean>() {
@Override
protected Boolean initialValue() {
return Boolean.FALSE;
}
};
private final PoolStatHelper stats;
private final ThreadsMonitoring threadMonitoring;
/**
* Used to buffer up tasks that would be have been rejected. Only used (i.e. non-null) if
* constructor queue is not a SynchronousQueue.
*/
private BlockingQueue<Runnable> bufferQueue;
/**
* Used to consume items off the bufferQueue and put them into the pools synchronous queue. Only
* used (i.e. non-null) if constructor queue is not a SynchronousQueue.
*/
private Thread bufferConsumer;
/**
* Sets timeout to IDLE_THREAD_TIMEOUT
*/
public FunctionExecutionPooledExecutor(int poolSize, BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory, PoolStatHelper poolStatHelper,
ThreadsMonitoring threadsMonitoring) {
// TODO: workQueue is always an OverflowQueueWithDMStats which is a LinkedBlockingQueue
/*
* How long an idle thread will wait, in milliseconds, before it is removed from its thread
* pool. Default is (30000 * 60) ms (30 minutes). It is not static so it can be set at runtime
* and pick up different values.
*/
super(getCorePoolSize(poolSize), poolSize, getLong(GEMFIRE_PREFIX + "IDLE_THREAD_TIMEOUT",
DEFAULT_IDLE_THREAD_TIMEOUT_MILLIS), MILLISECONDS, getSynchronousQueue(workQueue),
threadFactory, newRejectedExecutionHandler(workQueue, true));
stats = poolStatHelper;
threadMonitoring = threadsMonitoring;
if (!(workQueue instanceof SynchronousQueue)) {
bufferQueue = workQueue;
// create a thread that takes from bufferQueue and puts into result
final BlockingQueue<Runnable> takeQueue = workQueue;
final BlockingQueue<Runnable> putQueue = getQueue();
Runnable r = () -> {
try {
for (;;) {
SystemFailure.checkFailure();
Runnable task = takeQueue.take();
if (true) {
// In the function case, offer the request to the work queue.
// If it fails, execute it anyway. This will cause the RejectedExecutionHandler to
// spin off a thread for it.
if (!putQueue.offer(task, OFFER_TIME_MILLIS, MILLISECONDS)) {
execute(task);
}
} else {
// In the non-function case, put the request on the work queue.
putQueue.put(task);
}
}
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
// this thread is being shutdown so just return;
}
};
bufferConsumer = threadFactory.newThread(r);
bufferConsumer.start();
}
}
/**
* Is the current thread used for executing Functions?
*/
public static boolean isFunctionExecutionThread() {
return isFunctionExecutionThread.get();
}
private static SynchronousQueue<Runnable> getSynchronousQueue(
BlockingQueue<Runnable> blockingQueue) {
if (blockingQueue instanceof SynchronousQueue) {
return (SynchronousQueue<Runnable>) blockingQueue;
}
return new SynchronousQueue<>();
}
private static RejectedExecutionHandler newRejectedExecutionHandler(
BlockingQueue<Runnable> blockingQueue, boolean forFunctionExecution) {
if (forFunctionExecution) {
return new FunctionExecutionRejectedExecutionHandler(blockingQueue);
}
if (blockingQueue instanceof SynchronousQueue) {
return new CallerRunsPolicy();
// return new BlockHandler();
}
// create a thread that takes from bufferQueue and puts into result
return new BufferHandler();
}
static void setIsFunctionExecutionThread(Boolean isExecutionThread) {
isFunctionExecutionThread.set(isExecutionThread);
}
@Override
public void shutdown() {
try {
super.shutdown();
} finally {
terminated();
}
}
@Override
protected void terminated() {
if (bufferConsumer != null) {
bufferConsumer.interrupt();
}
super.terminated();
}
@Override
public List<Runnable> shutdownNow() {
terminated();
List<Runnable> l = super.shutdownNow();
if (bufferQueue != null) {
bufferQueue.drainTo(l);
}
return l;
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
if (stats != null) {
stats.startJob();
}
if (threadMonitoring != null) {
threadMonitoring.startMonitor(ThreadsMonitoring.Mode.FunctionExecutor);
}
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
if (stats != null) {
stats.endJob();
}
if (threadMonitoring != null) {
threadMonitoring.endMonitor();
}
}
@VisibleForTesting
public BlockingQueue<Runnable> getBufferQueue() {
return bufferQueue;
}
private static int getCorePoolSize(int maxSize) {
if (maxSize == Integer.MAX_VALUE) {
return 0;
}
return 1;
}
@VisibleForTesting
public static class FunctionExecutionRejectedExecutionHandler
implements RejectedExecutionHandler {
private final BlockingQueue<Runnable> blockingQueue;
private FunctionExecutionRejectedExecutionHandler(BlockingQueue<Runnable> blockingQueue) {
this.blockingQueue = blockingQueue;
}
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
if (executor.isShutdown()) {
throw new RejectedExecutionException("executor has been shutdown");
}
if (isBufferConsumer((FunctionExecutionPooledExecutor) executor)) {
handleRejectedExecutionForBufferConsumer(r, executor);
} else if (isFunctionExecutionThread()) {
handleRejectedExecutionForFunctionExecutionThread(r, executor);
} else {
// In the normal case, add the rejected request to the blocking queue.
try {
blockingQueue.put(r);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
// this thread is being shutdown so just return;
}
}
}
private boolean isBufferConsumer(FunctionExecutionPooledExecutor executor) {
return Thread.currentThread() == executor.bufferConsumer;
}
private boolean isFunctionExecutionThread() {
return isFunctionExecutionThread.get();
}
/**
* Handle rejected execution for the bufferConsumer (the thread that takes from the blocking
* queue and offers to the synchronous queue). Spin off a thread directly in this case,
* since an offer has already been made to the synchronous queue and failed. This means that
* all the function execution threads are in use.
*/
private void handleRejectedExecutionForBufferConsumer(Runnable r,
ThreadPoolExecutor executor) {
logger.warn("An additional " + FUNCTION_EXECUTION_PROCESSOR_THREAD_PREFIX
+ " thread is being launched because all " + executor.getMaximumPoolSize()
+ " thread pool threads are in use for greater than " + OFFER_TIME_MILLIS + " ms");
launchAdditionalThread(r, executor);
}
/**
* Handle rejected execution for a function execution thread. Spin off a thread directly in
* this case, since that means a function is executing another function. The child function
* request shouldn't be in the queue behind the parent request since the parent function is
* dependent on the child function executing.
*/
private void handleRejectedExecutionForFunctionExecutionThread(Runnable r,
ThreadPoolExecutor executor) {
if (logger.isDebugEnabled()) {
logger.warn(
"An additional {} thread is being launched to prevent slow performance due to nested function executions",
FUNCTION_EXECUTION_PROCESSOR_THREAD_PREFIX);
}
launchAdditionalThread(r, executor);
}
private void launchAdditionalThread(Runnable r, ThreadPoolExecutor executor) {
Thread th = executor.getThreadFactory().newThread(r);
th.start();
}
}
/**
* This handler fronts a synchronous queue, that is owned by the parent ThreadPoolExecutor, with a
* client supplied BlockingQueue that supports storage (the buffer queue). A dedicated thread is
* used to consume off the buffer queue and put into the synchronous queue.
*/
public static class BufferHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
if (executor.isShutdown()) {
throw new RejectedExecutionException("executor has been shutdown");
}
try {
FunctionExecutionPooledExecutor pool = (FunctionExecutionPooledExecutor) executor;
pool.bufferQueue.put(r);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new RejectedExecutionException("interrupted", ie);
}
}
}
}