blob: 0cb26bf0e5a5a6e13ea05f37e49a691b9a8ee6d0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.simulator.systems;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.LockSupport;
import io.netty.util.concurrent.FastThreadLocalThread;
import org.apache.cassandra.distributed.api.IInvokableInstance;
import org.apache.cassandra.distributed.api.IMessage;
import org.apache.cassandra.simulator.OrderOn;
import org.apache.cassandra.simulator.systems.InterceptedWait.Trigger;
import org.apache.cassandra.simulator.systems.SimulatedTime.LocalTime;
import org.apache.cassandra.utils.Shared;
import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
import static org.apache.cassandra.simulator.systems.InterceptedWait.Kind.UNBOUNDED_WAIT;
import static org.apache.cassandra.simulator.systems.InterceptedWait.Kind.WAIT_UNTIL;
import static org.apache.cassandra.simulator.systems.InterceptedWait.Trigger.INTERRUPT;
import static org.apache.cassandra.simulator.systems.InterceptedWait.Trigger.SIGNAL;
import static org.apache.cassandra.simulator.systems.InterceptibleThread.WaitTimeKind.ABSOLUTE_MILLIS;
import static org.apache.cassandra.simulator.systems.InterceptibleThread.WaitTimeKind.NONE;
import static org.apache.cassandra.simulator.systems.InterceptibleThread.WaitTimeKind.RELATIVE_NANOS;
import static org.apache.cassandra.utils.Shared.Recursive.ALL;
import static org.apache.cassandra.utils.Shared.Scope.SIMULATION;
@Shared(scope = SIMULATION, ancestors = ALL, members = ALL)
public class InterceptibleThread extends FastThreadLocalThread implements InterceptorOfConsequences
{
@Shared(scope = SIMULATION)
enum WaitTimeKind
{
NONE, RELATIVE_NANOS, ABSOLUTE_MILLIS
}
// used to implement LockSupport.park/unpark
@Shared(scope = SIMULATION)
private class Parked implements InterceptedWait
{
final Kind kind;
final CaptureSites captureSites;
final InterceptorOfConsequences waitInterceptedBy;
final List<TriggerListener> onTrigger = new ArrayList<>(3);
final long waitTime;
boolean isDone; // we have been signalled (by the simulation or otherwise)
Trigger trigger;
Parked(Kind kind, CaptureSites captureSites, long waitTime, InterceptorOfConsequences waitInterceptedBy)
{
this.kind = kind;
this.captureSites = captureSites;
this.waitTime = waitTime;
this.waitInterceptedBy = waitInterceptedBy;
}
@Override
public Kind kind()
{
return kind;
}
@Override
public long waitTime()
{
return waitTime;
}
@Override
public boolean isTriggered()
{
return parked != this;
}
@Override
public boolean isInterruptible()
{
return true;
}
@Override
public synchronized void triggerAndAwaitDone(InterceptorOfConsequences interceptor, Trigger trigger)
{
if (parked == null)
return;
beforeInvocation(interceptor, this);
parked = null;
onTrigger.forEach(listener -> listener.onTrigger(this));
if (!preWakeup(this))
notify();
try
{
while (!isDone)
wait();
}
catch (InterruptedException ie)
{
throw new UncheckedInterruptedException(ie);
}
}
@Override
public synchronized void triggerBypass()
{
parked = null;
notifyAll();
LockSupport.unpark(InterceptibleThread.this);
}
@Override
public void addListener(TriggerListener onTrigger)
{
this.onTrigger.add(onTrigger);
}
@Override
public Thread waiting()
{
return InterceptibleThread.this;
}
@Override
public synchronized void notifyThreadPaused()
{
isDone = true;
notifyAll();
}
synchronized void await()
{
try
{
while (!isTriggered())
wait();
if (hasPendingInterrupt)
doInterrupt();
hasPendingInterrupt = false;
}
catch (InterruptedException e)
{
if (!isTriggered()) throw new UncheckedInterruptedException(e);
else doInterrupt();
}
}
@Override
public void interceptWakeup(Trigger trigger, Thread by)
{
if (this.trigger != null && this.trigger.compareTo(trigger) >= 0)
return;
this.trigger = trigger;
if (captureSites != null)
captureSites.registerWakeup(by);
interceptorOrDefault(by).interceptWakeup(this, trigger, waitInterceptedBy);
}
@Override
public String toString()
{
return captureSites == null ? "" : captureSites.toString();
}
}
private static InterceptorOfConsequences debug;
final Object extraToStringInfo; // optional dynamic extra information for reporting with toString
final String toString;
final Runnable onTermination;
private final InterceptorOfGlobalMethods interceptorOfGlobalMethods;
private final LocalTime time;
// this is set before the thread's execution begins/continues; events and cessation are reported back to this
private InterceptorOfConsequences interceptor;
private NotifyThreadPaused notifyOnPause;
private boolean hasPendingUnpark;
private boolean hasPendingInterrupt;
private Parked parked;
private InterceptedWait waitingOn;
volatile boolean trapInterrupts = true;
// we need to avoid non-determinism when evaluating things in the debugger and toString() is the main culprit
// so we re-write toString methods to update this counter, so that while we are evaluating a toString we do not
// perform any non-deterministic actions
private int determinismDepth;
public InterceptibleThread(ThreadGroup group, Runnable target, String name, Object extraToStringInfo, Runnable onTermination, InterceptorOfGlobalMethods interceptorOfGlobalMethods, LocalTime time)
{
super(group, target, name);
this.onTermination = onTermination;
this.interceptorOfGlobalMethods = interceptorOfGlobalMethods;
this.time = time;
// group is nulled on termination, and we need it for reporting purposes, so save the toString
this.toString = "Thread[" + name + ',' + getPriority() + ',' + group.getName() + ']';
this.extraToStringInfo = extraToStringInfo;
}
public boolean park(long waitTime, WaitTimeKind waitTimeKind)
{
if (interceptor == null) return false;
if (hasPendingUnpark) hasPendingUnpark = false;
else if (!isInterrupted())
{
InterceptedWait.Kind kind;
switch (waitTimeKind)
{
default:
throw new AssertionError();
case NONE:
kind = UNBOUNDED_WAIT;
break;
case RELATIVE_NANOS:
kind = WAIT_UNTIL;
waitTime = time.localToGlobalNanos(time.relativeToLocalNanos(waitTime));
break;
case ABSOLUTE_MILLIS:
kind = WAIT_UNTIL;
waitTime = time.translate().fromMillisSinceEpoch(waitTime);
}
Parked parked = new Parked(kind, interceptorOfGlobalMethods.captureWaitSite(this), waitTime, interceptor);
this.parked = parked;
interceptWait(parked);
parked.await();
}
return true;
}
public boolean unpark(InterceptibleThread by)
{
if (by.interceptor == null) return false;
if (parked == null) hasPendingUnpark = true;
else parked.interceptWakeup(SIGNAL, by);
return true;
}
public void trapInterrupts(boolean trapInterrupts)
{
this.trapInterrupts = trapInterrupts;
if (!trapInterrupts && hasPendingInterrupt)
doInterrupt();
}
public boolean hasPendingInterrupt()
{
return hasPendingInterrupt;
}
public boolean preWakeup(InterceptedWait wakingOn)
{
assert wakingOn == waitingOn;
waitingOn = null;
if (!hasPendingInterrupt)
return false;
hasPendingInterrupt = false;
doInterrupt();
return true;
}
public void doInterrupt()
{
super.interrupt();
}
@Override
public void interrupt()
{
Thread by = Thread.currentThread();
if (by == this || !(by instanceof InterceptibleThread) || !trapInterrupts) doInterrupt();
else
{
hasPendingInterrupt = true;
if (waitingOn != null && waitingOn.isInterruptible())
waitingOn.interceptWakeup(INTERRUPT, by);
}
}
@Override
public void beforeInvocation(InterceptibleThread realThread)
{
throw new UnsupportedOperationException();
}
@Override
public void interceptMessage(IInvokableInstance from, IInvokableInstance to, IMessage message)
{
++determinismDepth;
try
{
if (interceptor == null) to.receiveMessage(message);
else interceptor.interceptMessage(from, to, message);
if (debug != null) debug.interceptMessage(from, to, message);
}
finally
{
--determinismDepth;
}
}
@Override
public void interceptWakeup(InterceptedWait wakeup, Trigger trigger, InterceptorOfConsequences waitWasInterceptedBy)
{
++determinismDepth;
try
{
interceptor.interceptWakeup(wakeup, trigger, waitWasInterceptedBy);
if (debug != null) debug.interceptWakeup(wakeup, trigger, waitWasInterceptedBy);
}
finally
{
--determinismDepth;
}
}
@Override
public void interceptExecution(InterceptedExecution invoke, OrderOn orderOn)
{
++determinismDepth;
try
{
interceptor.interceptExecution(invoke, orderOn);
if (debug != null) debug.interceptExecution(invoke, orderOn);
}
finally
{
--determinismDepth;
}
}
@Override
public void interceptWait(InterceptedWait wakeupWith)
{
++determinismDepth;
try
{
if (interceptor == null)
return;
InterceptorOfConsequences interceptor = this.interceptor;
NotifyThreadPaused notifyOnPause = this.notifyOnPause;
this.interceptor = null;
this.notifyOnPause = null;
this.waitingOn = wakeupWith;
interceptor.interceptWait(wakeupWith);
if (debug != null) debug.interceptWait(wakeupWith);
notifyOnPause.notifyThreadPaused();
}
finally
{
--determinismDepth;
}
}
public void onTermination()
{
onTermination.run();
}
@Override
public void interceptTermination(boolean isThreadTermination)
{
++determinismDepth;
try
{
if (interceptor == null)
return;
InterceptorOfConsequences interceptor = this.interceptor;
NotifyThreadPaused notifyOnPause = this.notifyOnPause;
this.interceptor = null;
this.notifyOnPause = null;
interceptor.interceptTermination(isThreadTermination);
if (debug != null) debug.interceptTermination(isThreadTermination);
notifyOnPause.notifyThreadPaused();
}
finally
{
--determinismDepth;
}
}
public void beforeInvocation(InterceptorOfConsequences interceptor, NotifyThreadPaused notifyOnPause)
{
assert this.interceptor == null;
assert this.notifyOnPause == null;
this.interceptor = interceptor;
this.notifyOnPause = notifyOnPause;
interceptor.beforeInvocation(this);
}
public boolean isEvaluationDeterministic()
{
return determinismDepth > 0;
}
public boolean isIntercepting()
{
return interceptor != null;
}
public InterceptorOfConsequences interceptedBy()
{
return interceptor;
}
public InterceptorOfGlobalMethods interceptorOfGlobalMethods()
{
return interceptorOfGlobalMethods;
}
public int hashCode()
{
return toString.hashCode();
}
public String toString()
{
return extraToStringInfo == null ? toString : toString + ' ' + extraToStringInfo;
}
public static boolean isDeterministic()
{
Thread thread = Thread.currentThread();
return thread instanceof InterceptibleThread && ((InterceptibleThread) thread).determinismDepth > 0;
}
public static void runDeterministic(Runnable runnable)
{
enterDeterministicMethod();
try
{
runnable.run();
}
finally
{
exitDeterministicMethod();
}
}
public static void enterDeterministicMethod()
{
Thread anyThread = Thread.currentThread();
if (!(anyThread instanceof InterceptibleThread))
return;
InterceptibleThread thread = (InterceptibleThread) anyThread;
++thread.determinismDepth;
}
public static void exitDeterministicMethod()
{
Thread anyThread = Thread.currentThread();
if (!(anyThread instanceof InterceptibleThread))
return;
InterceptibleThread thread = (InterceptibleThread) anyThread;
--thread.determinismDepth;
}
public static void park()
{
Thread thread = Thread.currentThread();
if (!(thread instanceof InterceptibleThread) || !((InterceptibleThread) thread).park(-1, NONE))
LockSupport.park();
}
public static void parkNanos(long nanos)
{
Thread thread = Thread.currentThread();
if (!(thread instanceof InterceptibleThread) || !((InterceptibleThread) thread).park(nanos, RELATIVE_NANOS))
LockSupport.parkNanos(nanos);
}
public static void parkUntil(long millis)
{
Thread thread = Thread.currentThread();
if (!(thread instanceof InterceptibleThread) || !((InterceptibleThread) thread).park(millis, ABSOLUTE_MILLIS))
LockSupport.parkUntil(millis);
}
public static void park(Object blocker)
{
Thread thread = Thread.currentThread();
if (!(thread instanceof InterceptibleThread) || !((InterceptibleThread) thread).park(-1, NONE))
LockSupport.park(blocker);
}
public static void parkNanos(Object blocker, long relative)
{
Thread thread = Thread.currentThread();
if (!(thread instanceof InterceptibleThread) || !((InterceptibleThread) thread).park(relative, RELATIVE_NANOS))
LockSupport.parkNanos(blocker, relative);
}
public static void parkUntil(Object blocker, long millis)
{
Thread thread = Thread.currentThread();
if (!(thread instanceof InterceptibleThread) || !((InterceptibleThread) thread).park(millis, ABSOLUTE_MILLIS))
LockSupport.parkUntil(blocker, millis);
}
public static void unpark(Thread thread)
{
Thread currentThread = Thread.currentThread();
if (!(thread instanceof InterceptibleThread) || !(currentThread instanceof InterceptibleThread)
|| !((InterceptibleThread) thread).unpark((InterceptibleThread) currentThread))
LockSupport.unpark(thread);
}
public static InterceptorOfConsequences interceptorOrDefault(Thread thread)
{
if (!(thread instanceof InterceptibleThread))
return DEFAULT_INTERCEPTOR;
return interceptorOrDefault((InterceptibleThread) thread);
}
public static InterceptorOfConsequences interceptorOrDefault(InterceptibleThread thread)
{
return thread.isIntercepting() ? thread : DEFAULT_INTERCEPTOR;
}
public LocalTime time()
{
return time;
}
public static void setDebugInterceptor(InterceptorOfConsequences interceptor)
{
debug = interceptor;
}
}