blob: c596d2bfacb615f004fb74c2921c0056700f094f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.concurrent;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.TimeUnit;
import com.google.common.annotations.VisibleForTesting;
import org.apache.cassandra.metrics.ThreadPoolMetrics;
import org.apache.cassandra.utils.MBeanWrapper;
/**
* A {@link ThreadPoolExecutorBase} adapter to expose it via JMX.
* The executor is not itself modified to maximise code re-use.
* Only its rejected execution handler is updated, and a shutdown listener is registered.
*/
@VisibleForTesting
public class ThreadPoolExecutorJMXAdapter implements Runnable, ResizableThreadPoolMXBean
{
/**
* A builder wrapper that delegates all methods except {@link Builder#build()}
* @param <E>
*/
static class Builder<E extends ThreadPoolExecutorBase> implements ExecutorBuilder<E>
{
final ExecutorBuilder<E> wrapped;
final String jmxPath;
Builder(ExecutorBuilder<E> wrapped, String jmxPath)
{
this.wrapped = wrapped;
this.jmxPath = jmxPath;
}
@Override
public ExecutorBuilder<E> withKeepAlive(long keepAlive, TimeUnit keepAliveUnits)
{
wrapped.withKeepAlive(keepAlive, keepAliveUnits);
return this;
}
@Override
public ExecutorBuilder<E> withKeepAlive()
{
wrapped.withKeepAlive();
return this;
}
@Override
public ExecutorBuilder<E> withThreadPriority(int threadPriority)
{
wrapped.withThreadPriority(threadPriority);
return this;
}
@Override
public ExecutorBuilder<E> withQueueLimit(int queueLimit)
{
wrapped.withQueueLimit(queueLimit);
return this;
}
@Override
public ExecutorBuilder<E> withThreadGroup(ThreadGroup threadGroup)
{
wrapped.withThreadGroup(threadGroup);
return this;
}
@Override
public ExecutorBuilder<E> withDefaultThreadGroup()
{
wrapped.withDefaultThreadGroup();
return this;
}
@Override
public ExecutorBuilder<E> withRejectedExecutionHandler(RejectedExecutionHandler rejectedExecutionHandler)
{
wrapped.withRejectedExecutionHandler(rejectedExecutionHandler);
return this;
}
@Override
public ExecutorBuilder<E> withUncaughtExceptionHandler(Thread.UncaughtExceptionHandler uncaughtExceptionHandler)
{
wrapped.withUncaughtExceptionHandler(uncaughtExceptionHandler);
return this;
}
/**
* Invoke {@link ExecutorBuilder#build()} on {@link #wrapped}, and register the resultant
* {@link ThreadPoolExecutorBase} with a new {@link ThreadPoolExecutorJMXAdapter}.
*
* The executor constructed by {@link #wrapped} is returned.
*/
@Override
public E build()
{
E result = wrapped.build();
register(jmxPath, result);
return result;
}
}
public static void register(String jmxPath, ThreadPoolExecutorBase executor)
{
new ThreadPoolExecutorJMXAdapter(jmxPath, executor);
}
final String mbeanName;
final ThreadPoolExecutorBase executor;
final ThreadPoolMetrics metrics;
boolean released;
private ThreadPoolExecutorJMXAdapter(String jmxPath, ThreadPoolExecutorBase executor)
{
this.executor = executor;
this.mbeanName = "org.apache.cassandra." + jmxPath + ":type=" + executor.getThreadFactory().id;
this.metrics = new ThreadPoolMetrics(executor, jmxPath, executor.getThreadFactory().id).register();
executor.setRejectedExecutionHandler(rejectedExecutionHandler(metrics, executor.getRejectedExecutionHandler()));
MBeanWrapper.instance.registerMBean(this, mbeanName);
executor.onShutdown(this);
}
@Override
public synchronized void run()
{
if (released)
return;
MBeanWrapper.instance.unregisterMBean(mbeanName);
metrics.release();
released = true;
}
public ThreadPoolMetrics metrics()
{
return metrics;
}
@Override
public int getActiveTaskCount()
{
return executor.getActiveTaskCount();
}
@Override
public int getPendingTaskCount()
{
return executor.getPendingTaskCount();
}
@Override
public int getCoreThreads()
{
return executor.getCoreThreads();
}
@Override
public void setCoreThreads(int number)
{
executor.setCoreThreads(number);
}
@Override
public int getMaximumThreads()
{
return executor.getMaximumThreads();
}
@Override
public void setMaximumThreads(int number)
{
executor.setMaximumThreads(number);
}
@Override
public void setCorePoolSize(int corePoolSize)
{
executor.setCorePoolSize(corePoolSize);
}
@Override
public int getCorePoolSize()
{
return executor.getCorePoolSize();
}
@Override
public void setMaximumPoolSize(int maximumPoolSize)
{
executor.setMaximumPoolSize(maximumPoolSize);
}
@Override
public int getMaximumPoolSize()
{
return executor.getMaximumPoolSize();
}
@Override
public long getCompletedTaskCount()
{
return executor.getCompletedTaskCount();
}
@Override
public int getMaxTasksQueued()
{
return executor.getMaxTasksQueued();
}
static RejectedExecutionHandler rejectedExecutionHandler(ThreadPoolMetrics metrics, RejectedExecutionHandler wrap)
{
return (task, executor) ->
{
metrics.totalBlocked.inc();
metrics.currentBlocked.inc();
try
{
wrap.rejectedExecution(task, executor);
}
finally
{
metrics.currentBlocked.dec();
}
};
}
}