[KARAF-4610] Use a Thread ExecutorService in the decanter scheduler
diff --git a/scheduler/simple/src/main/cfg/org.apache.karaf.decanter.scheduler.simple.cfg b/scheduler/simple/src/main/cfg/org.apache.karaf.decanter.scheduler.simple.cfg
index a9b270e..9d554da 100644
--- a/scheduler/simple/src/main/cfg/org.apache.karaf.decanter.scheduler.simple.cfg
+++ b/scheduler/simple/src/main/cfg/org.apache.karaf.decanter.scheduler.simple.cfg
@@ -1,2 +1,16 @@
# Define the Decanter simple scheduler period
-period=5000
\ No newline at end of file
+period=5000
+
+# scheduler thread pool
+
+# The time to wait before stopping an idle thread (in millesecond)
+# Default is 1 minute
+threadIdleTimeout=60000
+
+# Initial number of threads created by the scheduler
+# Default is 5
+threadInitCount=5
+
+# Maximum number of threads created by the scheduler
+# Default is 200
+threadMaxCount=200
\ No newline at end of file
diff --git a/scheduler/simple/src/main/java/org/apache/karaf/decanter/scheduler/simple/SimpleScheduler.java b/scheduler/simple/src/main/java/org/apache/karaf/decanter/scheduler/simple/SimpleScheduler.java
index 0268264..22e35b9 100644
--- a/scheduler/simple/src/main/java/org/apache/karaf/decanter/scheduler/simple/SimpleScheduler.java
+++ b/scheduler/simple/src/main/java/org/apache/karaf/decanter/scheduler/simple/SimpleScheduler.java
@@ -19,6 +19,7 @@
import java.util.Dictionary;
import java.util.HashSet;
import java.util.Set;
+import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.karaf.decanter.api.Scheduler;
@@ -42,7 +43,14 @@
private final static Logger LOGGER = LoggerFactory.getLogger(SimpleScheduler.class);
private AtomicBoolean running = new AtomicBoolean(false);
+
private long period = 5000;
+ private long threadIdleTimeout = 60000;
+ private int threadInitCount = 5;
+ private int threadMaxCount = 200;
+
+ private ExecutorService executorService;
+
Set<Runnable> collectors;
public SimpleScheduler() {
@@ -59,6 +67,12 @@
public void configure(Dictionary<String, String> config) {
String periodSt = config.get("period");
period = periodSt != null ? Integer.parseInt(periodSt) : 5000;
+ String threadIdleTimeoutSt = config.get("threadIdleTimeout");
+ threadIdleTimeout = threadIdleTimeoutSt != null ? Integer.parseInt(threadIdleTimeoutSt) : 60000;
+ String threadInitCountSt = config.get("threadInitCount");
+ threadInitCount = threadInitCountSt != null ? Integer.parseInt(threadInitCountSt) : 5;
+ String threadMaxCountSt = config.get("threadMaxCount");
+ threadMaxCount = threadMaxCountSt != null ? Integer.parseInt(threadMaxCountSt) : 200;
}
@Override
@@ -69,7 +83,7 @@
LOGGER.debug("Calling the collectors ...");
for (Runnable collector : collectors) {
try {
- collector.run();
+ executorService.execute(collector);
} catch (Exception e) {
LOGGER.warn("Can't collect data", e);
}
@@ -91,12 +105,22 @@
@Override
public void stop() {
+ try {
+ executorService.awaitTermination(60L, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ LOGGER.warn("Collectors still active", e);
+ }
+ executorService.shutdownNow();
running.set(false);
}
@Override
public void start() {
if (running.compareAndSet(false, true)) {
+ executorService = new ThreadPoolExecutor(threadInitCount,
+ threadMaxCount, threadIdleTimeout, TimeUnit.SECONDS,
+ new SynchronousQueue<Runnable>(), Executors.defaultThreadFactory(),
+ new ThreadPoolExecutor.CallerRunsPolicy());
Thread thread = new Thread(this, "decanter-scheduler-simple");
thread.start();
}