blob: f701e18ac9027146c6d72dca4afacb66c004d7a9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Delayed;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.geode.internal.monitoring.ThreadsMonitoring;
/**
* A ScheduledThreadPoolExecutor which allows threads to time out after the keep alive time. With
* the normal ScheduledThreadPoolExecutor, there is no way to configure it such that it only add
* threads as needed.
*
* This executor is not very useful if you only want to have 1 thread. Use the
* ScheduledThreadPoolExecutor in that case. This class with throw an exception if you try to
* configure it with one thread.
*
*
*/
@SuppressWarnings("synthetic-access")
public class ScheduledThreadPoolExecutorWithKeepAlive extends ThreadPoolExecutor
implements ScheduledExecutorService {
private final ScheduledThreadPoolExecutor timer;
private final ThreadsMonitoring threadMonitoring;
public ScheduledThreadPoolExecutorWithKeepAlive(int corePoolSize, long keepAlive,
TimeUnit timeUnit, ThreadFactory threadFactory, ThreadsMonitoring tMonitoring) {
super(0, corePoolSize - 1, keepAlive, timeUnit, new SynchronousQueue(), threadFactory,
new BlockCallerPolicy());
timer = new ScheduledThreadPoolExecutor(1, threadFactory) {
@Override
protected void terminated() {
super.terminated();
ScheduledThreadPoolExecutorWithKeepAlive.super.shutdown();
}
};
this.threadMonitoring = tMonitoring;
}
@Override
public void execute(Runnable command) {
timer.execute(new HandOffTask(command));
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
if (this.threadMonitoring != null) {
threadMonitoring.startMonitor(ThreadsMonitoring.Mode.ScheduledThreadExecutor);
}
}
@Override
protected void afterExecute(Runnable r, Throwable ex) {
if (this.threadMonitoring != null) {
threadMonitoring.endMonitor();
}
}
@Override
public Future submit(Callable task) {
return schedule(task, 0, TimeUnit.NANOSECONDS);
}
@Override
public Future submit(Runnable task, Object result) {
return schedule(task, 0, TimeUnit.NANOSECONDS, result);
}
@Override
public Future submit(Runnable task) {
return schedule(task, 0, TimeUnit.NANOSECONDS);
}
@Override
public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) {
DelegatingScheduledFuture future = new DelegatingScheduledFuture(callable);
ScheduledFuture timerFuture = timer.schedule(new HandOffTask(future), delay, unit);
future.setDelegate(timerFuture);
return future;
}
@Override
public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) {
return schedule(command, delay, unit, null);
}
private ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit, Object result) {
DelegatingScheduledFuture future = new DelegatingScheduledFuture(command, result);
ScheduledFuture timerFuture = timer.schedule(new HandOffTask(future), delay, unit);
future.setDelegate(timerFuture);
return future;
}
@Override
public ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period,
TimeUnit unit) {
DelegatingScheduledFuture future = new DelegatingScheduledFuture(command, null, true);
ScheduledFuture timerFuture =
timer.scheduleAtFixedRate(new HandOffTask(future), initialDelay, period, unit);
future.setDelegate(timerFuture);
return future;
}
@Override
public ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay,
TimeUnit unit) {
DelegatingScheduledFuture future = new DelegatingScheduledFuture(command, null, true);
ScheduledFuture timerFuture =
timer.scheduleWithFixedDelay(new HandOffTask(future), initialDelay, delay, unit);
future.setDelegate(timerFuture);
return future;
}
@Override
public void shutdown() {
// note - the timer has a "hook" which will shutdown our
// worker pool once the timer is shutdown.
timer.shutdown();
}
/**
* Shutdown the executor immediately, returning a list of tasks that haven't been run. Like
* ScheduledThreadPoolExecutor, this returns a list of RunnableScheduledFuture objects, instead of
* the actual tasks submitted. However, these Future objects are even less useful than the ones
* returned by ScheduledThreadPoolExecutor. In particular, they don't match the future returned by
* the {{@link #submit(Runnable)} method, and the run method won't do anything useful. This list
* should only be used as a count of the number of tasks that didn't execute.
*
* @see ScheduledThreadPoolExecutor#shutdownNow()
*/
@Override
public List shutdownNow() {
List tasks = timer.shutdownNow();
super.shutdownNow();
return tasks;
}
@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
long start = System.nanoTime();
if (!timer.awaitTermination(timeout, unit)) {
return false;
}
long elapsed = System.nanoTime() - start;
long remaining = unit.toNanos(timeout) - elapsed;
if (remaining < 0) {
return false;
}
return super.awaitTermination(remaining, TimeUnit.NANOSECONDS);
}
@Override
public int getCorePoolSize() {
return super.getCorePoolSize() + 1;
}
@Override
public int getLargestPoolSize() {
return super.getLargestPoolSize() + 1;
}
@Override
public int getMaximumPoolSize() {
return super.getMaximumPoolSize() + 1;
}
@Override
public int getPoolSize() {
return super.getPoolSize() + 1;
}
@Override
public boolean isShutdown() {
return timer.isShutdown();
}
@Override
public boolean isTerminated() {
return super.isTerminated() && timer.isTerminated();
}
// method that is in ScheduledThreadPoolExecutor that we should expose here
public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean b) {
timer.setContinueExistingPeriodicTasksAfterShutdownPolicy(b);
}
// method that is in ScheduledThreadPoolExecutor that we should expose here
public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean b) {
timer.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
}
/**
* A Runnable which we put in the timer which simply hands off the contain task for execution in
* the thread pool when the timer fires.
*
*/
private class HandOffTask implements Runnable {
private final Runnable task;
public HandOffTask(Runnable task) {
this.task = task;
}
@Override
public void run() {
try {
ScheduledThreadPoolExecutorWithKeepAlive.super.execute(task);
} catch (RejectedExecutionException e) {
// do nothing, we'll only get this if we're shutting down.
}
}
}
/**
* The future returned by the schedule* methods on this class. This future will not return a value
* until the task has actually executed in the thread pool, but it allows us to cancel associated
* timer task.
*
*/
private static class DelegatingScheduledFuture<V> extends FutureTask<V>
implements ScheduledFuture<V> {
private ScheduledFuture<V> delegate;
private final boolean periodic;
public DelegatingScheduledFuture(Runnable runnable, V result) {
this(runnable, result, false);
}
public DelegatingScheduledFuture(Callable<V> callable) {
this(callable, false);
}
public DelegatingScheduledFuture(Runnable runnable, V result, boolean periodic) {
super(runnable, result);
this.periodic = periodic;
}
public DelegatingScheduledFuture(Callable<V> callable, boolean periodic) {
super(callable);
this.periodic = periodic;
}
@Override
public void run() {
if (periodic) {
super.runAndReset();
} else {
super.run();
}
}
public void setDelegate(ScheduledFuture<V> future) {
this.delegate = future;
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
delegate.cancel(true);
return super.cancel(mayInterruptIfRunning);
}
@Override
public long getDelay(TimeUnit unit) {
return delegate.getDelay(unit);
}
@Override
public int compareTo(Delayed o) {
return delegate.compareTo(o);
}
@Override
public boolean equals(Object o) {
return delegate.equals(o);
}
@Override
public int hashCode() {
return delegate.hashCode();
}
}
/**
* A RejectedExecutionHandler which causes the caller to block until there is space in the queue
* for the task.
*/
protected static class BlockCallerPolicy implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
if (executor.isShutdown()) {
throw new RejectedExecutionException("executor has been shutdown");
} else {
try {
executor.getQueue().put(r);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
RejectedExecutionException e = new RejectedExecutionException("interrupted");
e.initCause(ie);
throw e;
}
}
}
}
}