| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.geode.internal; |
| |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.ScheduledFuture; |
| import java.util.concurrent.ScheduledThreadPoolExecutor; |
| import java.util.concurrent.TimeUnit; |
| |
| import org.apache.geode.internal.monitoring.ThreadsMonitoring; |
| |
| /** |
| * A decorator for a ScheduledExecutorService which tries to make sure that there is only one task |
| * in the queue for the executor service that has been submitted through this decorator. |
| * |
| * This class is useful if you have a task that you want to make sure runs at least once after an |
| * event, but if the event happens repeatedly before the task runs, the task should still only run |
| * once. |
| * |
| * In many cases it might make sense to have multiple decorators, or to submit one kind of task |
| * through the decorator and other tasks directly through the executor. |
| * |
| * For example, a task that recovers redundancy for all buckets of a PR when a member crashes only |
| * needs to be run once, no matter how many members crash before the task starts. But after the task |
| * has started, we want to make sure we schedule another execution for the next crash. |
| * |
| * If the a new task is scheduled to run sooner than the task that is currently in the queue, the |
| * currently queued task will be canceled and the new task will be submitted to the queue with the |
| * new time. |
| * |
| * @since GemFire 6.0 |
| */ |
| @SuppressWarnings("synthetic-access") |
| public class OneTaskOnlyExecutor { |
| |
| private final ThreadsMonitoring threadMonitoring; |
| private final ScheduledExecutorService ex; |
| private ScheduledFuture<?> future = null; |
| private ConflatedTaskListener listener; |
| |
| public OneTaskOnlyExecutor(ScheduledExecutorService ex, ThreadsMonitoring tMonitoring) { |
| this(ex, new ConflatedTaskListenerAdapter(), tMonitoring); |
| } |
| |
| public OneTaskOnlyExecutor(ScheduledExecutorService ex, ConflatedTaskListener listener, |
| ThreadsMonitoring tMonitoring) { |
| this.ex = ex; |
| this.listener = listener; |
| this.threadMonitoring = tMonitoring; |
| } |
| |
| /** |
| * Schedule an execution of a task. This will either add the task to the execution service, or if |
| * a task has already been scheduled through this decorator and is still pending execution it will |
| * return the future associated with the previously scheduled task. |
| * |
| * @param runnable a runnable to execution |
| * @param delay the time to delay before execution |
| * @param unit the time unit |
| * @return The future associated with this task, or with a previously scheduled task if that task |
| * has not yet been run. |
| * @see ScheduledExecutorService#schedule(Runnable, long, TimeUnit) |
| */ |
| public ScheduledFuture<?> schedule(Runnable runnable, long delay, TimeUnit unit) { |
| synchronized (this) { |
| if (future == null || future.isCancelled() || future.isDone() |
| || future.getDelay(unit) > delay) { |
| if (future != null && !future.isDone()) { |
| future.cancel(false); |
| listener.taskDropped(); |
| } |
| future = ex.schedule(new DelegatingRunnable(runnable), delay, unit); |
| } else { |
| listener.taskDropped(); |
| } |
| } |
| return future; |
| } |
| |
| /** |
| * Schedule an execution of a task. This will either add the task to the execution service, or if |
| * a task has already been scheduled through this decorator and is still pending execution it will |
| * return the future associated with the previously scheduled task. |
| * |
| * @param callable a callable to execute |
| * @param delay the time to delay before execution |
| * @param unit the time unit |
| * @return The future associated with this task, or with a previously scheduled task if that task |
| * has not yet been run. |
| * @see ScheduledExecutorService#schedule(Runnable, long, TimeUnit) |
| */ |
| public <T> ScheduledFuture<?> schedule(Callable<T> callable, long delay, TimeUnit unit) { |
| synchronized (this) { |
| if (future == null || future.isCancelled() || future.isDone() |
| || future.getDelay(unit) > delay) { |
| if (future != null && !future.isDone()) { |
| future.cancel(false); |
| listener.taskDropped(); |
| } |
| future = ex.schedule(new DelegatingCallable<T>(callable), delay, unit); |
| } else { |
| listener.taskDropped(); |
| } |
| } |
| return future; |
| } |
| |
| /** |
| * Removes the canceled tasks from the executor queue. |
| */ |
| public void purge() { |
| ((ScheduledThreadPoolExecutor) ex).purge(); |
| } |
| |
| private class DelegatingRunnable implements Runnable { |
| private final Runnable runnable; |
| |
| public DelegatingRunnable(Runnable runnable) { |
| this.runnable = runnable; |
| } |
| |
| @Override |
| public void run() { |
| synchronized (OneTaskOnlyExecutor.this) { |
| future = null; |
| } |
| beforeExecute(); |
| try { |
| runnable.run(); |
| } finally { |
| afterExecute(); |
| } |
| } |
| } |
| |
| private class DelegatingCallable<T> implements Callable<T> { |
| private final Callable<T> callable; |
| |
| public DelegatingCallable(Callable<T> callable) { |
| this.callable = callable; |
| } |
| |
| @Override |
| public T call() throws Exception { |
| synchronized (OneTaskOnlyExecutor.this) { |
| future = null; |
| } |
| return callable.call(); |
| } |
| } |
| |
| public interface ConflatedTaskListener { |
| void taskDropped(); |
| } |
| |
| public static class ConflatedTaskListenerAdapter implements ConflatedTaskListener { |
| @Override |
| public void taskDropped() { |
| |
| } |
| } |
| |
| protected void beforeExecute() { |
| if (this.threadMonitoring != null) { |
| threadMonitoring.startMonitor(ThreadsMonitoring.Mode.OneTaskOnlyExecutor); |
| } |
| } |
| |
| protected void afterExecute() { |
| if (this.threadMonitoring != null) { |
| threadMonitoring.endMonitor(); |
| } |
| } |
| } |