// blob: bf8eb4f8615de9aaafe94117ccb46b18545e65bc [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.common;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.tez.dag.api.TezConfiguration;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
 * An ExecutorService factory which shares threads between executors created using this service.
 *
 * <p>Every executor returned by {@link #createExecutorService(int, String)} keeps its own queue of
 * pending tasks and forwards at most {@code poolSize} of them at a time to a single shared
 * {@link ThreadPoolExecutor}.
 */
@Private
@Unstable
public class TezSharedExecutor implements TezExecutors {

  // The shared executor service which will be used to execute all the tasks.
  private final ThreadPoolExecutor service;

  // Background thread which asks an executor to forward its next pending task after one of its
  // tasks has finished.
  private final DelayedExecutionPoller poller;

  /**
   * Creates the shared thread pool and starts the polling thread.
   *
   * @param conf configuration read for {@code TEZ_SHARED_EXECUTOR_MIN_THREADS} and
   *     {@code TEZ_SHARED_EXECUTOR_MAX_THREADS}; a negative maximum is replaced by
   *     {@link Integer#MAX_VALUE}.
   */
  public TezSharedExecutor(Configuration conf) {
    // The default value is 0. We could start with a few threads so that thread pool is never empty.
    int minThreads = conf.getInt(TezConfiguration.TEZ_SHARED_EXECUTOR_MIN_THREADS,
        TezConfiguration.TEZ_SHARED_EXECUTOR_MIN_THREADS_DEFAULT);
    // The default value is Integer.MAX_VALUE, but ExecutorServiceInternal will do the rate limiting
    // of total numbers of tasks and hence the num threads will be bounded.
    int maxThreads = conf.getInt(TezConfiguration.TEZ_SHARED_EXECUTOR_MAX_THREADS,
        TezConfiguration.TEZ_SHARED_EXECUTOR_MAX_THREADS_DEFAULT);
    if (maxThreads < 0) {
      maxThreads = Integer.MAX_VALUE;
    }
    this.service = new ThreadPoolExecutor(
        minThreads, maxThreads,
        // The timeout is to give thread a chance to be re-used instead of being cleaned up.
        60, TimeUnit.SECONDS,
        new SynchronousQueue<Runnable>(),
        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("TezSharedExecutor: %d").build());

    // Setup polling thread to pick new tasks from the underlying executors.
    poller = new DelayedExecutionPoller(service);
    poller.start();
  }

  /**
   * Creates an executor which runs its tasks on the shared thread pool.
   *
   * @param poolSize maximum number of this executor's tasks forwarded to the shared pool at any
   *     one time; must be greater than zero.
   * @param threadName format string applied (with the worker thread's id as argument) to name the
   *     thread while it runs one of this executor's tasks, or {@code null} to leave thread names
   *     untouched.
   * @return a new executor backed by the shared pool.
   */
  public ExecutorService createExecutorService(int poolSize, String threadName) {
    return new ExecutorServiceInternal(poolSize, threadName);
  }

  // Open question: should shutting down the shared service be allowed at all? Once it is shut
  // down, every executor created from it is in shutdown mode and throws if new tasks are
  // submitted, and tasks already queued inside an ExecutorServiceInternal which have not yet been
  // handed to the shared service are never executed. That breaks the ExecutorService contract; it
  // could be fixed by tracking that the service is shutting down and waiting until all the
  // dependent executors have finished.
  /** Shuts down the shared pool and wakes the polling thread so that it can exit. */
  public void shutdown() {
    service.shutdown();
    poller.interrupt();
  }

  /** Calls {@code shutdownNow()} on the shared pool and wakes the polling thread. */
  public void shutdownNow() {
    service.shutdownNow();
    poller.interrupt();
  }

  /** Shuts the shared pool down when this factory is finalized. */
  @Override
  protected void finalize() {
    this.shutdown();
  }

  /**
   * Daemon thread which waits for executors that may have work to forward and invokes
   * {@code tryExecute()} on them, until the shared pool has been shut down.
   */
  private static class DelayedExecutionPoller extends Thread {
    // Store service reference in this static class to prevent a reference of TezSharedExecutor from
    // being held inside a non static class which prevents cleanup via GC.
    private final ThreadPoolExecutor service;

    // A queue which contains instances which have tasks to be executed.
    private final LinkedBlockingQueue<ExecutorServiceInternal> executeQueue =
        new LinkedBlockingQueue<>();

    DelayedExecutionPoller(ThreadPoolExecutor service) {
      super("DelayedExecutionPoller");
      this.setDaemon(true);
      this.service = service;
    }

    /** Queues an executor so that the poller will try to forward one of its pending tasks. */
    void add(ExecutorServiceInternal es) {
      executeQueue.add(es);
    }

    @Override
    public void run() {
      while (!service.isShutdown()) {
        try {
          executeQueue.take().tryExecute();
        } catch (InterruptedException ignored) {
          // Deliberately ignored: shutdown()/shutdownNow() of the enclosing class interrupt this
          // thread only to wake it from take(); the loop condition then decides whether to exit.
        }
      }
    }
  }

  /*
   * The internal shared executor service which delegates all the execution to the shared service.
   * It allows managing a given instance of ExecutorService independently of other instances created
   * in the same service.
   *
   * - It stores a queue of submitted tasks and submits only the configured poolSize number of tasks
   *   into the shared executor service.
   * - Stores a list of futures used to implement shutdownNow and awaitTermination.
   */
  private class ExecutorServiceInternal extends AbstractExecutorService {
    // This contains all the tasks which are submitted through this ExecutorService and have not
    // finished, we use this to implement shutdownNow and awaitTermination.
    // Note: This should have been a Set, but we do not have a concurrent set.
    private final ConcurrentHashMap<ManagedFutureTask<?>, Boolean> futures =
        new ConcurrentHashMap<>();

    // Number of tasks currently submitted by this executor to the common executor service.
    private final AtomicInteger numTasksSubmitted = new AtomicInteger();

    // The list of pending tasks to be submitted on behalf of this service.
    private final LinkedBlockingQueue<ManagedFutureTask<?>> pendingTasks =
        new LinkedBlockingQueue<>();

    // Set to 0 when shutdown is complete, a CountDownLatch is used to enable wait for shutdown in
    // awaitTermination.
    private final CountDownLatch shutdownLatch = new CountDownLatch(1);

    // The thread name to be used for threads executing tasks of this executor.
    private final String threadName;

    // Total number of threads to be used.
    private final int poolSize;

    ExecutorServiceInternal(int poolSize, String threadName) {
      Preconditions.checkArgument(poolSize > 0, "Expected poolSize > 0");
      this.threadName = threadName;
      this.poolSize = poolSize;
    }

    // A FutureTask which we will use to wrap all the runnable and callable. It adds and removes
    // from the futures set above. And also notifies TezSharedExecutor to pick new tasks from the
    // current ExecutorServiceInternal instance.
    private class ManagedFutureTask<V> extends FutureTask<V> {
      // Set to true if this task was submitted to the shared ExecutorService.
      private boolean submitted = false;

      ManagedFutureTask(Runnable runnable, V value) {
        super(runnable, value);
        addFuture(this);
      }

      ManagedFutureTask(Callable<V> callable) {
        super(callable);
        addFuture(this);
      }

      /**
       * Runs the task, temporarily renaming the current thread when this executor was created
       * with a thread name; the previous name is restored afterwards.
       */
      @Override
      public void run() {
        Thread thisThread = Thread.currentThread();
        String savedThreadName = null;
        if (threadName != null) {
          savedThreadName = thisThread.getName();
          thisThread.setName(String.format(threadName, thisThread.getId()));
        }
        try {
          super.run();
        } finally {
          if (threadName != null) {
            thisThread.setName(savedThreadName);
          }
        }
      }

      // There is a race b/w cancel and submit hence the synchronization.
      synchronized void submit() {
        submitted = true;
        service.execute(this);
      }

      /**
       * Invoked once the task has completed or been cancelled: stops tracking it, releases its
       * slot if it had been handed to the shared pool, and asks the poller to schedule more work.
       */
      @Override
      public void done() {
        removeFuture(this);
        synchronized (this) {
          if (submitted) { // Decrement only if this task was submitted.
            numTasksSubmitted.decrementAndGet();
          }
        }
        // Add internal executor service to poller to schedule another task if available.
        // We do this instead of invoking tryExecute here, to give a chance for this thread to be
        // reused. But its still possible that a new thread is created.
        poller.add(ExecutorServiceInternal.this);
      }
    }

    /**
     * Starts tracking a newly created task and rejects it if this executor is already shut down.
     *
     * <p>The task is registered before the shutdown check. NOTE(review): presumably so that a
     * concurrent {@code shutdownNow()} cannot miss it - confirm.
     */
    private void addFuture(ManagedFutureTask<?> future) {
      futures.put(future, Boolean.TRUE);
      // If already shutdown, reject this task.
      if (isShutdown()) {
        try {
          service.getRejectedExecutionHandler().rejectedExecution(future, service);
        } catch (RuntimeException e) {
          // The rejected task will never run or complete, so done() will never remove it. Stop
          // tracking it here; otherwise isTerminated() and awaitTermination() could never succeed.
          futures.remove(future);
          throw e;
        }
      }
    }

    private void removeFuture(ManagedFutureTask<?> future) {
      futures.remove(future);
    }

    // Return our internal future task so that all the tasks submitted are tracked and cleaned up.
    @SuppressWarnings("unchecked")
    @Override
    protected <T> ManagedFutureTask<T> newTaskFor(Runnable runnable, T value) {
      if (runnable instanceof ManagedFutureTask) {
        // Already wrapped (and tracked); do not wrap it a second time.
        return (ManagedFutureTask<T>) runnable;
      }
      return new ManagedFutureTask<T>(runnable, value);
    }

    @Override
    protected <T> ManagedFutureTask<T> newTaskFor(Callable<T> callable) {
      return new ManagedFutureTask<T>(callable);
    }

    /** Marks this executor as shut down; tasks that were already accepted are left untouched. */
    @Override
    public void shutdown() {
      shutdownLatch.countDown();
    }

    /**
     * Marks this executor as shut down, empties the pending queue and cancels every tracked task,
     * interrupting those which are running.
     *
     * @return the tasks which were still waiting in the pending queue.
     */
    @Override
    public List<Runnable> shutdownNow() {
      shutdownLatch.countDown();
      List<Runnable> pending = new ArrayList<>(pendingTasks.size());
      pendingTasks.drainTo(pending);
      // cancel all futures, interrupt if its running.
      for (ManagedFutureTask<?> future : futures.keySet()) {
        future.cancel(true);
      }
      return pending;
    }

    /** Returns true when either this executor or the shared pool has been shut down. */
    @Override
    public boolean isShutdown() {
      return shutdownLatch.getCount() == 0 || service.isShutdown();
    }

    @Override
    public boolean isTerminated() {
      if (!isShutdown()) {
        return false;
      }
      // futures should be empty ideally, but there is a corner case where all the futures are done
      // but not yet removed from futures map, for that case we check if the future is done.
      for (ManagedFutureTask<?> future : futures.keySet()) {
        if (!future.isDone()) {
          return false;
        }
      }
      return true;
    }

    /**
     * Waits until this executor has been shut down and all tracked tasks have finished, or the
     * timeout elapses.
     *
     * @return true if every tracked task finished in time, false if the timeout elapsed first.
     * @throws InterruptedException if the calling thread is interrupted while waiting.
     */
    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
      long deadline = System.nanoTime() + unit.toNanos(timeout);
      // Wait for shutdown to be invoked. A shut down shared pool already counts as shutdown (see
      // isShutdown()), so in that case there is nothing to wait for on the latch. A shared pool
      // shutdown which happens while waiting on the latch is not noticed until the timeout.
      if (!service.isShutdown() && !shutdownLatch.await(timeout, unit)) {
        return false;
      }
      // Wait for the remaining futures to finish.
      for (ManagedFutureTask<?> future : futures.keySet()) {
        long nanosLeft = deadline - System.nanoTime();
        if (nanosLeft <= 0) {
          return false;
        }
        try {
          future.get(nanosLeft, TimeUnit.NANOSECONDS);
        } catch (ExecutionException | CancellationException ignore) {
          // A failed or cancelled task has still finished, which is all that matters here.
        } catch (TimeoutException e) {
          return false;
        }
      }
      return true;
    }

    // Submit a task if task is available and poolSize has not been reached.
    private void tryExecute() {
      while (!pendingTasks.isEmpty()) {
        int numTasks = numTasksSubmitted.get();
        if (numTasks >= poolSize) {
          return;
        }
        // Reserve a slot first; if another thread changed the counter, loop and re-evaluate.
        if (numTasksSubmitted.compareAndSet(numTasks, numTasks + 1)) {
          ManagedFutureTask<?> task = pendingTasks.poll();
          // This breaks a contract unfortunately. If a task is submitted and it ends up in a
          // queue and then the shared service is shutdown then this job cannot be executed, which
          // is not the contract, ideally it should execute the task.
          if (task == null || task.isCancelled() || service.isShutdown()) {
            // Nothing was handed to the shared pool, so give the reserved slot back.
            numTasksSubmitted.decrementAndGet();
          } else {
            task.submit();
          }
        }
      }
    }

    /** Queues the command and immediately tries to forward pending work to the shared pool. */
    @Override
    public void execute(Runnable command) {
      this.pendingTasks.add(newTaskFor(command, null));
      this.tryExecute();
    }

    /** Marks this executor as shut down when it is finalized. */
    @Override
    protected void finalize() {
      this.shutdown();
    }
  }
}