blob: ef4e24da079c209de1dadb199a002b3eb9842045 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.simulator.systems;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.apache.cassandra.simulator.systems.InterceptedWait.InterceptedConditionWait;
import org.apache.cassandra.simulator.systems.InterceptedWait.TriggerListener;
import org.apache.cassandra.utils.concurrent.Awaitable;
import org.apache.cassandra.utils.concurrent.Condition;
import org.apache.cassandra.utils.concurrent.CountDownLatch;
import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
import org.apache.cassandra.utils.concurrent.WaitQueue;
import static org.apache.cassandra.simulator.systems.InterceptedWait.Kind.WAIT_UNTIL;
import static org.apache.cassandra.simulator.systems.InterceptedWait.Kind.UNBOUNDED_WAIT;
import static org.apache.cassandra.simulator.systems.InterceptedWait.Trigger.SIGNAL;
import static org.apache.cassandra.simulator.systems.InterceptorOfGlobalMethods.Global.captureWaitSite;
import static org.apache.cassandra.simulator.systems.InterceptorOfGlobalMethods.Global.ifIntercepted;
import static org.apache.cassandra.simulator.systems.SimulatedTime.Global.localToGlobalNanos;
import static org.apache.cassandra.simulator.systems.SimulatedTime.Global.relativeToLocalNanos;
@PerClassLoader
abstract class InterceptingAwaitable implements Awaitable
{
abstract boolean isSignalled();
abstract Condition maybeIntercept(InterceptedWait.Kind kind, long waitNanos);
Condition maybeInterceptThrowChecked(InterceptedWait.Kind kind, long waitNanos) throws InterruptedException
{
if (Thread.interrupted())
throw new InterruptedException();
return maybeIntercept(kind, waitNanos);
}
Condition maybeInterceptThrowUnchecked(InterceptedWait.Kind kind, long waitNanos)
{
if (Thread.interrupted())
throw new UncheckedInterruptedException();
return maybeIntercept(kind, waitNanos);
}
public boolean awaitUntil(long deadline) throws InterruptedException
{
maybeInterceptThrowChecked(WAIT_UNTIL, deadline).awaitUntil(deadline);
return isSignalled();
}
public boolean awaitUntilThrowUncheckedOnInterrupt(long deadline)
{
maybeInterceptThrowUnchecked(WAIT_UNTIL, deadline).awaitUntilThrowUncheckedOnInterrupt(deadline);
return isSignalled();
}
public boolean awaitUntilUninterruptibly(long deadline)
{
maybeIntercept(WAIT_UNTIL, deadline).awaitUntilUninterruptibly(deadline);
return isSignalled();
}
public Awaitable await() throws InterruptedException
{
maybeInterceptThrowChecked(UNBOUNDED_WAIT, 0).await();
return this;
}
public Awaitable awaitThrowUncheckedOnInterrupt()
{
maybeInterceptThrowUnchecked(UNBOUNDED_WAIT, 0).awaitThrowUncheckedOnInterrupt();
return this;
}
public Awaitable awaitUninterruptibly()
{
maybeIntercept(UNBOUNDED_WAIT, 0).awaitUninterruptibly();
return this;
}
public boolean await(long time, TimeUnit units) throws InterruptedException
{
long deadline = relativeToLocalNanos(units.toNanos(time));
maybeInterceptThrowChecked(WAIT_UNTIL, localToGlobalNanos(deadline)).awaitUntil(deadline);
return isSignalled();
}
public boolean awaitThrowUncheckedOnInterrupt(long time, TimeUnit units)
{
long deadline = relativeToLocalNanos(units.toNanos(time));
maybeInterceptThrowUnchecked(WAIT_UNTIL, localToGlobalNanos(deadline)).awaitUntilThrowUncheckedOnInterrupt(deadline);
return isSignalled();
}
public boolean awaitUninterruptibly(long time, TimeUnit units)
{
long deadline = relativeToLocalNanos(units.toNanos(time));
maybeIntercept(WAIT_UNTIL, localToGlobalNanos(deadline)).awaitUntilUninterruptibly(deadline);
return isSignalled();
}
@PerClassLoader
static class InterceptingCondition extends InterceptingAwaitable implements Condition, TriggerListener
{
final Condition inner = new NotInterceptedSyncCondition();
private List<InterceptedConditionWait> intercepted;
public InterceptingCondition()
{
}
Condition maybeIntercept(InterceptedWait.Kind kind, long waitNanos)
{
if (inner.isSignalled())
return inner;
InterceptibleThread thread = ifIntercepted();
if (thread == null)
return inner;
InterceptedConditionWait signal = new InterceptedConditionWait(kind, waitNanos, thread, captureWaitSite(thread), inner);
synchronized (this)
{
if (intercepted == null)
intercepted = new ArrayList<>(2);
intercepted.add(signal);
}
signal.addListener(this);
thread.interceptWait(signal);
return signal;
}
public boolean isSignalled()
{
return inner.isSignalled();
}
public void signal()
{
if (isSignalled())
return;
inner.signal();
synchronized (this)
{
if (intercepted != null)
{
Thread signalledBy = Thread.currentThread();
intercepted.forEach(signal -> signal.interceptWakeup(SIGNAL, signalledBy));
}
}
}
@Override
public synchronized void onTrigger(InterceptedWait triggered)
{
intercepted.remove(triggered);
}
}
@PerClassLoader
static class InterceptingCountDownLatch extends InterceptingCondition implements CountDownLatch
{
private final AtomicInteger count;
public InterceptingCountDownLatch(int count)
{
super();
this.count = new AtomicInteger(count);
}
public void decrement()
{
if (count.decrementAndGet() == 0)
signal();
}
public int count()
{
return count.get();
}
}
@PerClassLoader
static class InterceptingSignal<V> extends InterceptingAwaitable implements WaitQueue.Signal
{
final Condition inner = new NotInterceptedSyncCondition();
final V supplyOnDone;
final Consumer<V> receiveOnDone;
InterceptedConditionWait intercepted;
boolean isSignalled;
boolean isCancelled;
InterceptingSignal()
{
this(null, ignore -> {});
}
InterceptingSignal(V supplyOnDone, Consumer<V> receiveOnDone)
{
this.supplyOnDone = supplyOnDone;
this.receiveOnDone = receiveOnDone;
}
public boolean isSignalled()
{
return isSignalled;
}
public synchronized boolean isCancelled()
{
return isCancelled;
}
public synchronized boolean isSet()
{
return isCancelled | isSignalled;
}
public void signal()
{
doSignal();
}
synchronized boolean doSignal()
{
if (isSet())
return false;
isSignalled = true;
receiveOnDone.accept(supplyOnDone);
inner.signal();
if (intercepted != null && !intercepted.isTriggered())
intercepted.interceptWakeup(SIGNAL, Thread.currentThread());
return true;
}
public synchronized boolean checkAndClear()
{
if (isSet())
return isSignalled;
isCancelled = true;
receiveOnDone.accept(supplyOnDone);
inner.signal();
return false;
}
public synchronized void cancel()
{
checkAndClear();
}
Condition maybeIntercept(InterceptedWait.Kind kind, long waitNanos)
{
assert intercepted == null;
assert !inner.isSignalled();
InterceptibleThread thread = ifIntercepted();
if (thread == null)
return inner;
intercepted = new InterceptedConditionWait(kind, waitNanos, thread, captureWaitSite(thread), inner);
thread.interceptWait(intercepted);
return intercepted;
}
}
}