blob: 1af3710391a74b2b3a995a978c69a7301be558b8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal;
import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.geode.internal.monitoring.ThreadsMonitoring;
/**
* A decorator for a ScheduledExecutorService which tries to make sure that there is only one task
* in the queue for the executor service that has been submitted through this decorator.
*
* This class is useful if you have a task that you want to make sure runs at least once after an
* event, but if the event happens repeatedly before the task runs, the task should still only run
* once.
*
* In many cases it might make sense to have multiple decorators, or to submit one kind of task
* through the decorator and other tasks directly through the executor.
*
* For example, a task that recovers redundancy for all buckets of a PR when a member crashes only
* needs to be run once, no matter how many members crash before the task starts. But after the task
* has started, we want to make sure we schedule another execution for the next crash.
*
* If the a new task is scheduled to run sooner than the task that is currently in the queue, the
* currently queued task will be canceled and the new task will be submitted to the queue with the
* new time.
*
* @since GemFire 6.0
*/
@SuppressWarnings("synthetic-access")
public class OneTaskOnlyExecutor {
private final ThreadsMonitoring threadMonitoring;
private final ScheduledExecutorService ex;
private ScheduledFuture<?> future = null;
private ConflatedTaskListener listener;
public OneTaskOnlyExecutor(ScheduledExecutorService ex, ThreadsMonitoring tMonitoring) {
this(ex, new ConflatedTaskListenerAdapter(), tMonitoring);
}
public OneTaskOnlyExecutor(ScheduledExecutorService ex, ConflatedTaskListener listener,
ThreadsMonitoring tMonitoring) {
this.ex = ex;
this.listener = listener;
this.threadMonitoring = tMonitoring;
}
/**
* Schedule an execution of a task. This will either add the task to the execution service, or if
* a task has already been scheduled through this decorator and is still pending execution it will
* return the future associated with the previously scheduled task.
*
* @param runnable a runnable to execution
* @param delay the time to delay before execution
* @param unit the time unit
* @return The future associated with this task, or with a previously scheduled task if that task
* has not yet been run.
* @see ScheduledExecutorService#schedule(Runnable, long, TimeUnit)
*/
public ScheduledFuture<?> schedule(Runnable runnable, long delay, TimeUnit unit) {
synchronized (this) {
if (future == null || future.isCancelled() || future.isDone()
|| future.getDelay(unit) > delay) {
if (future != null && !future.isDone()) {
future.cancel(false);
listener.taskDropped();
}
future = ex.schedule(new DelegatingRunnable(runnable), delay, unit);
} else {
listener.taskDropped();
}
}
return future;
}
/**
* Schedule an execution of a task. This will either add the task to the execution service, or if
* a task has already been scheduled through this decorator and is still pending execution it will
* return the future associated with the previously scheduled task.
*
* @param callable a callable to execute
* @param delay the time to delay before execution
* @param unit the time unit
* @return The future associated with this task, or with a previously scheduled task if that task
* has not yet been run.
* @see ScheduledExecutorService#schedule(Runnable, long, TimeUnit)
*/
public <T> ScheduledFuture<?> schedule(Callable<T> callable, long delay, TimeUnit unit) {
synchronized (this) {
if (future == null || future.isCancelled() || future.isDone()
|| future.getDelay(unit) > delay) {
if (future != null && !future.isDone()) {
future.cancel(false);
listener.taskDropped();
}
future = ex.schedule(new DelegatingCallable<T>(callable), delay, unit);
} else {
listener.taskDropped();
}
}
return future;
}
/**
* Removes the canceled tasks from the executor queue.
*/
public void purge() {
((ScheduledThreadPoolExecutor) ex).purge();
}
private class DelegatingRunnable implements Runnable {
private final Runnable runnable;
public DelegatingRunnable(Runnable runnable) {
this.runnable = runnable;
}
@Override
public void run() {
synchronized (OneTaskOnlyExecutor.this) {
future = null;
}
beforeExecute();
try {
runnable.run();
} finally {
afterExecute();
}
}
}
private class DelegatingCallable<T> implements Callable<T> {
private final Callable<T> callable;
public DelegatingCallable(Callable<T> callable) {
this.callable = callable;
}
@Override
public T call() throws Exception {
synchronized (OneTaskOnlyExecutor.this) {
future = null;
}
return callable.call();
}
}
public interface ConflatedTaskListener {
void taskDropped();
}
public static class ConflatedTaskListenerAdapter implements ConflatedTaskListener {
@Override
public void taskDropped() {
}
}
protected void beforeExecute() {
if (this.threadMonitoring != null) {
threadMonitoring.startMonitor(ThreadsMonitoring.Mode.OneTaskOnlyExecutor);
}
}
protected void afterExecute() {
if (this.threadMonitoring != null) {
threadMonitoring.endMonitor();
}
}
}