blob: c7f4dce8b85ac6517100e1c261a4ee5a911cd9a1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.simulator.systems;
import java.io.Serializable;
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import com.google.common.annotations.VisibleForTesting;
import io.netty.util.concurrent.FastThreadLocal;
import org.apache.cassandra.concurrent.ExecutorBuilder;
import org.apache.cassandra.concurrent.ExecutorBuilderFactory;
import org.apache.cassandra.concurrent.ExecutorFactory;
import org.apache.cassandra.concurrent.ExecutorPlus;
import org.apache.cassandra.concurrent.InfiniteLoopExecutor;
import org.apache.cassandra.concurrent.InfiniteLoopExecutor.Daemon;
import org.apache.cassandra.concurrent.InfiniteLoopExecutor.Interrupts;
import org.apache.cassandra.concurrent.InfiniteLoopExecutor.SimulatorSafe;
import org.apache.cassandra.concurrent.Interruptible.Task;
import org.apache.cassandra.concurrent.LocalAwareExecutorPlus;
import org.apache.cassandra.concurrent.LocalAwareSequentialExecutorPlus;
import org.apache.cassandra.concurrent.ScheduledExecutorPlus;
import org.apache.cassandra.concurrent.SequentialExecutorPlus;
import org.apache.cassandra.concurrent.Interruptible;
import org.apache.cassandra.concurrent.SyncFutureTask;
import org.apache.cassandra.concurrent.TaskFactory;
import org.apache.cassandra.distributed.api.IIsolatedExecutor;
import org.apache.cassandra.distributed.api.IIsolatedExecutor.SerializableBiFunction;
import org.apache.cassandra.distributed.api.IIsolatedExecutor.SerializableCallable;
import org.apache.cassandra.distributed.api.IIsolatedExecutor.SerializableQuadFunction;
import org.apache.cassandra.distributed.api.IIsolatedExecutor.SerializableRunnable;
import org.apache.cassandra.distributed.api.IIsolatedExecutor.SerializableSupplier;
import org.apache.cassandra.distributed.impl.IsolatedExecutor;
import org.apache.cassandra.simulator.systems.InterceptibleThreadFactory.ConcreteInterceptibleThreadFactory;
import org.apache.cassandra.simulator.systems.InterceptibleThreadFactory.PlainThreadFactory;
import org.apache.cassandra.simulator.systems.InterceptingExecutor.DiscardingSequentialExecutor;
import org.apache.cassandra.simulator.systems.InterceptingExecutor.InterceptingTaskFactory;
import org.apache.cassandra.simulator.systems.InterceptingExecutor.InterceptingLocalAwareSequentialExecutor;
import org.apache.cassandra.simulator.systems.InterceptingExecutor.InterceptingPooledExecutor;
import org.apache.cassandra.simulator.systems.InterceptingExecutor.InterceptingPooledLocalAwareExecutor;
import org.apache.cassandra.simulator.systems.InterceptingExecutor.InterceptingSequentialExecutor;
import org.apache.cassandra.simulator.systems.InterceptorOfExecution.InterceptExecution;
import org.apache.cassandra.simulator.systems.SimulatedTime.LocalTime;
import org.apache.cassandra.utils.Closeable;
import org.apache.cassandra.utils.WithResources;
import org.apache.cassandra.utils.concurrent.RunnableFuture;
import static org.apache.cassandra.simulator.systems.SimulatedAction.Kind.INFINITE_LOOP;
public class InterceptingExecutorFactory implements ExecutorFactory, Closeable
{
/**
 * {@link TaskFactory.Standard} variant whose {@code newTask} overloads create {@link SyncFutureTask}
 * instances. Used by the non local-aware intercepting executors built by this factory.
 */
static class StandardSyncTaskFactory extends TaskFactory.Standard implements InterceptingTaskFactory, Serializable
{
    /**
     * @param call the work to wrap
     * @return a new {@link SyncFutureTask} wrapping {@code call}
     */
    @Override
    public <T> RunnableFuture<T> newTask(Callable<T> call)
    {
        final RunnableFuture<T> task = new SyncFutureTask<>(call);
        return task;
    }

    /**
     * @param withResources passed through to the {@link SyncFutureTask} constructor
     * @param call the work to wrap
     * @return a new {@link SyncFutureTask} built from {@code withResources} and {@code call}
     */
    @Override
    protected <T> RunnableFuture<T> newTask(WithResources withResources, Callable<T> call)
    {
        final RunnableFuture<T> task = new SyncFutureTask<>(withResources, call);
        return task;
    }
}
/**
 * {@link TaskFactory.LocalAware} variant whose {@code newTask} overloads create {@link SyncFutureTask}
 * instances. Used by the local-aware intercepting executors built by this factory.
 */
static class LocalAwareSyncTaskFactory extends TaskFactory.LocalAware implements InterceptingTaskFactory, Serializable
{
    /**
     * @param call the work to wrap
     * @return a new {@link SyncFutureTask} wrapping {@code call}
     */
    @Override
    public <T> RunnableFuture<T> newTask(Callable<T> call)
    {
        final RunnableFuture<T> task = new SyncFutureTask<>(call);
        return task;
    }

    /**
     * @param withResources passed through to the {@link SyncFutureTask} constructor
     * @param call the work to wrap
     * @return a new {@link SyncFutureTask} built from {@code withResources} and {@code call}
     */
    @Override
    protected <T> RunnableFuture<T> newTask(WithResources withResources, Callable<T> call)
    {
        final RunnableFuture<T> task = new SyncFutureTask<>(withResources, call);
        return task;
    }
}
/**
 * Base {@link ExecutorBuilder} for simulated executors.
 *
 * Only the thread group and the uncaught exception handler are recorded; every other option is
 * accepted and ignored, and {@link #withDefaultThreadGroup()} is unsupported. All supported
 * setters return {@code this} to allow chaining.
 */
abstract static class AbstractExecutorBuilder<E extends ExecutorService> implements ExecutorBuilder<E>
{
    /** Thread group supplied via {@link #withThreadGroup(ThreadGroup)}; {@code null} until set. */
    ThreadGroup threadGroup;
    /** Handler supplied via {@link #withUncaughtExceptionHandler(UncaughtExceptionHandler)}; {@code null} until set. */
    UncaughtExceptionHandler uncaughtExceptionHandler;

    /** Ignored: the keep-alive arguments are not recorded. */
    @Override
    public ExecutorBuilder<E> withKeepAlive(long keepAlive, TimeUnit keepAliveUnits)
    {
        return this;
    }

    /** Ignored: no keep-alive setting is recorded. */
    @Override
    public ExecutorBuilder<E> withKeepAlive()
    {
        return this;
    }

    /** Ignored: the requested priority is not recorded. */
    @Override
    public ExecutorBuilder<E> withThreadPriority(int threadPriority)
    {
        return this;
    }

    /** Records the thread group for subclasses to use when building. */
    @Override
    public ExecutorBuilder<E> withThreadGroup(ThreadGroup threadGroup)
    {
        this.threadGroup = threadGroup;
        return this;
    }

    /**
     * Not supported by this builder.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public ExecutorBuilder<E> withDefaultThreadGroup()
    {
        throw new UnsupportedOperationException();
    }

    /** Ignored for now: queue limits are not implemented (should be, but it is not pressing). */
    @Override
    public ExecutorBuilder<E> withQueueLimit(int queueLimit)
    {
        return this;
    }

    /**
     * Ignored: execution is currently never rejected, although implementing rejection
     * should perhaps be considered.
     */
    @Override
    public ExecutorBuilder<E> withRejectedExecutionHandler(RejectedExecutionHandler rejectedExecutionHandler)
    {
        return this;
    }

    /** Records the uncaught exception handler for subclasses to use when building. */
    @Override
    public ExecutorBuilder<E> withUncaughtExceptionHandler(UncaughtExceptionHandler uncaughtExceptionHandler)
    {
        this.uncaughtExceptionHandler = uncaughtExceptionHandler;
        return this;
    }
}
/**
 * Builder that creates an executor by applying a serializable constructor function to the enclosing
 * factory's {@code simulatedExecution} and a freshly created thread factory.
 *
 * The thread group and uncaught exception handler configured through the inherited
 * {@code withThreadGroup} / {@code withUncaughtExceptionHandler} setters are passed to the
 * thread factory.
 */
class SimpleExecutorBuilder<E extends ExecutorService> extends AbstractExecutorBuilder<E>
{
    // NOTE: no threadGroup field is declared here on purpose. Redeclaring it would hide the inherited
    // field that withThreadGroup() assigns, so build() would always read null and silently ignore
    // the group the caller configured.

    /** Name handed to the thread factory created in {@link #build()}. */
    final String name;
    /** Constructor function for the executor; transferred via {@code transferToInstance} before use. */
    final SerializableBiFunction<InterceptorOfExecution, ThreadFactory, E> factory;

    /**
     * @param name name handed to the thread factory created in {@link #build()}
     * @param factory function creating the executor from an interceptor and a thread factory
     */
    SimpleExecutorBuilder(String name, SerializableBiFunction<InterceptorOfExecution, ThreadFactory, E> factory)
    {
        this.factory = factory;
        this.name = name;
    }

    /**
     * Transfers the constructor function via {@code transferToInstance} and applies it to the
     * enclosing factory's {@code simulatedExecution} and a new thread factory created with this
     * builder's name, no extra info, and the configured thread group and uncaught exception handler
     * (either of which may be {@code null}, in which case the enclosing factory's defaults apply).
     *
     * @return the newly created executor
     */
    @Override
    public E build()
    {
        // threadGroup and uncaughtExceptionHandler are the inherited AbstractExecutorBuilder fields
        return transferToInstance.apply(factory).apply(simulatedExecution, factory(name, null, threadGroup, uncaughtExceptionHandler));
    }
}
// Passed to the executor constructor functions and used to intercept thread starts (see startThread/infiniteLoop).
final SimulatedExecution simulatedExecution;
// Stored by the constructor; not otherwise read within this class.
final InterceptorOfGlobalMethods interceptorOfGlobalMethods;
// Class loader handed to IsolatedExecutor.transferTo(...) and to every thread factory created here.
final ClassLoader classLoader;
// Default group for created threads; explicitly supplied groups must satisfy threadGroup.parentOf(group).
final ThreadGroup threadGroup;
// Result of IsolatedExecutor.transferTo(classLoader); applied to serializable lambdas before they are invoked.
final IIsolatedExecutor.DynamicFunction<Serializable> transferToInstance;
// Set to true by close(); never reset within this class.
volatile boolean isClosed;
/**
 * Creates a factory that stores the given collaborators and derives {@code transferToInstance}
 * from {@code classLoader} via {@link IsolatedExecutor#transferTo}.
 *
 * @param simulatedExecution stored for use when constructing executors and intercepting thread starts
 * @param interceptorOfGlobalMethods stored as-is
 * @param classLoader class loader used for transfers and handed to created thread factories
 * @param threadGroup default (root) thread group for threads created through this factory
 */
InterceptingExecutorFactory(SimulatedExecution simulatedExecution, InterceptorOfGlobalMethods interceptorOfGlobalMethods, ClassLoader classLoader, ThreadGroup threadGroup)
{
    this.classLoader = classLoader;
    this.transferToInstance = IsolatedExecutor.transferTo(classLoader);
    this.threadGroup = threadGroup;
    this.simulatedExecution = simulatedExecution;
    this.interceptorOfGlobalMethods = interceptorOfGlobalMethods;
}
/**
 * Creates a thread factory with the given name and no extra info.
 *
 * @param name name passed on to the created thread factory
 * @return the result of {@code factory(name, null)}
 */
public InterceptibleThreadFactory factory(String name)
{
    final Object noExtraInfo = null;
    return factory(name, noExtraInfo);
}
/**
 * Creates a thread factory with the given name and extra info, using this factory's own thread group.
 *
 * @param name name passed on to the created thread factory
 * @param extraInfo opaque value passed on to the created thread factory
 */
InterceptibleThreadFactory factory(String name, Object extraInfo)
{
    final ThreadGroup defaultGroup = this.threadGroup;
    return factory(name, extraInfo, defaultGroup);
}
/**
 * Creates a thread factory for the given thread group without specifying an uncaught exception
 * handler ({@code null} is passed on, so the next overload's default applies).
 *
 * @param name name passed on to the created thread factory
 * @param extraInfo opaque value passed on to the created thread factory
 * @param threadGroup group for the created threads
 */
InterceptibleThreadFactory factory(String name, Object extraInfo, ThreadGroup threadGroup)
{
    final UncaughtExceptionHandler noHandler = null;
    return factory(name, extraInfo, threadGroup, noHandler);
}
/**
 * Creates an interceptible thread factory, using {@code ConcreteInterceptibleThreadFactory::new}
 * as the meta factory.
 *
 * @param name name passed on to the created thread factory
 * @param extraInfo opaque value passed on to the created thread factory
 * @param threadGroup group for the created threads
 * @param uncaughtExceptionHandler handler for the created threads
 */
InterceptibleThreadFactory factory(String name, Object extraInfo, ThreadGroup threadGroup, UncaughtExceptionHandler uncaughtExceptionHandler)
{
    final InterceptibleThreadFactory created =
        factory(name, extraInfo, threadGroup, uncaughtExceptionHandler, ConcreteInterceptibleThreadFactory::new);
    return created;
}
/**
 * Creates a thread factory using {@code PlainThreadFactory::new} as the meta factory; otherwise
 * identical to the four-argument {@code factory} overload.
 *
 * @param name name passed on to the created thread factory
 * @param extraInfo opaque value passed on to the created thread factory
 * @param threadGroup group for the created threads
 * @param uncaughtExceptionHandler handler for the created threads
 */
ThreadFactory plainFactory(String name, Object extraInfo, ThreadGroup threadGroup, UncaughtExceptionHandler uncaughtExceptionHandler)
{
    final ThreadFactory plain =
        factory(name, extraInfo, threadGroup, uncaughtExceptionHandler, PlainThreadFactory::new);
    return plain;
}
/**
 * Core thread-factory constructor that all other {@code factory} overloads funnel into.
 *
 * @param name name passed to the meta factory
 * @param extraInfo opaque value passed to the meta factory
 * @param threadGroup group for created threads; {@code null} selects this factory's own group
 * @param uncaughtExceptionHandler handler for created threads; {@code null} selects
 *        {@code InterceptorOfGlobalMethods.Global::uncaughtException}, obtained through {@code transferToInstance}
 * @param factory meta factory that creates the actual thread factory
 * @return whatever {@code factory.create(...)} returns
 * @throws IllegalArgumentException if a non-null {@code threadGroup} is given for which
 *         {@code this.threadGroup.parentOf(threadGroup)} is false
 */
<F extends ThreadFactory> F factory(String name, Object extraInfo, ThreadGroup threadGroup, UncaughtExceptionHandler uncaughtExceptionHandler, InterceptibleThreadFactory.MetaFactory<F> factory)
{
    UncaughtExceptionHandler handler = uncaughtExceptionHandler;
    if (handler == null)
    {
        handler = transferToInstance.apply((SerializableSupplier<UncaughtExceptionHandler>) () -> InterceptorOfGlobalMethods.Global::uncaughtException)
                                    .get();
    }

    final ThreadGroup group;
    if (threadGroup == null)
        group = this.threadGroup;
    else if (this.threadGroup.parentOf(threadGroup))
        group = threadGroup;
    else
        throw new IllegalArgumentException();

    // both are obtained through transferToInstance, like the default handler above
    Runnable onTermination = transferToInstance.apply((SerializableRunnable) FastThreadLocal::removeAll);
    LocalTime time = transferToInstance.apply((SerializableCallable<LocalTime>) SimulatedTime.Global::current).call();

    return factory.create(name, Thread.NORM_PRIORITY, classLoader, handler, group, onTermination, time, this, extraInfo);
}
/**
 * Returns this factory unchanged; {@code jmxPath} is not used.
 */
@Override
public ExecutorBuilderFactory<ExecutorPlus, SequentialExecutorPlus> withJmx(String jmxPath)
{
    return this;
}
/**
 * Returns this factory unchanged.
 */
@Override
public ExecutorBuilderFactory<ExecutorPlus, SequentialExecutorPlus> withJmxInternal()
{
    return this;
}
/**
 * Returns a builder whose {@code build()} creates an {@link InterceptingSequentialExecutor}
 * backed by a {@link StandardSyncTaskFactory}.
 *
 * @param name name for the builder (and thereby for the thread factory it creates)
 */
@Override
public ExecutorBuilder<? extends SequentialExecutorPlus> configureSequential(String name)
{
    SerializableBiFunction<InterceptorOfExecution, ThreadFactory, SequentialExecutorPlus> constructor =
        (interceptor, threadSource) -> new InterceptingSequentialExecutor(interceptor, threadSource, new StandardSyncTaskFactory());
    return new SimpleExecutorBuilder<>(name, constructor);
}
/**
 * Returns a builder whose {@code build()} creates an {@link InterceptingPooledExecutor}
 * with {@code threads} threads, backed by a {@link StandardSyncTaskFactory}.
 *
 * @param name name for the builder (and thereby for the thread factory it creates)
 * @param threads thread count handed to the pooled executor
 */
@Override
public ExecutorBuilder<? extends ExecutorPlus> configurePooled(String name, int threads)
{
    SerializableBiFunction<InterceptorOfExecution, ThreadFactory, ExecutorPlus> constructor =
        (interceptor, threadSource) -> new InterceptingPooledExecutor(interceptor, threads, threadSource, new StandardSyncTaskFactory());
    return new SimpleExecutorBuilder<>(name, constructor);
}
/**
 * Builds a sequential executor with default builder settings.
 *
 * @param name name passed to {@code configureSequential}
 * @return the executor produced by {@code configureSequential(name).build()}
 */
public SequentialExecutorPlus sequential(String name)
{
    final ExecutorBuilder<? extends SequentialExecutorPlus> builder = configureSequential(name);
    return builder.build();
}
/**
 * Returns a sub-factory producing local-aware executors.
 *
 * Both the returned sub-factory and the one obtained from its {@code withJmx} build
 * {@link InterceptingLocalAwareSequentialExecutor} / {@link InterceptingPooledLocalAwareExecutor}
 * instances backed by a {@link LocalAwareSyncTaskFactory}; {@code jmxPath} is not used.
 */
@Override
public LocalAwareSubFactory localAware()
{
    return new LocalAwareSubFactory()
    {
        @Override
        public LocalAwareSubFactoryWithJMX withJmx(String jmxPath)
        {
            return new LocalAwareSubFactoryWithJMX()
            {
                /** Delegates to {@code pooled(name, threads)}; {@code onSetMaxSize} is not used. */
                @Override
                public LocalAwareExecutorPlus shared(String name, int threads, ExecutorPlus.MaximumPoolSizeListener onSetMaxSize)
                {
                    return pooled(name, threads);
                }

                @Override
                public ExecutorBuilder<? extends LocalAwareSequentialExecutorPlus> configureSequential(String name)
                {
                    return new SimpleExecutorBuilder<>(
                        name,
                        (interceptor, threadSource) -> new InterceptingLocalAwareSequentialExecutor(interceptor, threadSource, new LocalAwareSyncTaskFactory()));
                }

                @Override
                public ExecutorBuilder<? extends LocalAwareExecutorPlus> configurePooled(String name, int threads)
                {
                    return new SimpleExecutorBuilder<>(
                        name,
                        (interceptor, threadSource) -> new InterceptingPooledLocalAwareExecutor(interceptor, threads, threadSource, new LocalAwareSyncTaskFactory()));
                }
            };
        }

        @Override
        public ExecutorBuilder<? extends LocalAwareSequentialExecutorPlus> configureSequential(String name)
        {
            return new SimpleExecutorBuilder<>(
                name,
                (interceptor, threadSource) -> new InterceptingLocalAwareSequentialExecutor(interceptor, threadSource, new LocalAwareSyncTaskFactory()));
        }

        @Override
        public ExecutorBuilder<? extends LocalAwareExecutorPlus> configurePooled(String name, int threads)
        {
            return new SimpleExecutorBuilder<>(
                name,
                (interceptor, threadSource) -> new InterceptingPooledLocalAwareExecutor(interceptor, threads, threadSource, new LocalAwareSyncTaskFactory()));
        }
    };
}
/**
 * Creates a scheduled executor according to {@code simulatorSemantics}.
 *
 * {@code NORMAL} yields an {@link InterceptingSequentialExecutor} backed by a
 * {@link StandardSyncTaskFactory} and a thread factory named {@code name}; {@code DISCARD} yields a
 * {@link DiscardingSequentialExecutor}. In both cases the constructor is first passed through
 * {@code transferToInstance}. {@code executeOnShutdown} and {@code priority} are not used.
 *
 * @throws AssertionError for any other {@code simulatorSemantics} value
 */
@Override
public ScheduledExecutorPlus scheduled(boolean executeOnShutdown, String name, int priority, SimulatorSemantics simulatorSemantics)
{
    switch (simulatorSemantics)
    {
        case NORMAL:
        {
            SerializableBiFunction<InterceptorOfExecution, ThreadFactory, ScheduledExecutorPlus> constructor =
                (interceptor, threadSource) -> new InterceptingSequentialExecutor(interceptor, threadSource, new StandardSyncTaskFactory());
            return transferToInstance.apply(constructor).apply(simulatedExecution, factory(name));
        }
        case DISCARD:
        {
            SerializableSupplier<ScheduledExecutorPlus> constructor = DiscardingSequentialExecutor::new;
            return transferToInstance.apply(constructor).get();
        }
        default:
            throw new AssertionError();
    }
}
/**
 * Builds an executor with the requested number of threads.
 *
 * A request for exactly one thread is served by {@code configureSequential(name)}; any other
 * count is served by {@code configurePooled(name, threads)}.
 *
 * @param name name for the executor's builder
 * @param threads requested thread count
 */
@Override
public ExecutorPlus pooled(String name, int threads)
{
    if (threads != 1)
        return configurePooled(name, threads).build();

    return configureSequential(name).build();
}
/**
 * Starts {@code runnable} through the simulated execution's interceptor as a
 * {@code SimulatedAction.Kind.THREAD} action, on a thread created by {@code factory(name)}.
 * The {@code daemon} argument is not used.
 *
 * @return the thread returned by the interceptor's {@code start}
 */
public Thread startThread(String name, Runnable runnable, Daemon daemon)
{
    final InterceptExecution interceptor = simulatedExecution.intercept();
    return interceptor.start(SimulatedAction.Kind.THREAD, factory(name)::newThread, runnable);
}
/**
 * Creates, without going through the interceptor, an {@code InterceptedThreadStart} of kind
 * {@code SimulatedAction.Kind.THREAD} for {@code run}, using {@code factory(name)} to create the thread.
 *
 * @param name name for the thread factory
 * @param run the work for the thread
 */
@VisibleForTesting
public InterceptedExecution.InterceptedThreadStart startParked(String name, Runnable run)
{
    final InterceptedExecution.InterceptedThreadStart parked =
        new InterceptedExecution.InterceptedThreadStart(factory(name)::newThread, run, SimulatedAction.Kind.THREAD);
    return parked;
}
/**
 * Creates an {@link InfiniteLoopExecutor} running {@code task}.
 *
 * When {@code simulatorSafe} is {@code SAFE}, the executor's constructor is passed through
 * {@code transferToInstance} and its thread is started via the simulated execution's interceptor as
 * an {@code INFINITE_LOOP} action. Otherwise the executor is constructed directly and its thread
 * comes from {@code plainFactory} and is started immediately. The {@code daemon} argument is not used.
 */
@Override
public Interruptible infiniteLoop(String name, Task task, SimulatorSafe simulatorSafe, Daemon daemon, Interrupts interrupts)
{
    if (simulatorSafe == SimulatorSafe.SAFE)
    {
        InterceptExecution interceptor = simulatedExecution.intercept();
        SerializableQuadFunction<BiFunction<String, Runnable, Thread>, String, Task, Interrupts, Interruptible> constructor =
            transferToInstance.apply((SerializableQuadFunction<BiFunction<String, Runnable, Thread>, String, Task, Interrupts, Interruptible>) InfiniteLoopExecutor::new);
        return constructor.apply((threadName, body) -> interceptor.start(INFINITE_LOOP, factory(threadName, task)::newThread, body),
                                 name, task, interrupts);
    }

    // Avoid using rewritten classes here (so use the system class loader's InfiniteLoopExecutor),
    // as we cannot fully control the thread's execution.
    return new InfiniteLoopExecutor((threadName, body) -> {
        Thread started = plainFactory(threadName, body, threadGroup, null).newThread(body);
        started.start();
        return started;
    }, name, task, interrupts);
}
/**
 * Creates a new thread group named {@code name} whose parent is this factory's thread group.
 */
@Override
public ThreadGroup newThreadGroup(String name)
{
    final ThreadGroup child = new ThreadGroup(this.threadGroup, name);
    return child;
}
/**
 * Marks this factory closed, then walks its thread group tree: each {@link InterceptibleThread}
 * found has {@code trapInterrupts(false)} called on it and is interrupted. Finally the whole
 * thread group is interrupted.
 */
public void close()
{
    isClosed = true;

    Consumer<InterceptibleThread> releaseAndInterrupt = intercepted -> {
        intercepted.trapInterrupts(false);
        intercepted.interrupt();
    };
    forEach(threadGroup, releaseAndInterrupt);

    threadGroup.interrupt();
}
/**
 * Interrupts this factory's thread group via {@link ThreadGroup#interrupt()}.
 */
public void interrupt()
{
    this.threadGroup.interrupt();
}
/**
 * Applies {@code consumer} to every {@link InterceptibleThread} that is a direct member of
 * {@code threadGroup}, then recurses into each direct subgroup.
 *
 * The snapshot of members is taken while holding the group's monitor; the consumer is invoked
 * after the monitor has been released.
 *
 * @param threadGroup the group to walk; must not be {@code null}
 * @param consumer action to apply to each interceptible thread found
 */
private static void forEach(ThreadGroup threadGroup, Consumer<InterceptibleThread> consumer)
{
    Thread[] threads;
    ThreadGroup[] groups;
    int threadCount;
    int groupCount;
    synchronized (threadGroup)
    {
        // activeCount()/activeGroupCount() are estimates that also count members of nested subgroups,
        // whereas the non-recursive enumerate() calls copy only direct members. The arrays may
        // therefore be only partially filled, so remember how many slots were actually populated.
        threads = new Thread[threadGroup.activeCount()];
        threadCount = threadGroup.enumerate(threads, false);
        groups = new ThreadGroup[threadGroup.activeGroupCount()];
        groupCount = threadGroup.enumerate(groups, false);
    }

    for (int i = 0; i < threadCount; i++)
    {
        Thread thread = threads[i];
        if (thread instanceof InterceptibleThread)
            consumer.accept((InterceptibleThread) thread);
    }

    // Visit only populated slots: recursing into a trailing null slot would throw a
    // NullPointerException when synchronizing on it.
    for (int i = 0; i < groupCount; i++)
        forEach(groups[i], consumer);
}
}