// blob: 1f37622f3fe263da7088116cdd51b0c693c37068 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oozie.service;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.RunnableScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.oozie.service.CallableQueueService.CallableWrapper;
import org.apache.oozie.util.NamedThreadFactory;
import org.apache.oozie.util.XCallable;
import org.apache.oozie.util.XLog;
import org.eclipse.jetty.util.ConcurrentHashSet;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
/**
 * Runs {@link CallableWrapper} commands asynchronously.
 * <p>
 * Commands without an initial delay are handed straight to a {@link ThreadPoolExecutor}; commands with a
 * non-zero initial delay go through a {@link ScheduledThreadPoolExecutor} first and are forwarded to the
 * thread pool when the delay expires. Commands that cannot be submitted because of the concurrency check are
 * parked per command type in {@code pendingCommandsPerType} until {@link #checkMaxConcurrency(String)}
 * resubmits them.
 */
@SuppressWarnings("deprecation")
public class AsyncXCommandExecutor {
/** Lowest priority accepted by {@code queue()}; lower values are rejected. */
public static final int MIN_PRIORITY = 0;
/** Minimum time in milliseconds between two anti-starvation passes in {@code checkMaxConcurrency()}. */
public static final long ANTI_STARVATION_INTERVAL = 500;
private static XLog log = XLog.getLog(AsyncXCommandExecutor.class);
/** Thread pool that executes the commands. */
private final ThreadPoolExecutor executor;
/** Scheduler used for commands that were queued with a non-zero initial delay. */
private final ScheduledThreadPoolExecutor scheduledExecutor;
/** When true, a delayed command is checked with the callable queue service before it is submitted. */
private final boolean needConcurrencyCheck;
/** Consulted via {@code canSubmitCallable()} to decide whether a command may be submitted. */
private final CallableQueueService callableQueueService;
/** Number of commands currently tracked: incremented in {@code queue()}, decremented when finished or dropped. */
private final AtomicInteger activeCommands;
private final long maxActiveCommands; // equivalent of "queueSize" in CQS
/** Waiting time in milliseconds after which a parked command becomes eligible for a priority boost. */
private final long maxWait;
/** Highest accepted priority (number of priorities minus one). */
private final long maxPriority;
/** Upper bound, in seconds, for waiting on each executor during {@code shutdown()}. */
private final int awaitTerminationTimeoutSeconds;
/** Work queue of {@code executor}; included in the queue dump. */
private final BlockingQueue<CallableWrapper<?>> priorityBlockingQueue;
/** Work queue of {@code scheduledExecutor}; included in the queue dump. */
private final BlockingQueue<AccessibleRunnableScheduledFuture<ScheduledXCallable>> delayWorkQueue;
/** Commands parked because the concurrency check refused them, keyed by command type. */
private final ConcurrentHashMap<String, Set<CallableWrapper<?>>> pendingCommandsPerType;
// Wall-clock time (System.currentTimeMillis()) of the last anti-starvation pass.
// NOTE(review): this field is not volatile and is read/written while holding the lock of a single
// per-type set, so accesses made for different command types are not synchronized with each other.
private long lastAntiStarvationCheck = 0;
@SuppressWarnings({"unchecked", "rawtypes"})
@SuppressFBWarnings( value = "SIC_INNER_SHOULD_BE_STATIC_ANON",
justification = "Unnecessary to refactor innen classes defined here")
public AsyncXCommandExecutor(int threads,
int delayedCallableThreads,
boolean needConcurrencyCheck,
CallableQueueService callableAccess,
long maxActiveCommands,
long maxWait,
int priorities,
int awaitTerminationTimeoutSeconds) {
priorityBlockingQueue = new PriorityBlockingQueue<CallableWrapper<?>>(100, new PriorityComparator());
executor = new ThreadPoolExecutor(threads, threads, 10, TimeUnit.SECONDS,
(BlockingQueue) priorityBlockingQueue,
new NamedThreadFactory("CallableQueue")) {
protected void beforeExecute(Thread t, Runnable r) {
XLog.Info.get().clear();
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return (RunnableFuture<T>)callable;
}
};
this.scheduledExecutor = new ScheduledThreadPoolExecutor(delayedCallableThreads,
new NamedThreadFactory("ScheduledCallable")) {
protected <V> RunnableScheduledFuture<V> decorateTask(
Runnable runnable, RunnableScheduledFuture<V> task) {
AccessibleRunnableScheduledFuture<V> arsf =
new AccessibleRunnableScheduledFuture<>(task, runnable);
return arsf;
}
};
this.delayWorkQueue = (BlockingQueue) scheduledExecutor.getQueue();
this.needConcurrencyCheck = needConcurrencyCheck;
this.callableQueueService = callableAccess;
this.maxActiveCommands = maxActiveCommands;
this.maxWait = maxWait;
this.activeCommands = new AtomicInteger(0);
this.pendingCommandsPerType = new ConcurrentHashMap<>();
Preconditions.checkArgument(priorities > 0, "Number of priorities must be >0");
this.maxPriority = priorities - 1;
Preconditions.checkArgument(awaitTerminationTimeoutSeconds > 0,
String.format("Await termination timeout must be >0, is %s", awaitTerminationTimeoutSeconds));
this.awaitTerminationTimeoutSeconds = awaitTerminationTimeoutSeconds;
}
/**
 * Test-only constructor: every collaborator (executors, queues, counters) is supplied by the caller
 * instead of being created here. No argument validation is performed.
 *
 * @param priorities number of priority levels; the highest accepted priority is {@code priorities - 1}
 */
@VisibleForTesting
AsyncXCommandExecutor(boolean needConcurrencyCheck,
        CallableQueueService callableAccess,
        long maxActiveCommands,
        ThreadPoolExecutor executor,
        ScheduledThreadPoolExecutor scheduledExecutor,
        PriorityBlockingQueue<CallableWrapper<?>> priorityBlockingQueue,
        BlockingQueue<AccessibleRunnableScheduledFuture<ScheduledXCallable>> delayQueue,
        ConcurrentHashMap<String, Set<CallableWrapper<?>>> pendingCommandsPerType,
        AtomicInteger activeCommands,
        long maxWait,
        long priorities,
        int awaitTerminationTimeoutSeconds) {
    // Executors and the queues backing them
    this.executor = executor;
    this.scheduledExecutor = scheduledExecutor;
    this.priorityBlockingQueue = priorityBlockingQueue;
    this.delayWorkQueue = delayQueue;

    // Bookkeeping structures
    this.pendingCommandsPerType = pendingCommandsPerType;
    this.activeCommands = activeCommands;

    // Collaborators and limits
    this.callableQueueService = callableAccess;
    this.needConcurrencyCheck = needConcurrencyCheck;
    this.maxActiveCommands = maxActiveCommands;
    this.maxWait = maxWait;
    this.maxPriority = priorities - 1;
    this.awaitTerminationTimeoutSeconds = awaitTerminationTimeoutSeconds;
}
/**
 * Accepts a command for execution, either immediately or after its initial delay.
 *
 * @param wrapper the command to queue
 * @param ignoreQueueSize when true, the limit on active commands is not checked
 * @return {@code false} if the command was refused because the limit on active commands has been reached;
 *         {@code true} otherwise, including when the command was filtered out as a duplicate
 * @throws RuntimeException wrapping any failure raised while submitting, including an out-of-range priority
 */
public synchronized boolean queue(CallableWrapper<?> wrapper, boolean ignoreQueueSize) {
    if (!ignoreQueueSize) {
        if (activeCommands.get() >= maxActiveCommands) {
            log.warn("queue full, ignoring queuing for [{0}]", wrapper.getElement().getKey());
            return false;
        }
    }

    if (!wrapper.filterDuplicates()) {
        // Filtered out as a duplicate: nothing to submit, but this is not reported as a failure.
        return true;
    }

    wrapper.addToUniqueCallables();
    final int requestedPriority = wrapper.getPriority();
    final long delayAtCreation = wrapper.getInitialDelay();

    try {
        final boolean priorityInRange =
                requestedPriority >= MIN_PRIORITY && requestedPriority <= maxPriority;
        if (!priorityInRange) {
            throw new IllegalArgumentException("priority out of range: " + requestedPriority);
        }

        if (delayAtCreation != 0) {
            // Delayed command: park it in the scheduler, which forwards it once the delay has elapsed.
            final ScheduledXCallable delayed = new ScheduledXCallable(wrapper);
            final long remainingMillis = wrapper.getDelay(TimeUnit.MILLISECONDS);
            scheduledExecutor.schedule(delayed, remainingMillis, TimeUnit.MILLISECONDS);
        } else {
            executor.execute(wrapper);
        }

        activeCommands.incrementAndGet();
    } catch (Throwable failure) {
        // Undo the uniqueness registration so the same command can be queued again later.
        wrapper.removeFromUniqueCallables();
        throw new RuntimeException(failure);
    }

    return true;
}
/**
 * Parks a command that could not be submitted in the set of pending commands for its type.
 * The per-type set is created on first use.
 *
 * @param command the command to park
 */
public void handleConcurrencyExceeded(CallableWrapper<?> command) {
    final String type = command.getElement().getType();

    Set<CallableWrapper<?>> parked = pendingCommandsPerType.get(type);
    if (parked == null) {
        final Set<CallableWrapper<?>> fresh = new ConcurrentHashSet<>();
        final Set<CallableWrapper<?>> raced = pendingCommandsPerType.putIfAbsent(type, fresh);
        // If another thread registered a set first, use theirs and discard ours.
        parked = (raced == null) ? fresh : raced;
    }

    parked.add(command);
}
/**
 * Walks the commands parked for the given type and submits those that the callable queue service now
 * allows. At most once per {@link #ANTI_STARVATION_INTERVAL} milliseconds, commands that have waited
 * longer than {@code maxWait} and are below the maximum priority get their priority raised by one.
 * <p>
 * A submittable command is dropped (and the active-command counter decremented) instead of executed
 * when the active-command limit has been reached.
 *
 * @param type the command type whose parked commands should be examined
 */
public void checkMaxConcurrency(String type) {
    final Set<CallableWrapper<?>> parked = pendingCommandsPerType.get(type);
    if (parked == null) {
        return;
    }

    // Only a single thread should be doing stuff here! Reason: concurrent executions might
    // submit an eligible XCallable multiple times, which must be avoided.
    synchronized (parked) {
        final long startedAt = System.currentTimeMillis();
        final boolean antiStarvationDue = startedAt - lastAntiStarvationCheck > ANTI_STARVATION_INTERVAL;
        int promoted = 0;

        final Iterator<CallableWrapper<?>> cursor = parked.iterator();
        while (cursor.hasNext()) {
            final CallableWrapper<?> candidate = cursor.next();

            // Anti-starvation logic: try to promote callables that have been waiting for too long
            final int priority = candidate.getPriority();
            if (antiStarvationDue
                    && candidate.getDelay(TimeUnit.MILLISECONDS) < -maxWait
                    && priority < maxPriority) {
                candidate.setDelay(0, TimeUnit.MILLISECONDS);
                candidate.setPriority(priority + 1);
                promoted++;
            }

            if (!callableQueueService.canSubmitCallable(candidate.getElement())) {
                // Still blocked: leave it parked for a later pass.
                continue;
            }

            if (activeCommands.get() < maxActiveCommands) {
                executor.execute(candidate);
            } else {
                log.warn("queue full, ignoring queuing for [{0}]", candidate.getElement().getKey());
                activeCommands.decrementAndGet();
            }
            cursor.remove();
        }

        if (antiStarvationDue) {
            lastAntiStarvationCheck = System.currentTimeMillis();
        }
        if (promoted > 0) {
            log.debug("Anti-starvation: handled [{0}] elements", promoted);
        }
    }
}
/**
 * Records that one command has finished by decrementing the active-command counter.
 * A dedicated counter is kept because the total would otherwise have to be derived from
 * the sizes of two queues plus the parked commands.
 */
public void commandFinished() {
    this.activeCommands.decrementAndGet();
}
/**
 * @return the thread pool that executes the commands (not the scheduler used for delayed ones)
 */
public ThreadPoolExecutor getExecutorService() {
    return this.executor;
}
/**
 * Shuts down the command thread pool and then the scheduler, waiting for each to terminate for at most
 * the configured await-termination timeout.
 * <p>
 * If the calling thread is interrupted while waiting, the wait is abandoned, the scheduler is still asked
 * to shut down (without waiting for it), and the thread's interrupt status is restored so that callers can
 * observe the interruption.
 */
public void shutdown() {
    try {
        shutdownExecutor(executor, "executor");
        shutdownExecutor(scheduledExecutor, "scheduled executor");
    } catch (InterruptedException e) {
        log.warn("Interrupted while waiting for executor shutdown");
        // The interrupt may have arrived while waiting for the first pool, in which case the scheduler
        // was never asked to stop. shutdown() does not block and has no additional effect if the
        // scheduler is already shut down.
        scheduledExecutor.shutdown();
        // Restore the interrupt status that was cleared when InterruptedException was thrown.
        Thread.currentThread().interrupt();
    }
}
/**
 * @return {@code true} if at least one of the two executors has been shut down
 */
public boolean isShutDown() {
    if (executor.isShutdown()) {
        return true;
    }
    return scheduledExecutor.isShutdown();
}
/**
 * @return {@code true} if at least one of the two executors has terminated
 */
public boolean isTerminated() {
    // NOTE(review): this reports termination as soon as either executor has terminated, even if the
    // other one is still running - confirm that callers do not expect "both terminated".
    if (executor.isTerminated()) {
        return true;
    }
    return scheduledExecutor.isTerminated();
}
/**
 * Builds a textual dump of all commands currently held by this executor.
 * <p>
 * The result lists, in this order: commands in the thread pool's priority queue, commands waiting in
 * the scheduler's delay queue, and commands parked per type. The parked commands are copied
 * before the two queues are traversed.
 *
 * @return the {@code toString()} representation of every held command
 */
public List<String> getQueueDump() {
    final List<CallableWrapper<?>> parkedSnapshot = new ArrayList<>(100);
    final List<String> dump = new ArrayList<>(100);

    // Safe to iterate
    for (Set<CallableWrapper<?>> parkedForType : pendingCommandsPerType.values()) {
        parkedSnapshot.addAll(parkedForType);
    }

    // Safe to iterate
    for (final CallableWrapper<?> queued : priorityBlockingQueue) {
        dump.add(queued.toString());
    }

    // Safe to iterate
    for (final AccessibleRunnableScheduledFuture<ScheduledXCallable> scheduled : delayWorkQueue) {
        final ScheduledXCallable delayed = (ScheduledXCallable) scheduled.getTask();
        final CallableWrapper<?> wrapped = delayed.getCallableWrapper();
        dump.add(wrapped.toString());
    }

    for (final CallableWrapper<?> parked : parkedSnapshot) {
        dump.add(parked.toString());
    }

    return dump;
}
/**
 * @return the current value of the active-command counter
 */
public int getSize() {
    return this.activeCommands.get();
}
/**
 * Task handed to the scheduler for a delayed command. When it runs, it either forwards the wrapped command
 * to the thread pool or, if concurrency checking is enabled and the command may not be submitted,
 * parks the command until it can be submitted.
 */
public class ScheduledXCallable implements Runnable {
    private final CallableWrapper<?> target;

    /**
     * @param target the delayed command this task forwards once the scheduler runs it
     */
    public ScheduledXCallable(CallableWrapper<?> target) {
        this.target = target;
    }

    @Override
    public void run() {
        final boolean mayRunNow = !needConcurrencyCheck
                || callableQueueService.canSubmitCallable(target.getElement());
        if (mayRunNow) {
            executor.execute(target);
            return;
        }

        final XCallable<?> blocked = target.getElement();
        handleConcurrencyExceeded(target);
        // need this to deal with a special race condition: we detect that concurrency
        // exceeded, but an XCommand (or more!) with the same type just happens to finish. If that
        // happens, this callable might never get scheduled again (or much later), so we have to guard
        // against this condition.
        checkMaxConcurrency(blocked.getType());
    }

    /**
     * @return the command wrapped by this task
     */
    public CallableWrapper<?> getCallableWrapper() {
        return target;
    }
}
/**
 * Orders commands by descending priority, so that the command with the highest priority value
 * sorts first.
 */
@SuppressFBWarnings(value = "SE_COMPARATOR_SHOULD_BE_SERIALIZABLE",
        justification = "PriorityBlockingQueue which uses this comparator will never be serialized")
public static class PriorityComparator implements Comparator<CallableWrapper<?>> {
    @Override
    public int compare(CallableWrapper<?> o1, CallableWrapper<?> o2) {
        // Arguments are deliberately swapped to obtain descending order.
        final int secondPriority = o2.getPriority();
        final int firstPriority = o1.getPriority();
        return Integer.compare(secondPriority, firstPriority);
    }
}
/**
 * A {@link RunnableScheduledFuture} that forwards every operation to the future it wraps and additionally
 * remembers the {@link Runnable} it was created for. We have to use this so that scheduled elements in the
 * DelayWorkQueue are accessible.
 *
 * @param <V> result type of the wrapped future
 */
@SuppressFBWarnings(value = "EQ_COMPARETO_USE_OBJECT_EQUALS",
        justification = "This class has a natural ordering (expiration) which is inconsistent with equals")
public static class AccessibleRunnableScheduledFuture<V> implements RunnableScheduledFuture<V> {
    private final RunnableScheduledFuture<V> delegate;
    private final Runnable submittedTask;

    /**
     * @param originalFuture the future all operations are forwarded to
     * @param task the runnable to expose through {@link #getTask()}
     */
    public AccessibleRunnableScheduledFuture(RunnableScheduledFuture<V> originalFuture,
            Runnable task) {
        this.delegate = originalFuture;
        this.submittedTask = task;
    }

    /**
     * @return the runnable passed to the constructor
     */
    public Runnable getTask() {
        return submittedTask;
    }

    // --- Runnable / RunnableScheduledFuture: forwarded to the wrapped future ---

    @Override
    public void run() {
        delegate.run();
    }

    @Override
    public boolean isPeriodic() {
        return delegate.isPeriodic();
    }

    // --- Delayed / Comparable: forwarded to the wrapped future ---

    @Override
    public long getDelay(TimeUnit unit) {
        return delegate.getDelay(unit);
    }

    @Override
    public int compareTo(Delayed o) {
        return delegate.compareTo(o);
    }

    // --- Future: forwarded to the wrapped future ---

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        return delegate.cancel(mayInterruptIfRunning);
    }

    @Override
    public boolean isCancelled() {
        return delegate.isCancelled();
    }

    @Override
    public boolean isDone() {
        return delegate.isDone();
    }

    @Override
    public V get() throws InterruptedException, ExecutionException {
        return delegate.get();
    }

    @Override
    public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        return delegate.get(timeout, unit);
    }
}
/**
 * Initiates shutdown of the given executor and waits for it to terminate, polling once per second and
 * giving up once the configured await-termination timeout has elapsed.
 *
 * @param executor the executor to shut down
 * @param name label used in the progress log message
 * @throws InterruptedException if the calling thread is interrupted while waiting
 */
private void shutdownExecutor(ExecutorService executor, String name) throws InterruptedException {
    final long timeoutMillis = TimeUnit.SECONDS.toMillis(this.awaitTerminationTimeoutSeconds);
    final long deadline = System.currentTimeMillis() + timeoutMillis;

    executor.shutdown();

    boolean terminated = executor.awaitTermination(1000, TimeUnit.MILLISECONDS);
    while (!terminated) {
        log.info("Waiting for [{0}] to shutdown", name);
        if (System.currentTimeMillis() > deadline) {
            log.warn("Gave up, continuing without waiting for executor to shutdown");
            return;
        }
        terminated = executor.awaitTermination(1000, TimeUnit.MILLISECONDS);
    }
}
}