blob: 7406e46ed708f8f93c235d0a071b63cad5b8d84b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.job;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_REJECTED_TASK_COUNTER;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_TASK_END_TO_END_TIME;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_TASK_EXECUTED_COUNTER;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_TASK_EXECUTION_TIME;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_TASK_QUEUE_WAIT_TIME;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import org.apache.phoenix.monitoring.TaskExecutionMetricsHolder;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
*
* Thread pool executor that executes scans in parallel
*
*
* @since 0.1
*/
@SuppressWarnings("rawtypes")
public class JobManager<T> extends AbstractRoundRobinQueue<T> {
private static final AtomicLong PHOENIX_POOL_INDEX = new AtomicLong(1);
public JobManager(int maxSize) {
super(maxSize, true); // true -> new producers move to front of queue; this reduces latency.
}
@Override
protected Object extractProducer(T o) {
if( o instanceof JobFutureTask){
return ((JobFutureTask)o).getJobId();
}
return o;
}
public static interface JobRunnable<T> extends Runnable {
public Object getJobId();
public TaskExecutionMetricsHolder getTaskExecutionMetric();
}
public static ThreadPoolExecutor createThreadPoolExec(int keepAliveMs, int size, int queueSize, boolean useInstrumentedThreadPool) {
BlockingQueue<Runnable> queue;
if (queueSize == 0) {
queue = new SynchronousQueue<Runnable>(); // Specialized for 0 length.
} else {
queue = new JobManager<Runnable>(queueSize);
}
String name = "phoenix-" + PHOENIX_POOL_INDEX.getAndIncrement();
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat(name + "-thread-%s")
.setDaemon(true)
.setThreadFactory(
new ContextClassLoaderThreadFactory(JobManager.class.getClassLoader()))
.build();
ThreadPoolExecutor exec;
if (useInstrumentedThreadPool) {
// For thread pool, set core threads = max threads -- we don't ever want to exceed core threads, but want to go up to core threads *before* using the queue.
exec = new InstrumentedThreadPoolExecutor(name, size, size, keepAliveMs, TimeUnit.MILLISECONDS, queue, threadFactory) {
@Override
protected <T> RunnableFuture<T> newTaskFor(Callable<T> call) {
return new InstrumentedJobFutureTask<T>(call);
}
@Override
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new InstrumentedJobFutureTask<T>(runnable, value);
}
};
} else {
// For thread pool, set core threads = max threads -- we don't ever want to exceed core threads, but want to go up to core threads *before* using the queue.
exec = new ThreadPoolExecutor(size, size, keepAliveMs, TimeUnit.MILLISECONDS, queue, threadFactory) {
@Override
protected <T> RunnableFuture<T> newTaskFor(Callable<T> call) {
// Override this so we can create a JobFutureTask so we can extract out the parentJobId (otherwise, in the default FutureTask, it is private).
return new JobFutureTask<T>(call);
}
@Override
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new JobFutureTask<T>(runnable, value);
}
};
}
exec.allowCoreThreadTimeOut(true); // ... and allow core threads to time out. This just keeps things clean when idle, and is nice for ftests modes, etc., where we'd especially like these not to linger.
return exec;
}
/**
* Subclasses FutureTask for the sole purpose of providing {@link #getCallable()}, which is used to extract the producer in the {@link JobBasedRoundRobinQueue}
*/
static class JobFutureTask<T> extends FutureTask<T> {
private final Object jobId;
@Nullable
private final TaskExecutionMetricsHolder taskMetric;
public JobFutureTask(Runnable r, T t) {
super(r, t);
if(r instanceof JobRunnable){
this.jobId = ((JobRunnable)r).getJobId();
this.taskMetric = ((JobRunnable)r).getTaskExecutionMetric();
} else {
this.jobId = this;
this.taskMetric = null;
}
}
public JobFutureTask(Callable<T> c) {
super(c);
// FIXME: this fails when executor used by hbase
if (c instanceof JobCallable) {
this.jobId = ((JobCallable<T>) c).getJobId();
this.taskMetric = ((JobCallable<T>) c).getTaskExecutionMetric();
} else {
this.jobId = this;
this.taskMetric = null;
}
}
public Object getJobId() {
return jobId;
}
}
/**
* Instrumented version of {@link JobFutureTask} that measures time spent by a task at various stages in the queue
* and when executed.
*/
private static class InstrumentedJobFutureTask<T> extends JobFutureTask<T> {
/*
* Time at which the task was submitted to the executor.
*/
private final long taskSubmissionTime;
// Time at which the task is about to be executed.
private long taskExecutionStartTime;
public InstrumentedJobFutureTask(Runnable r, T t) {
super(r, t);
this.taskSubmissionTime = System.currentTimeMillis();
}
public InstrumentedJobFutureTask(Callable<T> c) {
super(c);
this.taskSubmissionTime = System.currentTimeMillis();
}
@Override
public void run() {
this.taskExecutionStartTime = System.currentTimeMillis();
super.run();
}
public long getTaskSubmissionTime() {
return taskSubmissionTime;
}
public long getTaskExecutionStartTime() {
return taskExecutionStartTime;
}
}
/**
* Delegating callable implementation that preserves the parentJobId and sets up thread tracker stuff before delegating to the actual command.
*/
public static interface JobCallable<T> extends Callable<T> {
public Object getJobId();
public TaskExecutionMetricsHolder getTaskExecutionMetric();
}
/**
* Extension of the default thread factory returned by {@code Executors.defaultThreadFactory}
* that sets the context classloader on newly-created threads to be a specific classloader (and
* not the context classloader of the calling thread).
* <p/>
* See {@link org.apache.phoenix.util.PhoenixContextExecutor} for the rationale on changing
* the context classloader.
*/
static class ContextClassLoaderThreadFactory implements ThreadFactory {
private final ThreadFactory baseFactory;
private final ClassLoader contextClassLoader;
public ContextClassLoaderThreadFactory(ClassLoader contextClassLoader) {
baseFactory = Executors.defaultThreadFactory();
this.contextClassLoader = contextClassLoader;
}
@Override
public Thread newThread(Runnable r) {
Thread t = baseFactory.newThread(r);
t.setContextClassLoader(contextClassLoader);
return t;
}
}
/**
* Thread pool executor that instruments the various characteristics of the backing pool of threads and queue. This
* executor assumes that all the tasks handled are of type {@link JobManager.InstrumentedJobFutureTask}
*/
private static class InstrumentedThreadPoolExecutor extends ThreadPoolExecutor {
private final RejectedExecutionHandler rejectedExecHandler = new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
TaskExecutionMetricsHolder metrics = getRequestMetric(r);
if (metrics != null) {
metrics.getNumRejectedTasks().increment();
}
GLOBAL_REJECTED_TASK_COUNTER.increment();
throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + executor.toString());
}
};
public InstrumentedThreadPoolExecutor(String threadPoolName, int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
setRejectedExecutionHandler(rejectedExecHandler);
}
@Override
public void execute(Runnable task) {
TaskExecutionMetricsHolder metrics = getRequestMetric(task);
if (metrics != null) {
metrics.getNumTasks().increment();
}
GLOBAL_TASK_EXECUTED_COUNTER.increment();
super.execute(task);
}
@Override
protected void beforeExecute(Thread worker, Runnable task) {
InstrumentedJobFutureTask instrumentedTask = (InstrumentedJobFutureTask)task;
long queueWaitTime = System.currentTimeMillis() - instrumentedTask.getTaskSubmissionTime();
GLOBAL_TASK_QUEUE_WAIT_TIME.update(queueWaitTime);
TaskExecutionMetricsHolder metrics = getRequestMetric(task);
if (metrics != null) {
metrics.getTaskQueueWaitTime().change(queueWaitTime);
}
super.beforeExecute(worker, instrumentedTask);
}
@Override
protected void afterExecute(Runnable task, Throwable t) {
InstrumentedJobFutureTask instrumentedTask = (InstrumentedJobFutureTask)task;
try {
super.afterExecute(instrumentedTask, t);
} finally {
long taskExecutionTime = System.currentTimeMillis() - instrumentedTask.getTaskExecutionStartTime();
long endToEndTaskTime = System.currentTimeMillis() - instrumentedTask.getTaskSubmissionTime();
TaskExecutionMetricsHolder metrics = getRequestMetric(task);
if (metrics != null) {
metrics.getTaskExecutionTime().change(taskExecutionTime);
metrics.getTaskEndToEndTime().change(endToEndTaskTime);
}
GLOBAL_TASK_EXECUTION_TIME.update(taskExecutionTime);
GLOBAL_TASK_END_TO_END_TIME.update(endToEndTaskTime);
}
}
private static TaskExecutionMetricsHolder getRequestMetric(Runnable task) {
return ((JobFutureTask)task).taskMetric;
}
}
}