blob: 3121a19f377ffe7a0c58fdf3a375ceb55155a01e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.distributedlog.util;
import org.apache.bookkeeper.stats.Gauge;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.util.MathUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
/**
* {@link ScheduledThreadPoolExecutor} with exposed stats. This class is exposing following stats for
* helping understanding the healthy of this thread pool executor.
* <h3>Metrics</h3>
* <ul>
* <li>pending_tasks: gauge. how many tasks are pending in this executor.
* <li>completed_tasks: gauge. how many tasks are completed in this executor.
* <li>total_tasks: gauge. how many tasks are submitted to this executor.
* <li>task_pending_time: opstats. measuring the characteristics about the time that tasks spent on
* waiting being executed.
* <li>task_execution_time: opstats. measuring the characteristics about the time that tasks spent on
* executing.
* </ul>
*/
public class MonitoredScheduledThreadPoolExecutor extends ScheduledThreadPoolExecutor {
static final Logger LOG = LoggerFactory.getLogger(MonitoredScheduledThreadPoolExecutor.class);
private class TimedRunnable implements Runnable {
final Runnable runnable;
final long enqueueNanos;
TimedRunnable(Runnable runnable) {
this.runnable = runnable;
this.enqueueNanos = MathUtils.nowInNano();
}
@Override
public void run() {
long startNanos = MathUtils.nowInNano();
long pendingMicros = TimeUnit.NANOSECONDS.toMicros(startNanos - enqueueNanos);
taskPendingStats.registerSuccessfulEvent(pendingMicros);
try {
runnable.run();
} finally {
long executionMicros = TimeUnit.NANOSECONDS.toMicros(MathUtils.nowInNano() - startNanos);
taskExecutionStats.registerSuccessfulEvent(executionMicros);
}
}
@Override
public String toString() {
return runnable.toString();
}
@Override
public int hashCode() {
return runnable.hashCode();
}
}
private class TimedCallable<T> implements Callable<T> {
final Callable<T> task;
final long enqueueNanos;
TimedCallable(Callable<T> task) {
this.task = task;
this.enqueueNanos = MathUtils.nowInNano();
}
@Override
public T call() throws Exception {
long startNanos = MathUtils.nowInNano();
long pendingMicros = TimeUnit.NANOSECONDS.toMicros(startNanos - enqueueNanos);
taskPendingStats.registerSuccessfulEvent(pendingMicros);
try {
return task.call();
} finally {
long executionMicros = TimeUnit.NANOSECONDS.toMicros(MathUtils.nowInNano() - startNanos);
taskExecutionStats.registerSuccessfulEvent(executionMicros);
}
}
}
protected final boolean traceTaskExecution;
protected final OpStatsLogger taskExecutionStats;
protected final OpStatsLogger taskPendingStats;
protected final StatsLogger statsLogger;
// Gauges and their labels
private static final String pendingTasksGaugeLabel = "pending_tasks";
private final Gauge<Number> pendingTasksGauge;
private static final String completedTasksGaugeLabel = "completed_tasks";
protected final Gauge<Number> completedTasksGauge;
private static final String totalTasksGaugeLabel = "total_tasks";
protected final Gauge<Number> totalTasksGauge;
public MonitoredScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory,
StatsLogger statsLogger,
boolean traceTaskExecution) {
super(corePoolSize, threadFactory);
this.traceTaskExecution = traceTaskExecution;
this.statsLogger = statsLogger;
this.taskPendingStats = this.statsLogger.getOpStatsLogger("task_pending_time");
this.taskExecutionStats = this.statsLogger.getOpStatsLogger("task_execution_time");
this.pendingTasksGauge = new Gauge<Number>() {
@Override
public Number getDefaultValue() {
return 0;
}
@Override
public Number getSample() {
return getQueue().size();
}
};
this.completedTasksGauge = new Gauge<Number>() {
@Override
public Number getDefaultValue() {
return 0;
}
@Override
public Number getSample() {
return getCompletedTaskCount();
}
};
this.totalTasksGauge = new Gauge<Number>() {
@Override
public Number getDefaultValue() {
return 0;
}
@Override
public Number getSample() {
return getTaskCount();
}
};
// outstanding tasks
this.statsLogger.registerGauge(pendingTasksGaugeLabel, pendingTasksGauge);
// completed tasks
this.statsLogger.registerGauge(completedTasksGaugeLabel, completedTasksGauge);
// total tasks
this.statsLogger.registerGauge(totalTasksGaugeLabel, pendingTasksGauge);
}
private Runnable timedRunnable(Runnable r) {
return traceTaskExecution ? new TimedRunnable(r) : r;
}
private <T> Callable<T> timedCallable(Callable<T> task) {
return traceTaskExecution ? new TimedCallable<T>(task) : task;
}
@Override
public Future<?> submit(Runnable task) {
return super.submit(timedRunnable(task));
}
@Override
public <T> Future<T> submit(Runnable task, T result) {
return super.submit(timedRunnable(task), result);
}
@Override
public <T> Future<T> submit(Callable<T> task) {
return super.submit(timedCallable(task));
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
Throwable hiddenThrowable = extractThrowable(r);
if (hiddenThrowable != null)
logAndHandle(hiddenThrowable, true);
// The executor re-throws exceptions thrown by the task to the uncaught exception handler
// so we don't need to pass the exception to the handler explicitly
if (null != t) {
logAndHandle(t, false);
}
}
/**
* The executor re-throws exceptions thrown by the task to the uncaught exception handler
* so we only need to do anything if uncaught exception handler has not been se
*/
private void logAndHandle(Throwable t, boolean passToHandler) {
if (Thread.getDefaultUncaughtExceptionHandler() == null) {
LOG.error("Unhandled exception on thread {}", Thread.currentThread().getName(), t);
}
else {
LOG.info("Unhandled exception on thread {}", Thread.currentThread().getName(), t);
if (passToHandler) {
Thread.getDefaultUncaughtExceptionHandler().uncaughtException(Thread.currentThread(), t);
}
}
}
/**
* Extract the exception (throwable) inside the ScheduledFutureTask
* @param runnable - The runable that was executed
* @return exception enclosed in the Runnable if any; null otherwise
*/
private Throwable extractThrowable(Runnable runnable) {
// Check for exceptions wrapped by FutureTask.
// We do this by calling get(), which will cause it to throw any saved exception.
// Check for isDone to prevent blocking
if ((runnable instanceof Future<?>) && ((Future<?>) runnable).isDone()) {
try {
((Future<?>) runnable).get();
} catch (CancellationException e) {
LOG.debug("Task {} cancelled", runnable, e.getCause());
} catch (InterruptedException e) {
LOG.debug("Task {} was interrupted", runnable, e);
} catch (ExecutionException e) {
return e.getCause();
}
}
return null;
}
void unregisterGauges() {
this.statsLogger.unregisterGauge(pendingTasksGaugeLabel, pendingTasksGauge);
this.statsLogger.unregisterGauge(completedTasksGaugeLabel, completedTasksGauge);
this.statsLogger.unregisterGauge(totalTasksGaugeLabel, totalTasksGauge);
}
}