// blob: 5efaf16cf697849240cec141b573497b29f1c74a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.concurrent;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.IntSupplier;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import com.google.common.annotations.VisibleForTesting;

import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.Verb;
import org.apache.cassandra.utils.ExecutorUtils;
import org.apache.cassandra.utils.FBUtilities;

import static java.util.stream.Collectors.toMap;
/**
 * The named execution stages. Each constant carries the settings needed to build its
 * {@link LocalAwareExecutorService}; the executor itself is only created on the first
 * call to {@link #executor()}.
 */
public enum Stage
{
// Constructor arguments, in order: shutdownBeforeCommitlog, jmxName, jmxType, thread-count supplier,
// maximum-pool-size listener (null where none is supplied), executor factory.
READ (false, "ReadStage", "request", DatabaseDescriptor::getConcurrentReaders, DatabaseDescriptor::setConcurrentReaders, Stage::multiThreadedLowSignalStage),
MUTATION (true, "MutationStage", "request", DatabaseDescriptor::getConcurrentWriters, DatabaseDescriptor::setConcurrentWriters, Stage::multiThreadedLowSignalStage),
COUNTER_MUTATION (true, "CounterMutationStage", "request", DatabaseDescriptor::getConcurrentCounterWriters, DatabaseDescriptor::setConcurrentCounterWriters, Stage::multiThreadedLowSignalStage),
VIEW_MUTATION (true, "ViewMutationStage", "request", DatabaseDescriptor::getConcurrentViewWriters, DatabaseDescriptor::setConcurrentViewWriters, Stage::multiThreadedLowSignalStage),
GOSSIP (true, "GossipStage", "internal", () -> 1, null, Stage::singleThreadedStage),
REQUEST_RESPONSE (false, "RequestResponseStage", "request", FBUtilities::getAvailableProcessors, null, Stage::multiThreadedLowSignalStage),
ANTI_ENTROPY (false, "AntiEntropyStage", "internal", () -> 1, null, Stage::singleThreadedStage),
MIGRATION (false, "MigrationStage", "internal", () -> 1, null, Stage::singleThreadedStage),
MISC (false, "MiscStage", "internal", () -> 1, null, Stage::singleThreadedStage),
TRACING (false, "TracingStage", "internal", () -> 1, null, Stage::tracingExecutor),
INTERNAL_RESPONSE (false, "InternalResponseStage", "internal", FBUtilities::getAvailableProcessors, null, Stage::multiThreadedStage),
IMMEDIATE (false, "ImmediateStage", "internal", () -> 0, null, Stage::immediateExecutor);
/** Keep-alive time, in seconds, handed to the thread-pool backed executors created by this enum. */
public static final long KEEP_ALIVE_SECONDS = 60; // seconds to keep "extra" threads alive for when idle
/** Name passed to the executor factory; its normalised form is also a lookup key for {@link #fromPoolName(String)}. */
public final String jmxName;
/** Set true if this executor should be gracefully shutdown before stopping
* the commitlog allocator. Tasks on executors that issue mutations may
* block indefinitely waiting for a new commitlog segment, preventing a
* clean drain/shutdown.
*/
public final boolean shutdownBeforeCommitlog;
/** Builds this stage's executor on demand; invoked only from {@link #executor()}. */
private final Supplier<LocalAwareExecutorService> initialiser;
/** The lazily created executor; stays null until {@link #executor()} is first called. */
private volatile LocalAwareExecutorService executor = null;
/**
 * Records the settings for a stage. No executor is created here; creation is deferred to the
 * first call of {@link #executor()}.
 *
 * @param shutdownBeforeCommitlog whether this stage is included in the "mutating" executors
 * @param jmxName name passed to the executor factory
 * @param jmxType type string passed to the executor factory
 * @param numThreads supplies the thread count; only queried when the executor is created
 * @param onSetMaximumPoolSize listener passed to the executor factory, may be null
 * @param initialiser factory that builds the executor
 */
Stage(boolean shutdownBeforeCommitlog, String jmxName, String jmxType, IntSupplier numThreads, LocalAwareExecutorService.MaximumPoolSizeListener onSetMaximumPoolSize, ExecutorServiceInitialiser initialiser)
{
    this.shutdownBeforeCommitlog = shutdownBeforeCommitlog;
    this.jmxName = jmxName;
    // Wrap the factory call so that numThreads is evaluated at executor-creation time, not at enum initialisation.
    this.initialiser = () -> initialiser.init(jmxName, jmxType, numThreads.getAsInt(), onSetMaximumPoolSize);
}
/**
 * Converts a pool or JMX name into the key form used for stage lookup: upper-cased, with a
 * trailing "STAGE" removed (for example "ReadStage" becomes "READ").
 *
 * Upper-casing uses {@link Locale#ROOT} so the result does not depend on the JVM's default
 * locale, and the suffix is cut relative to the upper-cased string, whose length can differ
 * from the input's.
 *
 * @param stageName the name to normalise; must not be null
 * @return the normalised name
 */
private static String normalizeName(String stageName)
{
    // Handle discrepancy between JMX names and actual pool names
    String upperStageName = stageName.toUpperCase(Locale.ROOT);
    if (upperStageName.endsWith("STAGE"))
    {
        upperStageName = upperStageName.substring(0, upperStageName.length() - "STAGE".length());
    }
    return upperStageName;
}
/** Maps each stage's normalised JMX name to the stage; the first lookup tried by fromPoolName. */
private static final Map<String,Stage> nameMap = Stream.of(values())
                                                       .collect(toMap(stage -> normalizeName(stage.jmxName),
                                                                      stage -> stage));
/**
 * Resolves a stage from a pool name. The name is normalised (upper-cased, trailing "STAGE"
 * removed) and then matched, in order, against: the normalised JMX names, a fixed set of
 * configuration-style aliases, and finally the enum constant names.
 *
 * @param stageName the pool, JMX, configuration or enum name; must not be null
 * @return the matching stage
 * @throws IllegalStateException if the name matches no stage
 */
public static Stage fromPoolName(String stageName)
{
    String upperStageName = normalizeName(stageName);

    Stage result = nameMap.get(upperStageName);
    if (result != null)
        return result;

    switch (upperStageName) // Handle discrepancy between configuration file and stage names
    {
        case "CONCURRENT_READS":
            return READ;
        case "CONCURRENT_WRITES":  // spelling of the cassandra.yaml option (concurrent_writes)
        case "CONCURRENT_WRITERS": // historical alias, kept for backward compatibility
            return MUTATION;
        case "CONCURRENT_COUNTER_WRITES":
            return COUNTER_MUTATION;
        case "CONCURRENT_MATERIALIZED_VIEW_WRITES":
            return VIEW_MUTATION;
        default:
            break;
    }

    try
    {
        return valueOf(upperStageName);
    }
    catch (IllegalArgumentException e)
    {
        // Same exception type as before; now names the rejected input and keeps the cause.
        throw new IllegalStateException("Unknown stage '" + stageName + "'. Must be one of " + Arrays.stream(values())
                                                                                                     .map(Enum::toString)
                                                                                                     .collect(Collectors.joining(",")),
                                        e);
    }
}
// Shortcuts that forward to this stage's executor, creating it first if necessary.

/** Forwards {@code command} to this stage's executor. */
public void execute(Runnable command)
{
    executor().execute(command);
}

/** Forwards {@code command} together with {@code locals} to this stage's executor. */
public void execute(Runnable command, ExecutorLocals locals)
{
    executor().execute(command, locals);
}

/** Forwards {@code command} to the executor's {@code maybeExecuteImmediately}. */
public void maybeExecuteImmediately(Runnable command)
{
    executor().maybeExecuteImmediately(command);
}

/** Submits {@code task} to this stage's executor and returns its future. */
public <T> Future<T> submit(Callable<T> task)
{
    return executor().submit(task);
}

/** Submits {@code task} to this stage's executor and returns its future. */
public Future<?> submit(Runnable task)
{
    return executor().submit(task);
}

/** Submits {@code task} to this stage's executor; the future yields {@code result}. */
public <T> Future<T> submit(Runnable task, T result)
{
    return executor().submit(task, result);
}
/**
 * Returns this stage's executor, creating it through the initialiser on first use.
 * Creation happens under this constant's monitor, and the field is re-read inside the
 * lock so the initialiser is not invoked again once an executor has been stored.
 */
public LocalAwareExecutorService executor()
{
    LocalAwareExecutorService current = executor;
    if (current != null)
        return current;

    synchronized (this)
    {
        current = executor;
        if (current == null)
        {
            current = initialiser.get();
            executor = current;
        }
        return current;
    }
}
/**
 * Collects the executor of every stage. Because this goes through {@link #executor()},
 * any stage whose executor has not been created yet gets one created here.
 */
private static List<ExecutorService> executors()
{
    Stream<LocalAwareExecutorService> all = Arrays.stream(values()).map(Stage::executor);
    return all.collect(Collectors.toList());
}
/**
 * Collects the executors of the stages flagged {@code shutdownBeforeCommitlog}.
 * Executors not created yet are created by the call to {@link #executor()}.
 */
private static List<ExecutorService> mutatingExecutors()
{
    Stream<Stage> mutating = Arrays.stream(values())
                                   .filter(stage -> stage.shutdownBeforeCommitlog);
    return mutating.map(Stage::executor)
                   .collect(Collectors.toList());
}
/**
 * Passes the executor of every stage to {@code ExecutorUtils.shutdownNow}.
 */
public static void shutdownNow()
{
    List<ExecutorService> all = executors();
    ExecutorUtils.shutdownNow(all);
}
/**
 * Shuts down the executors of the stages flagged {@code shutdownBeforeCommitlog} and then
 * waits for them to terminate.
 *
 * @param interrupt forwarded to {@code ExecutorUtils.shutdown}
 * @param timeout how long to wait for termination
 * @param units unit of {@code timeout}
 * @throws InterruptedException propagated from {@code ExecutorUtils.awaitTermination}
 * @throws TimeoutException propagated from {@code ExecutorUtils.awaitTermination}
 */
public static void shutdownAndAwaitMutatingExecutors(boolean interrupt, long timeout, TimeUnit units) throws InterruptedException, TimeoutException
{
    List<ExecutorService> mutating = mutatingExecutors();
    ExecutorUtils.shutdown(interrupt, mutating);
    ExecutorUtils.awaitTermination(timeout, units, mutating);
}
/**
 * Reports whether every executor of a stage flagged {@code shutdownBeforeCommitlog} is terminated.
 *
 * @return false as soon as one such executor is not terminated, true otherwise
 */
public static boolean areMutationExecutorsTerminated()
{
    for (ExecutorService service : mutatingExecutors())
    {
        if (!service.isTerminated())
            return false;
    }
    return true;
}
/**
 * Calls {@code ExecutorUtils.shutdownNow} on the executors of all stages and then waits
 * for them to terminate.
 *
 * @param timeout how long to wait for termination
 * @param units unit of {@code timeout}
 * @throws InterruptedException propagated from {@code ExecutorUtils.awaitTermination}
 * @throws TimeoutException propagated from {@code ExecutorUtils.awaitTermination}
 */
@VisibleForTesting
public static void shutdownAndWait(long timeout, TimeUnit units) throws InterruptedException, TimeoutException
{
    List<ExecutorService> all = executors();
    ExecutorUtils.shutdownNow(all);
    ExecutorUtils.awaitTermination(timeout, units, all);
}
/**
 * Executor factory for the tracing stage: one thread (core and maximum both 1) fed by a
 * queue bounded at 1000 entries. {@code jmxType}, {@code numThreads} and
 * {@code onSetMaximumPoolSize} are not used.
 */
static LocalAwareExecutorService tracingExecutor(String jmxName, String jmxType, int numThreads, LocalAwareExecutorService.MaximumPoolSizeListener onSetMaximumPoolSize)
{
    // A rejected task is not run; the rejection is only recorded as a self-dropped _TRACE message.
    RejectedExecutionHandler recordDrop = (task, pool) -> MessagingService.instance().metrics.recordSelfDroppedMessage(Verb._TRACE);
    BlockingQueue<Runnable> boundedQueue = new ArrayBlockingQueue<>(1000);
    ThreadFactory threads = new NamedThreadFactory(jmxName);
    return new TracingExecutor(1, 1, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS, boundedQueue, threads, recordDrop);
}
/**
 * Executor factory that builds a {@code JMXEnabledThreadPoolExecutor} with {@code numThreads}
 * threads and an unbounded {@code LinkedBlockingQueue}. {@code onSetMaximumPoolSize} is not used.
 */
static LocalAwareExecutorService multiThreadedStage(String jmxName, String jmxType, int numThreads, LocalAwareExecutorService.MaximumPoolSizeListener onSetMaximumPoolSize)
{
    LinkedBlockingQueue<Runnable> unboundedQueue = new LinkedBlockingQueue<>();
    NamedThreadFactory threads = new NamedThreadFactory(jmxName);
    return new JMXEnabledThreadPoolExecutor(numThreads, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS, unboundedQueue, threads, jmxType);
}
/**
 * Executor factory that obtains the executor from {@code SharedExecutorPool.SHARED}.
 * Note the argument order expected by {@code newExecutor}: the JMX type precedes the JMX name.
 */
static LocalAwareExecutorService multiThreadedLowSignalStage(String jmxName, String jmxType, int numThreads, LocalAwareExecutorService.MaximumPoolSizeListener onSetMaximumPoolSize)
{
    LocalAwareExecutorService shared = SharedExecutorPool.SHARED.newExecutor(numThreads,
                                                                             onSetMaximumPoolSize,
                                                                             jmxType,
                                                                             jmxName);
    return shared;
}
/**
 * Executor factory that builds a {@code JMXEnabledSingleThreadExecutor}.
 * {@code numThreads} and {@code onSetMaximumPoolSize} are not used.
 */
static LocalAwareExecutorService singleThreadedStage(String jmxName, String jmxType, int numThreads, LocalAwareExecutorService.MaximumPoolSizeListener onSetMaximumPoolSize)
{
    LocalAwareExecutorService single = new JMXEnabledSingleThreadExecutor(jmxName, jmxType);
    return single;
}
/**
 * Executor factory that always hands back the shared {@code ImmediateExecutor.INSTANCE};
 * none of the arguments are used.
 */
static LocalAwareExecutorService immediateExecutor(String jmxName, String jmxType, int numThreads, LocalAwareExecutorService.MaximumPoolSizeListener onSetMaximumPoolSize)
{
    LocalAwareExecutorService immediate = ImmediateExecutor.INSTANCE;
    return immediate;
}
/**
 * Factory signature shared by the executor-building methods of this enum.
 */
@FunctionalInterface
public interface ExecutorServiceInitialiser
{
    /**
     * Builds the executor for a stage.
     *
     * @param jmxName the stage's JMX name
     * @param jmxType the stage's JMX type
     * @param numThreads the thread count obtained from the stage's supplier
     * @param onSetMaximumPoolSize listener configured for the stage, may be null
     * @return the executor for the stage
     */
    LocalAwareExecutorService init(String jmxName, String jmxType, int numThreads, LocalAwareExecutorService.MaximumPoolSizeListener onSetMaximumPoolSize);
}
/**
 * Reads the core pool size from this stage's executor.
 */
public int getCorePoolSize()
{
    LocalAwareExecutorService pool = executor();
    return pool.getCorePoolSize();
}

/**
 * Forwards a new core pool size to this stage's executor.
 */
public void setCorePoolSize(int newCorePoolSize)
{
    LocalAwareExecutorService pool = executor();
    pool.setCorePoolSize(newCorePoolSize);
}

/**
 * Reads the maximum pool size from this stage's executor.
 */
public int getMaximumPoolSize()
{
    LocalAwareExecutorService pool = executor();
    return pool.getMaximumPoolSize();
}

/**
 * Forwards a new maximum pool size to this stage's executor.
 */
public void setMaximumPoolSize(int newMaximumPoolSize)
{
    LocalAwareExecutorService pool = executor();
    pool.setMaximumPoolSize(newMaximumPoolSize);
}
/**
 * The executor used for tracing: a plain {@link ThreadPoolExecutor} that also satisfies
 * {@link LocalAwareExecutorService}.
 */
private static class TracingExecutor extends ThreadPoolExecutor implements LocalAwareExecutorService
{
    TracingExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
    {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    /**
     * Runs {@code command} like {@link #execute(Runnable)}; {@code locals} is expected to be
     * null (checked by an assertion) and is otherwise ignored.
     */
    @Override
    public void execute(Runnable command, ExecutorLocals locals)
    {
        assert locals == null;
        super.execute(command);
    }

    /** Simply hands {@code command} to {@link #execute(Runnable)}. */
    @Override
    public void maybeExecuteImmediately(Runnable command)
    {
        execute(command);
    }

    /** Returns the pool's active thread count. */
    @Override
    public int getActiveTaskCount()
    {
        return getActiveCount();
    }

    /** Returns the number of tasks currently waiting in the work queue. */
    @Override
    public int getPendingTaskCount()
    {
        return getQueue().size();
    }
}
}