blob: 94e068c6615d8eb0fa0a68ebfd61bbf831f42731 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.simulator.systems;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.cassandra.concurrent.ExecutorPlus;
import org.apache.cassandra.concurrent.LocalAwareExecutorPlus;
import org.apache.cassandra.concurrent.LocalAwareSequentialExecutorPlus;
import org.apache.cassandra.concurrent.ScheduledExecutorPlus;
import org.apache.cassandra.concurrent.SequentialExecutorPlus;
import org.apache.cassandra.concurrent.SingleThreadExecutorPlus;
import org.apache.cassandra.concurrent.SyncFutureTask;
import org.apache.cassandra.concurrent.TaskFactory;
import org.apache.cassandra.simulator.OrderOn;
import org.apache.cassandra.simulator.systems.InterceptingAwaitable.InterceptingCondition;
import org.apache.cassandra.simulator.systems.NotifyThreadPaused.AwaitPaused;
import org.apache.cassandra.utils.Shared;
import org.apache.cassandra.utils.WithResources;
import org.apache.cassandra.utils.concurrent.Condition;
import org.apache.cassandra.utils.concurrent.Future;
import org.apache.cassandra.utils.concurrent.ImmediateFuture;
import org.apache.cassandra.utils.concurrent.NotScheduledFuture;
import org.apache.cassandra.utils.concurrent.RunnableFuture;
import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
import static java.util.Collections.newSetFromMap;
import static java.util.Collections.synchronizedMap;
import static java.util.Collections.synchronizedSet;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static org.apache.cassandra.config.CassandraRelevantProperties.TEST_SIMULATOR_DEBUG;
import static org.apache.cassandra.simulator.systems.InterceptibleThread.runDeterministic;
import static org.apache.cassandra.simulator.systems.SimulatedAction.Kind.SCHEDULED_DAEMON;
import static org.apache.cassandra.simulator.systems.SimulatedAction.Kind.SCHEDULED_TASK;
import static org.apache.cassandra.simulator.systems.SimulatedAction.Kind.SCHEDULED_TIMEOUT;
import static org.apache.cassandra.simulator.systems.SimulatedExecution.callable;
import static org.apache.cassandra.simulator.systems.SimulatedTime.Global.localToRelativeNanos;
import static org.apache.cassandra.simulator.systems.SimulatedTime.Global.localToGlobalNanos;
import static org.apache.cassandra.simulator.systems.SimulatedTime.Global.relativeToGlobalNanos;
import static org.apache.cassandra.utils.Shared.Recursive.INTERFACES;
import static org.apache.cassandra.utils.Shared.Scope.SIMULATION;
// An executor whose tasks we can intercept the execution of
@Shared(scope = SIMULATION, inner = INTERFACES)
public interface InterceptingExecutor extends OrderOn
{
class ForbiddenExecutionException extends RejectedExecutionException
{
}
interface InterceptingTaskFactory extends TaskFactory
{
}
void addPending(Object task);
void cancelPending(Object task);
void submitUnmanaged(Runnable task);
void submitAndAwaitPause(Runnable task, InterceptorOfConsequences interceptor);
OrderOn orderAppliesAfterScheduling();
static class InterceptedScheduledFutureTask<T> extends SyncFutureTask<T> implements ScheduledFuture<T>
{
final long delayNanos;
Runnable onCancel;
public InterceptedScheduledFutureTask(long delayNanos, Callable<T> call)
{
super(call);
this.delayNanos = delayNanos;
}
@Override
public long getDelay(TimeUnit unit)
{
return unit.convert(delayNanos, NANOSECONDS);
}
@Override
public int compareTo(Delayed that)
{
return Long.compare(delayNanos, that.getDelay(NANOSECONDS));
}
void onCancel(Runnable onCancel)
{
this.onCancel = onCancel;
}
@Override
public boolean cancel(boolean b)
{
if (onCancel != null)
{
onCancel.run();
onCancel = null;
}
return super.cancel(b);
}
}
@PerClassLoader
abstract class AbstractInterceptingExecutor implements InterceptingExecutor, ExecutorPlus
{
private static final AtomicIntegerFieldUpdater<AbstractInterceptingExecutor> pendingUpdater = AtomicIntegerFieldUpdater.newUpdater(AbstractInterceptingExecutor.class, "pending");
final OrderAppliesAfterScheduling orderAppliesAfterScheduling = new OrderAppliesAfterScheduling(this);
final InterceptorOfExecution interceptorOfExecution;
final InterceptingTaskFactory taskFactory;
final Set<Object> debugPending = TEST_SIMULATOR_DEBUG.getBoolean() ? synchronizedSet(newSetFromMap(new IdentityHashMap<>())) : null;
final Condition isTerminated;
volatile boolean isShutdown;
volatile int pending;
protected AbstractInterceptingExecutor(InterceptorOfExecution interceptorOfExecution, InterceptingTaskFactory taskFactory)
{
this.interceptorOfExecution = interceptorOfExecution;
this.isTerminated = new InterceptingCondition();
this.taskFactory = taskFactory;
}
@Override
public void addPending(Object task)
{
if (isShutdown)
throw new RejectedExecutionException();
pendingUpdater.incrementAndGet(this);
if (isShutdown)
{
if (0 == pendingUpdater.decrementAndGet(this))
terminate();
throw new RejectedExecutionException();
}
if (debugPending != null && !debugPending.add(task))
throw new AssertionError();
}
@Override
public void cancelPending(Object task)
{
boolean shutdown = isShutdown;
if (completePending(task) == 0 && shutdown)
terminate();
}
@Override
public OrderOn orderAppliesAfterScheduling()
{
return orderAppliesAfterScheduling;
}
public int completePending(Object task)
{
int remaining = pendingUpdater.decrementAndGet(this);
if (debugPending != null && !debugPending.remove(task))
throw new AssertionError();
return remaining;
}
<V, T extends RunnableFuture<V>> T addTask(T task)
{
if (isShutdown)
throw new RejectedExecutionException();
return interceptorOfExecution.intercept().addTask(task, this);
}
public void maybeExecuteImmediately(Runnable command)
{
execute(command);
}
@Override
public void execute(Runnable run)
{
addTask(taskFactory.toSubmit(run));
}
@Override
public void execute(WithResources withResources, Runnable run)
{
addTask(taskFactory.toSubmit(withResources, run));
}
@Override
public Future<?> submit(Runnable run)
{
return addTask(taskFactory.toSubmit(run));
}
@Override
public <T> Future<T> submit(Runnable run, T result)
{
return addTask(taskFactory.toSubmit(run, result));
}
@Override
public <T> Future<T> submit(Callable<T> call)
{
return addTask(taskFactory.toSubmit(call));
}
@Override
public <T> Future<T> submit(WithResources withResources, Runnable run, T result)
{
return addTask(taskFactory.toSubmit(withResources, run, result));
}
@Override
public Future<?> submit(WithResources withResources, Runnable run)
{
return addTask(taskFactory.toSubmit(withResources, run));
}
@Override
public <T> Future<T> submit(WithResources withResources, Callable<T> call)
{
return addTask(taskFactory.toSubmit(withResources, call));
}
abstract void terminate();
public boolean isShutdown()
{
return isShutdown;
}
public boolean isTerminated()
{
return isTerminated.isSignalled();
}
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException
{
Thread thread = Thread.currentThread();
if (thread instanceof InterceptibleThread)
{
InterceptibleThread interceptibleThread = (InterceptibleThread) thread;
if (interceptibleThread.isIntercepting())
{
// simpler to use no timeout than to ensure pending tasks all run first in simulation
isTerminated.await();
return true;
}
}
return isTerminated.await(timeout, unit);
}
@Override
public void setCorePoolSize(int newCorePoolSize)
{
throw new UnsupportedOperationException();
}
@Override
public void setMaximumPoolSize(int newMaximumPoolSize)
{
throw new UnsupportedOperationException();
}
}
@PerClassLoader
class InterceptingPooledExecutor extends AbstractInterceptingExecutor implements InterceptingExecutor
{
enum State { RUNNING, TERMINATING, TERMINATED }
private class WaitingThread
{
final InterceptibleThread thread;
Runnable task;
State state = State.RUNNING;
WaitingThread(ThreadFactory factory)
{
this.thread = (InterceptibleThread) factory.newThread(() -> {
InterceptibleThread thread = (InterceptibleThread) Thread.currentThread();
try
{
while (true)
{
try
{
task.run();
}
catch (Throwable t)
{
try { thread.getUncaughtExceptionHandler().uncaughtException(thread, t); }
catch (Throwable ignore) {}
}
boolean shutdown = isShutdown;
int remaining = completePending(task);
if (shutdown && remaining < threads.size())
{
threads.remove(thread);
thread.onTermination();
if (threads.isEmpty())
isTerminated.signal(); // this has simulator side-effects, so try to perform before we interceptTermination
thread.interceptTermination(true);
return;
}
task = null;
waiting.add(this); // inverse order of waiting.add/isShutdown to ensure visibility vs shutdown()
thread.interceptTermination(false);
synchronized (this)
{
while (task == null)
{
if (state == State.TERMINATING)
return;
try { wait(); }
catch (InterruptedException | UncheckedInterruptedException ignore) { }
}
}
}
}
finally
{
try
{
runDeterministic(() -> {
if (null != threads.remove(thread))
{
task = null;
waiting.remove(this);
thread.onTermination();
if (isShutdown && threads.isEmpty() && waiting.isEmpty() && !isTerminated())
isTerminated.signal();
}
});
}
finally
{
synchronized (this)
{
state = State.TERMINATED;
notify();
}
}
}
});
threads.put(thread, this);
}
synchronized void submit(Runnable task)
{
if (state != State.RUNNING)
throw new IllegalStateException();
this.task = task;
if (thread.isAlive()) notify();
else thread.start();
}
synchronized void terminate()
{
if (state != State.TERMINATED)
state = State.TERMINATING;
if (thread.isAlive()) notify();
else thread.start();
try
{
while (state != State.TERMINATED)
wait();
}
catch (InterruptedException e)
{
throw new UncheckedInterruptedException(e);
}
}
}
final Map<InterceptibleThread, WaitingThread> threads = synchronizedMap(new IdentityHashMap<>());
final ThreadFactory threadFactory;
final Queue<WaitingThread> waiting = new ConcurrentLinkedQueue<>();
final int concurrency;
InterceptingPooledExecutor(InterceptorOfExecution interceptorOfExecution, int concurrency, ThreadFactory threadFactory, InterceptingTaskFactory taskFactory)
{
super(interceptorOfExecution, taskFactory);
this.threadFactory = threadFactory;
this.concurrency = concurrency;
}
public void submitAndAwaitPause(Runnable task, InterceptorOfConsequences interceptor)
{
// we don't check isShutdown as we could have a task queued by simulation from prior to shutdown
if (isTerminated()) throw new AssertionError();
if (debugPending != null && !debugPending.contains(task)) throw new AssertionError();
WaitingThread waiting = getWaiting();
AwaitPaused done = new AwaitPaused(waiting);
waiting.thread.beforeInvocation(interceptor, done);
synchronized (waiting)
{
waiting.submit(task);
done.awaitPause();
}
}
public void submitUnmanaged(Runnable task)
{
if (isShutdown)
throw new RejectedExecutionException();
addPending(task);
WaitingThread waiting = getWaiting();
waiting.submit(task);
}
private WaitingThread getWaiting()
{
WaitingThread next = waiting.poll();
if (next != null)
return next;
return new WaitingThread(threadFactory);
}
public synchronized void shutdown()
{
isShutdown = true;
WaitingThread next;
while (null != (next = waiting.poll()))
next.terminate();
if (pending == 0)
terminate();
}
synchronized void terminate()
{
List<InterceptibleThread> snapshot = new ArrayList<>(threads.keySet());
for (InterceptibleThread thread : snapshot)
{
WaitingThread terminate = threads.get(thread);
if (terminate != null)
terminate.terminate();
}
runDeterministic(isTerminated::signal);
}
public synchronized List<Runnable> shutdownNow()
{
shutdown();
threads.keySet().forEach(InterceptibleThread::interrupt);
return Collections.emptyList();
}
@Override
public boolean inExecutor()
{
return threads.containsKey(Thread.currentThread());
}
@Override
public int getActiveTaskCount()
{
return threads.size() - waiting.size();
}
@Override
public long getCompletedTaskCount()
{
return 0;
}
@Override
public int getPendingTaskCount()
{
return 0;
}
@Override
public int getCorePoolSize()
{
return concurrency;
}
@Override
public int getMaximumPoolSize()
{
return concurrency;
}
public String toString()
{
return threadFactory.toString();
}
@Override
public int concurrency()
{
return concurrency;
}
}
// we might want different variants
// (did consider a non-intercepting variant, or immediate executor, but we need to intercept the thread events)
@PerClassLoader
abstract class AbstractSingleThreadedExecutorPlus extends AbstractInterceptingExecutor implements SequentialExecutorPlus
{
static class AtLeastOnce extends SingleThreadExecutorPlus.AtLeastOnce
{
AtLeastOnce(SequentialExecutorPlus executor, Runnable run)
{
super(executor, run);
}
public boolean trigger()
{
boolean success;
if (success = compareAndSet(false, true))
executor.execute(this);
else
executor.execute(() -> {}); // submit a no-op, so we can still impose our causality orderings
return success;
}
}
final InterceptibleThread thread;
final ArrayDeque<Runnable> queue = new ArrayDeque<>();
volatile boolean executing, terminating, terminated;
AbstractSingleThreadedExecutorPlus(InterceptorOfExecution interceptorOfExecution, ThreadFactory threadFactory, InterceptingTaskFactory taskFactory)
{
super(interceptorOfExecution, taskFactory);
this.thread = (InterceptibleThread) threadFactory.newThread(() -> {
InterceptibleThread thread = (InterceptibleThread) Thread.currentThread();
try
{
while (true)
{
Runnable task;
try
{
task = dequeue();
}
catch (InterruptedException | UncheckedInterruptedException ignore)
{
if (terminating) return;
else continue;
}
try
{
task.run();
}
catch (Throwable t)
{
try { thread.getUncaughtExceptionHandler().uncaughtException(thread, t); }
catch (Throwable ignore) {}
}
executing = false;
boolean shutdown = isShutdown;
if ((0 == completePending(task) && shutdown))
return;
thread.interceptTermination(false);
}
}
finally
{
runDeterministic(thread::onTermination);
if (terminating)
{
synchronized (this)
{
terminated = true;
notifyAll();
}
}
else
{
runDeterministic(this::terminate);
}
}
});
thread.start();
}
void terminate()
{
synchronized (this)
{
assert pending == 0;
if (terminating)
return;
terminating = true;
if (Thread.currentThread() != thread)
{
notifyAll();
try { while (!terminated) wait(); }
catch (InterruptedException e) { throw new UncheckedInterruptedException(e); }
}
terminated = true;
}
isTerminated.signal(); // this has simulator side-effects, so try to perform before we interceptTermination
if (Thread.currentThread() == thread && thread.isIntercepting())
thread.interceptTermination(true);
}
public synchronized void shutdown()
{
if (isShutdown)
return;
isShutdown = true;
if (pending == 0)
terminate();
}
public synchronized List<Runnable> shutdownNow()
{
if (isShutdown)
return Collections.emptyList();
isShutdown = true;
List<Runnable> cancelled = new ArrayList<>(queue);
queue.clear();
cancelled.forEach(super::cancelPending);
if (pending == 0) terminate();
else thread.interrupt();
return cancelled;
}
synchronized void enqueue(Runnable runnable)
{
queue.add(runnable);
notify();
}
synchronized Runnable dequeue() throws InterruptedException
{
Runnable next;
while (null == (next = queue.poll()) && !terminating)
wait();
if (next == null)
throw new InterruptedException();
return next;
}
public AtLeastOnce atLeastOnceTrigger(Runnable run)
{
return new AtLeastOnce(this, run);
}
@Override
public boolean inExecutor()
{
return thread == Thread.currentThread();
}
@Override
public int getCorePoolSize()
{
return 1;
}
@Override
public int getMaximumPoolSize()
{
return 1;
}
public String toString()
{
return thread.toString();
}
}
@PerClassLoader
class InterceptingSequentialExecutor extends AbstractSingleThreadedExecutorPlus implements InterceptingExecutor, ScheduledExecutorPlus, OrderOn
{
InterceptingSequentialExecutor(InterceptorOfExecution interceptorOfExecution, ThreadFactory threadFactory, InterceptingTaskFactory taskFactory)
{
super(interceptorOfExecution, threadFactory, taskFactory);
}
public void submitAndAwaitPause(Runnable task, InterceptorOfConsequences interceptor)
{
synchronized (this)
{
// we don't check isShutdown as we could have a task queued by simulation from prior to shutdown
if (terminated) throw new AssertionError();
if (executing) throw new AssertionError();
if (debugPending != null && !debugPending.contains(task)) throw new AssertionError();
executing = true;
AwaitPaused done = new AwaitPaused(this);
thread.beforeInvocation(interceptor, done);
enqueue(task);
done.awaitPause();
}
}
public synchronized void submitUnmanaged(Runnable task)
{
addPending(task);
enqueue(task);
}
@Override public int getActiveTaskCount()
{
return !queue.isEmpty() || executing ? 1 : 0;
}
@Override public long getCompletedTaskCount()
{
return 0;
}
@Override
public int getPendingTaskCount()
{
return 0;
}
public ScheduledFuture<?> schedule(Runnable run, long delay, TimeUnit unit)
{
if (isShutdown)
throw new RejectedExecutionException();
long delayNanos = unit.toNanos(delay);
return interceptorOfExecution.intercept().schedule(SCHEDULED_TASK, delayNanos, relativeToGlobalNanos(delayNanos), callable(run, null), this);
}
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit)
{
if (isShutdown)
throw new RejectedExecutionException();
long delayNanos = unit.toNanos(delay);
return interceptorOfExecution.intercept().schedule(SCHEDULED_TASK, delayNanos, relativeToGlobalNanos(delayNanos), callable, this);
}
public ScheduledFuture<?> scheduleTimeoutWithDelay(Runnable run, long delay, TimeUnit unit)
{
return scheduleTimeoutAt(run, relativeToGlobalNanos(unit.toNanos(delay)));
}
public ScheduledFuture<?> scheduleAt(Runnable run, long deadlineNanos)
{
if (isShutdown)
throw new RejectedExecutionException();
return interceptorOfExecution.intercept().schedule(SCHEDULED_TASK, localToRelativeNanos(deadlineNanos), localToGlobalNanos(deadlineNanos), callable(run, null), this);
}
public ScheduledFuture<?> scheduleTimeoutAt(Runnable run, long deadlineNanos)
{
if (isShutdown)
throw new RejectedExecutionException();
return interceptorOfExecution.intercept().schedule(SCHEDULED_TIMEOUT, localToRelativeNanos(deadlineNanos), localToGlobalNanos(deadlineNanos), callable(run, null), this);
}
public ScheduledFuture<?> scheduleSelfRecurring(Runnable run, long delay, TimeUnit unit)
{
if (isShutdown)
throw new RejectedExecutionException();
long delayNanos = unit.toNanos(delay);
return interceptorOfExecution.intercept().schedule(SCHEDULED_DAEMON, delayNanos, relativeToGlobalNanos(delayNanos), callable(run, null), this);
}
public ScheduledFuture<?> scheduleAtFixedRate(Runnable run, long initialDelay, long period, TimeUnit unit)
{
if (isShutdown)
throw new RejectedExecutionException();
long delayNanos = unit.toNanos(initialDelay);
return interceptorOfExecution.intercept().schedule(SCHEDULED_DAEMON, delayNanos, relativeToGlobalNanos(delayNanos), new Callable<Object>()
{
@Override
public Object call()
{
run.run();
if (!isShutdown)
scheduleAtFixedRate(run, period, period, unit);
return null;
}
@Override
public String toString()
{
return run.toString();
}
}, this);
}
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable run, long initialDelay, long delay, TimeUnit unit)
{
return scheduleAtFixedRate(run, initialDelay, delay, unit);
}
public int concurrency()
{
return 1;
}
}
@PerClassLoader
class InterceptingPooledLocalAwareExecutor extends InterceptingPooledExecutor implements LocalAwareExecutorPlus
{
InterceptingPooledLocalAwareExecutor(InterceptorOfExecution interceptors, int concurrency, ThreadFactory threadFactory, InterceptingTaskFactory taskFactory)
{
super(interceptors, concurrency, threadFactory, taskFactory);
}
}
@PerClassLoader
class InterceptingLocalAwareSequentialExecutor extends InterceptingSequentialExecutor implements LocalAwareSequentialExecutorPlus
{
InterceptingLocalAwareSequentialExecutor(InterceptorOfExecution interceptorOfExecution, ThreadFactory threadFactory, InterceptingTaskFactory taskFactory)
{
super(interceptorOfExecution, threadFactory, taskFactory);
}
}
@PerClassLoader
static class DiscardingSequentialExecutor implements LocalAwareSequentialExecutorPlus, ScheduledExecutorPlus
{
@Override
public void shutdown()
{
}
@Override
public List<Runnable> shutdownNow()
{
return Collections.emptyList();
}
@Override
public boolean isShutdown()
{
return false;
}
@Override
public boolean isTerminated()
{
return false;
}
@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException
{
return false;
}
@Override
public <T> Future<T> submit(Callable<T> task)
{
return ImmediateFuture.cancelled();
}
@Override
public <T> Future<T> submit(Runnable task, T result)
{
return ImmediateFuture.cancelled();
}
@Override
public Future<?> submit(Runnable task)
{
return ImmediateFuture.cancelled();
}
@Override
public void execute(WithResources withResources, Runnable task)
{
}
@Override
public <T> Future<T> submit(WithResources withResources, Callable<T> task)
{
return ImmediateFuture.cancelled();
}
@Override
public Future<?> submit(WithResources withResources, Runnable task)
{
return ImmediateFuture.cancelled();
}
@Override
public <T> Future<T> submit(WithResources withResources, Runnable task, T result)
{
return ImmediateFuture.cancelled();
}
@Override
public boolean inExecutor()
{
return false;
}
@Override
public int getCorePoolSize()
{
return 0;
}
@Override
public void setCorePoolSize(int newCorePoolSize)
{
}
@Override
public int getMaximumPoolSize()
{
return 0;
}
@Override
public void setMaximumPoolSize(int newMaximumPoolSize)
{
}
@Override
public int getActiveTaskCount()
{
return 0;
}
@Override
public long getCompletedTaskCount()
{
return 0;
}
@Override
public int getPendingTaskCount()
{
return 0;
}
@Override
public AtLeastOnceTrigger atLeastOnceTrigger(Runnable runnable)
{
return new AtLeastOnceTrigger()
{
@Override
public boolean trigger()
{
return false;
}
@Override
public void runAfter(Runnable run)
{
}
@Override
public void sync()
{
}
};
}
@Override
public void execute(Runnable command)
{
}
@Override
public ScheduledFuture<?> scheduleSelfRecurring(Runnable run, long delay, TimeUnit units)
{
return new NotScheduledFuture<>();
}
@Override
public ScheduledFuture<?> scheduleAt(Runnable run, long deadline)
{
return new NotScheduledFuture<>();
}
@Override
public ScheduledFuture<?> scheduleTimeoutAt(Runnable run, long deadline)
{
return new NotScheduledFuture<>();
}
@Override
public ScheduledFuture<?> scheduleTimeoutWithDelay(Runnable run, long delay, TimeUnit units)
{
return new NotScheduledFuture<>();
}
@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
{
return new NotScheduledFuture<>();
}
@Override
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit)
{
return new NotScheduledFuture<>();
}
@Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
{
return new NotScheduledFuture<>();
}
@Override
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
{
return new NotScheduledFuture<>();
}
}
}