blob: d1222bc77fd8f0402aba5a995c4e150a6b862ef4 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oozie.service;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.client.OozieClient.SYSTEM_MODE;
import org.apache.oozie.util.Instrumentable;
import org.apache.oozie.util.Instrumentation;
import org.apache.oozie.util.NamedThreadFactory;
import org.apache.oozie.util.PollablePriorityDelayQueue;
import org.apache.oozie.util.PriorityDelayQueue;
import org.apache.oozie.util.PriorityDelayQueue.QueueElement;
import org.apache.oozie.util.StringUtils;
import org.apache.oozie.util.XCallable;
import org.apache.oozie.util.XLog;
import com.google.common.collect.ImmutableSet;
/**
* The callable queue service queues {@link XCallable}s for asynchronous execution.
* <p>
* Callables can be queued for immediate execution or for delayed execution (some time in the future).
* <p>
* Callables are consumed from the queue for execution based on their priority.
* <p>
* When the queues (for immediate execution and for delayed execution) are full, the callable queue service stops
* queuing callables.
* <p>
* A thread-pool is used to execute the callables asynchronously.
* <p>
* The following configuration parameters control the callable queue service:
* <p>
* {@link #CONF_QUEUE_SIZE} size of the immediate execution queue. Defaulf value is 10000.
* <p>
* {@link #CONF_THREADS} number of threads in the thread-pool used for asynchronous command execution. When this number
* of threads is reached, commands remain the queue until threads become available. Sets up a priority queue for the
* execution of Commands via a ThreadPool. Sets up a Delayed Queue to handle actions which will be ready for execution
* sometime in the future.
*/
@SuppressWarnings("deprecation")
public class CallableQueueService implements Service, Instrumentable {
private static final String INSTRUMENTATION_GROUP = "callablequeue";
private static final String INSTR_IN_QUEUE_TIME_TIMER = "time.in.queue";
private static final String INSTR_EXECUTED_COUNTER = "executed";
private static final String INSTR_FAILED_COUNTER = "failed";
private static final String INSTR_QUEUED_COUNTER = "queued";
private static final String INSTR_QUEUE_SIZE_SAMPLER = "queue.size";
private static final String INSTR_THREADS_ACTIVE_SAMPLER = "threads.active";
public static final String CONF_PREFIX = Service.CONF_PREFIX + "CallableQueueService.";
public static final String CONF_QUEUE_SIZE = CONF_PREFIX + "queue.size";
public static final String CONF_THREADS = CONF_PREFIX + "threads";
public static final String CONF_NEWIMPL = CONF_PREFIX + "queue.newImpl";
public static final String CONF_QUEUE_AWAIT_TERMINATION_TIMEOUT_SECONDS =
CONF_PREFIX + "queue.awaitTermination.timeout.seconds";
public static final String CONF_DELAYED_CALLABLE_THREADS = CONF_PREFIX + "delayedcallable.threads";
public static final String CONF_CALLABLE_CONCURRENCY = CONF_PREFIX + "callable.concurrency";
public static final String CONF_CALLABLE_NEXT_ELIGIBLE = CONF_PREFIX + "callable.next.eligible";
public static final String CONF_CALLABLE_INTERRUPT_TYPES = CONF_PREFIX + "InterruptTypes";
public static final String CONF_CALLABLE_INTERRUPT_MAP_MAX_SIZE = CONF_PREFIX + "InterruptMapMaxSize";
public static final int CONCURRENCY_DELAY = 500;
public static final int SAFE_MODE_DELAY = 60000;
public static final int MAX_CALLABLE_WAITTIME_MS = 30_000;
public static final int PRIORITIES = 3;
private final Map<String, AtomicInteger> activeCallables = new HashMap<String, AtomicInteger>();
private final Map<String, Date> uniqueCallables = new ConcurrentHashMap<String, Date>();
private final ConcurrentHashMap<String, Set<XCallable<?>>> interruptCommandsMap = new ConcurrentHashMap<>();
private Set<String> interruptTypes;
private int interruptMapMaxSize;
private int maxCallableConcurrency;
private int queueAwaitTerminationTimeoutSeconds;
private boolean callableBegin(XCallable<?> callable) {
synchronized (activeCallables) {
AtomicInteger counter = activeCallables.get(callable.getType());
if (counter == null) {
counter = new AtomicInteger(1);
activeCallables.put(callable.getType(), counter);
return true;
}
else {
int i = counter.incrementAndGet();
return i <= maxCallableConcurrency;
}
}
}
private void callableEnd(XCallable<?> callable) {
synchronized (activeCallables) {
AtomicInteger counter = activeCallables.get(callable.getType());
if (counter == null) {
throw new IllegalStateException("Counter value should not be null");
}
else {
counter.decrementAndGet();
}
}
if (newImpl) {
asyncXCommandExecutor.commandFinished();
asyncXCommandExecutor.checkMaxConcurrency(callable.getType());
}
}
private boolean callableReachMaxConcurrency(XCallable<?> callable) {
synchronized (activeCallables) {
AtomicInteger counter = activeCallables.get(callable.getType());
if (counter == null) {
return true;
}
else {
int i = counter.get();
return i < maxCallableConcurrency;
}
}
}
public boolean canSubmitCallable(XCallable<?> callable) {
synchronized (activeCallables) {
AtomicInteger counter = activeCallables.get(callable.getType());
if (counter == null) {
counter = new AtomicInteger(1);
activeCallables.put(callable.getType(), counter);
return true;
}
else {
int i = counter.get();
return i <= maxCallableConcurrency;
}
}
}
// Callables are wrapped with the this wrapper for execution, for logging
// and instrumentation.
// The wrapper implements Runnable and Comparable to be able to work with an
// executor and a priority queue.
public class CallableWrapper<E> extends PriorityDelayQueue.QueueElement<E> implements Runnable, Callable<E> {
private Instrumentation.Cron cron;
public CallableWrapper(XCallable<E> callable, long delay) {
super(callable, callable.getPriority(), delay, TimeUnit.MILLISECONDS);
cron = new Instrumentation.Cron();
cron.start();
}
public void run() {
XCallable<?> callable = null;
try {
removeFromUniqueCallables();
if (Services.get().getSystemMode() == SYSTEM_MODE.SAFEMODE) {
log.info("Oozie is in SAFEMODE, requeuing callable [{0}] with [{1}]ms delay", getElement().getType(),
SAFE_MODE_DELAY);
setDelay(SAFE_MODE_DELAY, TimeUnit.MILLISECONDS);
queue(this, true);
return;
}
callable = getElement();
if (callableBegin(callable)) {
cron.stop();
addInQueueCron(cron);
XLog log = XLog.getLog(getClass());
log.trace("executing callable [{0}]", callable.getName());
try {
//FutureTask.run() will invoke cllable.call()
super.run();
incrCounter(INSTR_EXECUTED_COUNTER, 1);
log.trace("executed callable [{0}]", callable.getName());
}
catch (Exception ex) {
incrCounter(INSTR_FAILED_COUNTER, 1);
log.warn("exception callable [{0}], {1}", callable.getName(), ex.getMessage(), ex);
}
}
else {
log.warn("max concurrency for callable [{0}] exceeded, requeueing with [{1}]ms delay", callable
.getType(), CONCURRENCY_DELAY);
setDelay(CONCURRENCY_DELAY, TimeUnit.MILLISECONDS);
queue(this, true);
incrCounter(callable.getType() + "#exceeded.concurrency", 1);
}
}
catch (Throwable t) {
incrCounter(INSTR_FAILED_COUNTER, 1);
log.warn("exception callable [{0}], {1}", callable == null ? "N/A" : callable.getName(),
t.getMessage(), t);
}
finally {
if (callable != null) {
callableEnd(callable);
}
}
}
/**
* Filter the duplicate callables from the list before queue this.
* <p>
* If it is single callable, checking if key is in unique map or not.
* <p>
* If it is composite callable, remove duplicates callables from the composite.
*
* @return true if this callable should be queued
*/
public boolean filterDuplicates() {
XCallable<?> callable = getElement();
if (callable instanceof CompositeCallable) {
return ((CompositeCallable) callable).removeDuplicates();
}
else {
return uniqueCallables.containsKey(callable.getKey()) == false;
}
}
/**
* Add the keys to the set
*/
public void addToUniqueCallables() {
XCallable<?> callable = getElement();
if (callable instanceof CompositeCallable) {
((CompositeCallable) callable).addToUniqueCallables();
}
else {
((ConcurrentHashMap<String, Date>) uniqueCallables).putIfAbsent(callable.getKey(), new Date());
}
}
/**
* Remove the keys from the set
*/
public void removeFromUniqueCallables() {
XCallable<?> callable = getElement();
if (callable instanceof CompositeCallable) {
((CompositeCallable) callable).removeFromUniqueCallables();
}
else {
uniqueCallables.remove(callable.getKey());
}
}
//this will not get called, bcz newTaskFor of threadpool will convert it in futureTask which is a runnable.
// futureTask will call the cllable.call from run method. so we override run to call super.run method.
@Override
public E call() throws Exception {
return null;
}
}
class CompositeCallable implements XCallable<Void> {
private List<XCallable<?>> callables;
private String name;
private int priority;
private long createdTime;
public CompositeCallable(List<? extends XCallable<?>> callables) {
this.callables = new ArrayList<XCallable<?>>(callables);
priority = 0;
createdTime = Long.MAX_VALUE;
StringBuilder sb = new StringBuilder();
String separator = "[";
for (XCallable<?> callable : callables) {
priority = Math.max(priority, callable.getPriority());
createdTime = Math.min(createdTime, callable.getCreatedTime());
sb.append(separator).append(callable.getName());
separator = ",";
}
sb.append("]");
name = StringUtils.intern(sb.toString());
}
@Override
public String getName() {
return name;
}
@Override
public String getType() {
return "#composite#" + callables.get(0).getType();
}
@Override
public String getKey() {
return "#composite#" + callables.get(0).getKey();
}
@Override
public String getEntityKey() {
return "#composite#" + callables.get(0).getEntityKey();
}
@Override
public int getPriority() {
return priority;
}
@Override
public long getCreatedTime() {
return createdTime;
}
@Override
public void setInterruptMode(boolean mode) {
}
@Override
public boolean inInterruptMode() {
return false;
}
public List<XCallable<?>> getCallables() {
return this.callables;
}
public Void call() throws Exception {
XLog log = XLog.getLog(getClass());
for (XCallable<?> callable : callables) {
log.trace("executing callable [{0}]", callable.getName());
try {
callable.call();
incrCounter(INSTR_EXECUTED_COUNTER, 1);
log.trace("executed callable [{0}]", callable.getName());
}
catch (Exception ex) {
incrCounter(INSTR_FAILED_COUNTER, 1);
log.warn("exception callable [{0}], {1}", callable.getName(), ex.getMessage(), ex);
}
}
// ticking -1 not to count the call to the composite callable
incrCounter(INSTR_EXECUTED_COUNTER, -1);
return null;
}
@Override
public String toString() {
if (callables.size() == 0) {
return null;
}
StringBuilder sb = new StringBuilder();
int size = callables.size();
for (int i = 0; i < size; i++) {
XCallable<?> callable = callables.get(i);
sb.append("(");
sb.append(callable.toString());
if (i + 1 == size) {
sb.append(")");
}
else {
sb.append("),");
}
}
return sb.toString();
}
/**
* Remove the duplicate callables from the list before queue them
*
* @return true if callables should be queued
*/
public boolean removeDuplicates() {
Set<String> set = new HashSet<String>();
List<XCallable<?>> filteredCallables = new ArrayList<XCallable<?>>();
if (callables.size() == 0) {
return false;
}
for (XCallable<?> callable : callables) {
if (!uniqueCallables.containsKey(callable.getKey()) && !set.contains(callable.getKey())) {
filteredCallables.add(callable);
set.add(callable.getKey());
}
}
callables = filteredCallables;
if (callables.size() == 0) {
return false;
}
return true;
}
/**
* Add the keys to the set
*/
public void addToUniqueCallables() {
for (XCallable<?> callable : callables) {
((ConcurrentHashMap<String, Date>) uniqueCallables).putIfAbsent(callable.getKey(), new Date());
}
}
/**
* Remove the keys from the set
*/
public void removeFromUniqueCallables() {
for (XCallable<?> callable : callables) {
uniqueCallables.remove(callable.getKey());
}
}
}
private XLog log = XLog.getLog(getClass());
private int queueSize;
private PriorityDelayQueue<CallableWrapper<?>> queue;
private ThreadPoolExecutor executor;
private Instrumentation instrumentation;
private boolean newImpl = false;
private AsyncXCommandExecutor asyncXCommandExecutor;
/**
* Convenience method for instrumentation counters.
*
* @param name counter name.
* @param count count to increment the counter.
*/
private void incrCounter(String name, int count) {
if (instrumentation != null) {
instrumentation.incr(INSTRUMENTATION_GROUP, name, count);
}
}
private void addInQueueCron(Instrumentation.Cron cron) {
if (instrumentation != null) {
instrumentation.addCron(INSTRUMENTATION_GROUP, INSTR_IN_QUEUE_TIME_TIMER, cron);
}
}
/**
* Initialize the command queue service.
*
* @param services services instance.
*/
@Override
@SuppressWarnings({ "unchecked", "rawtypes" })
public void init(Services services) {
Configuration conf = services.getConf();
queueSize = ConfigurationService.getInt(conf, CONF_QUEUE_SIZE);
int threads = ConfigurationService.getInt(conf, CONF_THREADS);
boolean callableNextEligible = ConfigurationService.getBoolean(conf, CONF_CALLABLE_NEXT_ELIGIBLE);
interruptTypes = new HashSet<>();
for (String type : ConfigurationService.getStrings(conf, CONF_CALLABLE_INTERRUPT_TYPES)) {
log.debug("Adding interrupt type [{0}]", type);
interruptTypes.add(type);
}
interruptTypes = ImmutableSet.copyOf(interruptTypes);
if (!callableNextEligible) {
queue = new PriorityDelayQueue<CallableWrapper<?>>(PRIORITIES,
MAX_CALLABLE_WAITTIME_MS,
TimeUnit.MILLISECONDS,
queueSize) {
@Override
protected void debug(String msgTemplate, Object... msgArgs) {
log.trace(msgTemplate, msgArgs);
}
};
}
else {
// If the head of this queue has already reached max concurrency,
// continuously find next one
// which has not yet reach max concurrency.Overrided method
// 'eligibleToPoll' to check if the
// element of this queue has reached the maximum concurrency.
queue = new PollablePriorityDelayQueue<CallableWrapper<?>>(PRIORITIES,
MAX_CALLABLE_WAITTIME_MS,
TimeUnit.MILLISECONDS,
queueSize) {
@Override
protected void debug(String msgTemplate, Object... msgArgs) {
log.trace(msgTemplate, msgArgs);
}
@Override
protected boolean eligibleToPoll(QueueElement<?> element) {
if (element != null) {
CallableWrapper wrapper = (CallableWrapper) element;
if (element.getElement() != null) {
return callableReachMaxConcurrency(wrapper.getElement());
}
}
return false;
}
};
}
interruptMapMaxSize = ConfigurationService.getInt(conf, CONF_CALLABLE_INTERRUPT_MAP_MAX_SIZE);
newImpl = ConfigurationService.getBoolean(CONF_NEWIMPL, true);
log.info("Using new queue implementation: [{0}]", newImpl);
queueAwaitTerminationTimeoutSeconds = ConfigurationService.getInt(conf, CONF_QUEUE_AWAIT_TERMINATION_TIMEOUT_SECONDS);
if (newImpl) {
int delayedCallableThreads = ConfigurationService.getInt(CONF_DELAYED_CALLABLE_THREADS, 1);
asyncXCommandExecutor = new AsyncXCommandExecutor(threads,
delayedCallableThreads,
callableNextEligible,
this,
queueSize,
MAX_CALLABLE_WAITTIME_MS,
PRIORITIES,
queueAwaitTerminationTimeoutSeconds);
executor = asyncXCommandExecutor.getExecutorService();
} else {
executor = new ThreadPoolExecutor(threads, threads, 10, TimeUnit.SECONDS, (BlockingQueue) queue,
new NamedThreadFactory("CallableQueue")) {
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t,r);
XLog.Info.get().clear();
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return (RunnableFuture<T>)callable;
}
};
}
// IMPORTANT: The ThreadPoolExecutor does not always the execute
// commands out of the queue, there are
// certain conditions where commands are pushed directly to a thread.
// As we are using a queue with DELAYED semantics (i.e. execute the
// command in 5 mins) we need to make
// sure that the commands are always pushed to the queue.
// To achieve this (by looking at the ThreadPoolExecutor.execute()
// implementation, we are making the pool
// minimum size equals to the maximum size (thus threads are keep always
// running) and we are warming up
// all those threads (the for loop that runs dummy runnables).
for (int i = 0; i < threads; i++) {
executor.execute(new Runnable() {
public void run() {
try {
Thread.sleep(100);
}
catch (InterruptedException ex) {
log.warn("Could not warm up threadpool {0}", ex.getMessage(), ex);
}
}
});
}
maxCallableConcurrency = ConfigurationService.getInt(conf, CONF_CALLABLE_CONCURRENCY);
}
/**
* Destroy the command queue service.
*/
@Override
public void destroy() {
try {
queue.clear();
if (newImpl) {
asyncXCommandExecutor.shutdown();
}
else {
long limit = System.currentTimeMillis() + queueAwaitTerminationTimeoutSeconds * 1000;
executor.shutdown();
queue.clear();
while (!executor.awaitTermination(1000, TimeUnit.MILLISECONDS)) {
log.info("Waiting for executor to shutdown");
if (System.currentTimeMillis() > limit) {
log.warn("Gave up, continuing without waiting for executor to shutdown");
break;
}
}
}
}
catch (InterruptedException ex) {
log.warn(ex);
}
}
/**
* Return the public interface for command queue service.
*
* @return {@link CallableQueueService}.
*/
@Override
public Class<? extends Service> getInterface() {
return CallableQueueService.class;
}
/**
* @return int size of queue
*/
public synchronized int queueSize() {
return newImpl ? asyncXCommandExecutor.getSize() : queue.size();
}
private synchronized boolean queue(CallableWrapper<?> wrapper, boolean ignoreQueueSize) {
if (newImpl) {
if (asyncXCommandExecutor.isShutDown() || asyncXCommandExecutor.isTerminated()) {
log.warn("Async executor shutting down, ignoring queueing of [{0}]", wrapper.getElement().getKey());
}
else {
asyncXCommandExecutor.queue(wrapper, ignoreQueueSize);
}
} else {
if (!ignoreQueueSize && queue.size() >= queueSize) {
log.warn("queue full, ignoring queuing for [{0}]", wrapper.getElement().getKey());
return false;
}
if (!executor.isShutdown()) {
if (wrapper.filterDuplicates()) {
wrapper.addToUniqueCallables();
try {
executor.execute(wrapper);
}
catch (Throwable ree) {
wrapper.removeFromUniqueCallables();
throw new RuntimeException(ree);
}
}
}
else {
log.warn("Executor shutting down, ignoring queueing of [{0}]", wrapper.getElement().getKey());
}
}
return true;
}
/**
* Queue a callable for asynchronous execution.
*
* @param callable callable to queue.
* @return <code>true</code> if the callable was queued, <code>false</code> if the queue is full and the callable
* was not queued.
*/
public boolean queue(XCallable<?> callable) {
return queue(callable, 0);
}
/**
* Queue a list of callables for serial execution.
* <p>
* Useful to serialize callables that may compete with each other for resources.
* <p>
* All callables will be processed with the priority of the highest priority of all callables.
*
* @param callables callables to be executed by the composite callable.
* @return <code>true</code> if the callables were queued, <code>false</code> if the queue is full and the callables
* were not queued.
*/
public boolean queueSerial(List<? extends XCallable<?>> callables) {
return queueSerial(callables, 0);
}
/**
* Queue a callable for asynchronous execution sometime in the future.
*
* @param callable callable to queue for delayed execution
* @param delay time, in milliseconds, that the callable should be delayed.
* @return <code>true</code> if the callable was queued, <code>false</code>
* if the queue is full and the callable was not queued.
*/
public synchronized boolean queue(XCallable<?> callable, long delay) {
if (callable == null) {
return true;
}
boolean queued = false;
if (Services.get().getSystemMode() == SYSTEM_MODE.SAFEMODE) {
log.warn("[queue] System is in SAFEMODE. Hence no callable is queued. current queue size " + queue.size());
}
else {
checkInterruptTypes(callable);
queued = queue(new CallableWrapper<>(callable, delay), false);
if (queued) {
incrCounter(INSTR_QUEUED_COUNTER, 1);
}
else {
log.warn("Could not queue callable");
}
}
return queued;
}
/**
* Queue a list of callables for serial execution sometime in the future.
* <p>
* Useful to serialize callables that may compete with each other for resources.
* <p>
* All callables will be processed with the priority of the highest priority of all callables.
*
* @param callables callables to be executed by the composite callable.
* @param delay time, in milliseconds, that the callable should be delayed.
* @return <code>true</code> if the callables were queued, <code>false</code> if the queue is full and the callables
* were not queued.
*/
public synchronized boolean queueSerial(List<? extends XCallable<?>> callables, long delay) {
boolean queued;
if (callables == null || callables.size() == 0) {
queued = true;
}
else if (callables.size() == 1) {
queued = queue(callables.get(0), delay);
}
else {
XCallable<?> callable = new CompositeCallable(callables);
queued = queue(callable, delay);
if (queued) {
incrCounter(INSTR_QUEUED_COUNTER, callables.size());
}
}
return queued;
}
/**
* Instruments the callable queue service.
*
* @param instr instance to instrument the callable queue service to.
*/
public void instrument(Instrumentation instr) {
instrumentation = instr;
instr.addSampler(INSTRUMENTATION_GROUP, INSTR_QUEUE_SIZE_SAMPLER, 60, 1, new Instrumentation.Variable<Long>() {
public Long getValue() {
return (long) queue.size();
}
});
instr.addSampler(INSTRUMENTATION_GROUP, INSTR_THREADS_ACTIVE_SAMPLER, 60, 1,
new Instrumentation.Variable<Long>() {
public Long getValue() {
return (long) executor.getActiveCount();
}
});
}
/**
* check the interrupt map for the existence of an interrupt commands if
* exist a List of Interrupt Callable for the same lock key will be returned,
* otherwise it will return null
*
* @param lockKey key to check
* @return Set of interrupt callables
*/
public Set<XCallable<?>> checkInterrupts(String lockKey) {
if (lockKey != null) {
return interruptCommandsMap.remove(lockKey);
}
return null;
}
/**
* check if the callable is of an interrupt type and insert it into the map
* accordingly
*
* @param callable callable
*/
public void checkInterruptTypes(XCallable<?> callable) {
if ((callable instanceof CompositeCallable) && (((CompositeCallable) callable).getCallables() != null)) {
for (XCallable<?> singleCallable : ((CompositeCallable) callable).getCallables()) {
if (interruptTypes.contains(singleCallable.getType())) {
insertCallableIntoInterruptMap(singleCallable);
}
}
}
else if (interruptTypes.contains(callable.getType())) {
insertCallableIntoInterruptMap(callable);
}
}
/**
* insert a new callable in the Interrupt Command Map add a new element to
* the list or create a new list accordingly
*
* @param callable callable
*/
public void insertCallableIntoInterruptMap(XCallable<?> callable) {
if (interruptCommandsMap.size() < interruptMapMaxSize) {
Set<XCallable<?>> newSet = Collections.synchronizedSet(new LinkedHashSet<XCallable<?>>());
Set<XCallable<?>> interruptSet = interruptCommandsMap.putIfAbsent(callable.getEntityKey(), newSet);
if (interruptSet == null) {
interruptSet = newSet;
}
if (interruptSet.add(callable)) {
log.trace("Inserting an interrupt element [{0}] to the interrupt map", callable.toString());
} else {
log.trace("Interrupt element [{0}] already present", callable.toString());
}
}
else {
log.warn(
"The interrupt map reached max size of [{0}], an interrupt element [{1}] will not added to the map [{1}]",
interruptCommandsMap.size(), callable.toString());
}
}
/**
* Get the list of strings of queue dump
*
* @return the list of string that representing each CallableWrapper
*/
public List<String> getQueueDump() {
if (newImpl) {
return asyncXCommandExecutor.getQueueDump();
} else {
List<String> list = new ArrayList<String>();
for (QueueElement<CallableWrapper<?>> qe : queue) {
if (qe.toString() == null) {
continue;
}
list.add(qe.toString());
}
return list;
}
}
/**
* Get the list of strings of uniqueness map dump
*
* @return the list of string that representing the key of each command in the queue
*/
public List<String> getUniqueDump() {
List<String> list = new ArrayList<String>();
for (Entry<String, Date> entry : uniqueCallables.entrySet()) {
list.add(entry.toString());
}
return list;
}
// Refer executor.invokeAll
public <T> List<Future<T>> invokeAll(List<CallableWrapper<T>> tasks)
throws InterruptedException {
return executor.invokeAll(tasks);
}
public Set<String> getInterruptTypes() {
return interruptTypes;
}
}