Add custom rejection handler that implements 'caller runs' which includes logging.
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/executor/TaskExecutorFactory.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/executor/TaskExecutorFactory.java
index e100ec0..3c6a750 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/executor/TaskExecutorFactory.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/executor/TaskExecutorFactory.java
@@ -20,11 +20,10 @@
package org.apache.usergrid.persistence.core.executor;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
@@ -33,6 +32,8 @@
*/
public class TaskExecutorFactory {
+ private static final Logger log = LoggerFactory.getLogger(TaskExecutorFactory.class);
+
public enum RejectionAction {
ABORT,
CALLERRUNS
@@ -84,7 +85,7 @@
public MaxSizeThreadPoolCallerRuns( final BlockingQueue<Runnable> queue, final String poolName, final int maxPoolSize ) {
super( maxPoolSize, maxPoolSize, 30, TimeUnit.SECONDS, queue,
- new CountingThreadFactory( poolName ), new ThreadPoolExecutor.CallerRunsPolicy() );
+ new CountingThreadFactory( poolName ), new RejectedHandler(poolName) );
}
}
@@ -110,9 +111,29 @@
Thread t = new Thread( r, threadName );
//set it to be a daemon thread so it doesn't block shutdown
- t.setDaemon( true );
+ t.setDaemon(true);
return t;
}
}
+
+ /**
+ * The handler that will handle rejected executions and signal the interface
+ */
+ private static final class RejectedHandler implements RejectedExecutionHandler {
+
+ private final String poolName;
+
+ private RejectedHandler (final String poolName) {this.poolName = poolName;}
+
+ @Override
+ public void rejectedExecution( final Runnable r, final ThreadPoolExecutor executor ) {
+ log.warn( "{} task queue full, rejecting task {} and running in thread {}", poolName, r, Thread.currentThread().getName() );
+
+ //We've decided we want to have a "caller runs" policy, to just invoke the task when rejected
+
+ r.run();
+ }
+
+ }
}