GEODE-9713: Support thread count in ExecutorService rules (#7002)
Restores thread count support to ExecutorServiceRule, and adds it to
DistributedExecutorServiceRule.
PROBLEM
ExecutorService rules currently create a newCachedThreadPool which
creates new threads as needed.
Some usages would benefit from the option of specifying a threadCount
limit which would create a newFixedThreadPool that reuses a fixed
number of threads.
SOLUTION
Add optional threadCount creation parameter to both ExecutorServiceRule
and DistributedExecutorServiceRule.
Creating a ExecutorService rule without a threadCount will still create a
newCachedThreadPool. Using a threadCount will now create a
newFixedThreadPool.
diff --git a/geode-dunit/src/distributedTest/java/org/apache/geode/test/dunit/rules/tests/DistributedExecutorServiceRuleLimitedThreadCountTest.java b/geode-dunit/src/distributedTest/java/org/apache/geode/test/dunit/rules/tests/DistributedExecutorServiceRuleLimitedThreadCountTest.java
new file mode 100644
index 0000000..9328d5e
--- /dev/null
+++ b/geode-dunit/src/distributedTest/java/org/apache/geode/test/dunit/rules/tests/DistributedExecutorServiceRuleLimitedThreadCountTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.test.dunit.rules.tests;
+
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.getTimeout;
+import static org.apache.geode.test.dunit.VM.getController;
+import static org.apache.geode.test.dunit.VM.getVM;
+import static org.apache.geode.test.dunit.rules.DistributedExecutorServiceRule.builder;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.Serializable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Stream;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.test.dunit.rules.DistributedExecutorServiceRule;
+
+@SuppressWarnings("serial")
+public class DistributedExecutorServiceRuleLimitedThreadCountTest implements Serializable {
+
+ private static final int THREAD_COUNT = 2;
+ private static final long TIMEOUT = getTimeout().toMinutes();
+ private static final TimeUnit UNIT = TimeUnit.MINUTES;
+ private static final AtomicInteger STARTED_TASKS = new AtomicInteger();
+ private static final AtomicInteger COMPLETED_TASKS = new AtomicInteger();
+ private static final AtomicReference<CountDownLatch> LATCH = new AtomicReference<>();
+
+ @Rule
+ public DistributedExecutorServiceRule executorServiceRule = builder()
+ .threadCount(THREAD_COUNT).vmCount(1).build();
+
+ @Before
+ public void setUp() {
+ Stream.of(getController(), getVM(0)).forEach(vm -> vm.invoke(() -> {
+ STARTED_TASKS.set(0);
+ COMPLETED_TASKS.set(0);
+ LATCH.set(new CountDownLatch(1));
+ }));
+ }
+
+ @Test
+ public void limitsRunningTasksToThreadCount() {
+ // start THREAD_COUNT threads to use up the executor's thread pool
+ Stream.of(getController(), getVM(0)).forEach(vm -> vm.invoke(() -> {
+ for (int i = 1; i <= THREAD_COUNT; i++) {
+ executorServiceRule.submit(() -> {
+ // increment count of started tasks and use a LATCH to keep it running
+ STARTED_TASKS.incrementAndGet();
+ assertThat(LATCH.get().await(TIMEOUT, UNIT)).isTrue();
+ COMPLETED_TASKS.incrementAndGet();
+ });
+ }
+
+ // count of started tasks should be the same as THREAD_COUNT
+ await().untilAsserted(() -> {
+ assertThat(STARTED_TASKS.get()).isEqualTo(THREAD_COUNT);
+ assertThat(COMPLETED_TASKS.get()).isZero();
+ });
+
+ // try to start one more task, but it should end up queued instead of started
+ executorServiceRule.submit(() -> {
+ STARTED_TASKS.incrementAndGet();
+ assertThat(LATCH.get().await(TIMEOUT, UNIT)).isTrue();
+ COMPLETED_TASKS.incrementAndGet();
+ });
+
+ // started tasks should still be the same as THREAD_COUNT
+ assertThat(STARTED_TASKS.get()).isEqualTo(THREAD_COUNT);
+ assertThat(COMPLETED_TASKS.get()).isZero();
+
+ // number of threads running in executor should also be the same as THREAD_COUNT
+ assertThat(executorServiceRule.getThreads()).hasSize(THREAD_COUNT);
+
+ // open latch to let started tasks complete, and queued task should also start and finish
+ LATCH.get().countDown();
+
+ // all tasks should eventually complete as the executor threads finish tasks
+ await().untilAsserted(() -> {
+ assertThat(STARTED_TASKS.get()).isEqualTo(THREAD_COUNT + 1);
+ assertThat(COMPLETED_TASKS.get()).isEqualTo(THREAD_COUNT + 1);
+ });
+ }));
+ }
+}
diff --git a/geode-dunit/src/distributedTest/java/org/apache/geode/test/dunit/rules/tests/DistributedExecutorServiceRuleLimitedVmCountTest.java b/geode-dunit/src/distributedTest/java/org/apache/geode/test/dunit/rules/tests/DistributedExecutorServiceRuleLimitedVmCountTest.java
new file mode 100644
index 0000000..434b409
--- /dev/null
+++ b/geode-dunit/src/distributedTest/java/org/apache/geode/test/dunit/rules/tests/DistributedExecutorServiceRuleLimitedVmCountTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.test.dunit.rules.tests;
+
+import static org.apache.geode.test.dunit.VM.getVMCount;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.test.dunit.rules.DistributedExecutorServiceRule;
+
+public class DistributedExecutorServiceRuleLimitedVmCountTest {
+
+ private static final int VM_COUNT = 2;
+
+ @Rule
+ public DistributedExecutorServiceRule executorServiceRule =
+ new DistributedExecutorServiceRule(0, VM_COUNT);
+
+ @Test
+ public void limitsVmCount() {
+ assertThat(getVMCount()).isEqualTo(VM_COUNT);
+ }
+}
diff --git a/geode-dunit/src/distributedTest/java/org/apache/geode/test/dunit/rules/tests/DistributedExecutorServiceRuleUnlimitedThreadCountTest.java b/geode-dunit/src/distributedTest/java/org/apache/geode/test/dunit/rules/tests/DistributedExecutorServiceRuleUnlimitedThreadCountTest.java
new file mode 100644
index 0000000..c76cafc
--- /dev/null
+++ b/geode-dunit/src/distributedTest/java/org/apache/geode/test/dunit/rules/tests/DistributedExecutorServiceRuleUnlimitedThreadCountTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.test.dunit.rules.tests;
+
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.getTimeout;
+import static org.apache.geode.test.dunit.VM.getController;
+import static org.apache.geode.test.dunit.VM.getVM;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Stream;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.test.dunit.rules.DistributedExecutorServiceRule;
+
+@SuppressWarnings("serial")
+public class DistributedExecutorServiceRuleUnlimitedThreadCountTest implements Serializable {
+
+ private static final int PARALLEL_TASK_COUNT = 4;
+ private static final long TIMEOUT = getTimeout().toMinutes();
+ private static final TimeUnit UNIT = TimeUnit.MINUTES;
+ private static final AtomicBoolean COMPLETED = new AtomicBoolean();
+ private static final AtomicReference<CyclicBarrier> BARRIER = new AtomicReference<>();
+
+ @Rule
+ public DistributedExecutorServiceRule executorServiceRule =
+ new DistributedExecutorServiceRule(0, 1);
+
+ @Before
+ public void setUp() {
+ Stream.of(getController(), getVM(0)).forEach(vm -> vm.invoke(() -> {
+ COMPLETED.set(false);
+ BARRIER.set(new CyclicBarrier(PARALLEL_TASK_COUNT, () -> COMPLETED.set(true)));
+ }));
+ }
+
+ @Test
+ public void doesNotLimitThreadCount() {
+ Stream.of(getController(), getVM(0)).forEach(vm -> vm.invoke(() -> {
+ Collection<Future<Void>> tasks = new ArrayList<>();
+ for (int i = 1; i <= PARALLEL_TASK_COUNT; i++) {
+ tasks.add(executorServiceRule.submit(() -> {
+ BARRIER.get().await(TIMEOUT, UNIT);
+ }));
+ }
+ await().untilAsserted(() -> assertThat(COMPLETED.get()).isTrue());
+ for (Future<Void> task : tasks) {
+ assertThat(task).isDone();
+ }
+ }));
+ }
+}
diff --git a/geode-dunit/src/main/java/org/apache/geode/test/dunit/rules/DistributedExecutorServiceRule.java b/geode-dunit/src/main/java/org/apache/geode/test/dunit/rules/DistributedExecutorServiceRule.java
index 44f7de5..1fff402 100644
--- a/geode-dunit/src/main/java/org/apache/geode/test/dunit/rules/DistributedExecutorServiceRule.java
+++ b/geode-dunit/src/main/java/org/apache/geode/test/dunit/rules/DistributedExecutorServiceRule.java
@@ -14,12 +14,15 @@
*/
package org.apache.geode.test.dunit.rules;
+import static org.apache.geode.test.dunit.VM.DEFAULT_VM_COUNT;
+
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
@@ -27,17 +30,76 @@
import org.apache.geode.test.junit.rules.ExecutorServiceRule;
import org.apache.geode.test.junit.rules.ExecutorServiceRule.ThrowingRunnable;
-@SuppressWarnings("unused")
+/**
+ * Every DUnit VM, including the JUnit Controller VM, has its own {@code ExecutorService}.
+ */
+@SuppressWarnings({"serial", "unused"})
public class DistributedExecutorServiceRule extends AbstractDistributedRule {
private static final AtomicReference<ExecutorServiceRule> delegate = new AtomicReference<>();
- public DistributedExecutorServiceRule() {
- // default vmCount
+ private final boolean enableAwaitTermination;
+ private final long awaitTerminationTimeout;
+ private final TimeUnit awaitTerminationTimeUnit;
+ private final boolean awaitTerminationBeforeShutdown;
+ private final boolean useShutdown;
+ private final boolean useShutdownNow;
+ private final int threadCount;
+
+ /**
+ * Returns a {@code Builder} to configure a new {@code ExecutorServiceRule}.
+ */
+ public static Builder builder() {
+ return new Builder();
}
- public DistributedExecutorServiceRule(int vmCount) {
+ /**
+ * Constructs a {@code DistributedExecutorServiceRule} which performs {@code shutdownNow} and
+ * thread dump of its threads if any were left running during {@code tearDown}.
+ */
+ public DistributedExecutorServiceRule() {
+ this(new Builder().threadCount(0).vmCount(DEFAULT_VM_COUNT));
+ }
+
+ /**
+ * Constructs a {@code DistributedExecutorServiceRule} which performs {@code shutdownNow} and
+ * thread dump of its threads if any were left running during {@code tearDown}.
+ *
+ * @param threadCount The number of threads in the pool. Creates fixed thread pool if > 0; else
+ * creates cached thread pool.
+ * @param vmCount specified number of VMs
+ */
+ public DistributedExecutorServiceRule(int threadCount, int vmCount) {
+ this(new Builder().threadCount(threadCount).vmCount(vmCount));
+ }
+
+ private DistributedExecutorServiceRule(Builder builder) {
+ this(builder.enableAwaitTermination,
+ builder.awaitTerminationTimeout,
+ builder.awaitTerminationTimeUnit,
+ builder.awaitTerminationBeforeShutdown,
+ builder.useShutdown,
+ builder.useShutdownNow,
+ builder.threadCount,
+ builder.vmCount);
+ }
+
+ private DistributedExecutorServiceRule(boolean enableAwaitTermination,
+ long awaitTerminationTimeout,
+ TimeUnit awaitTerminationTimeUnit,
+ boolean awaitTerminationBeforeShutdown,
+ boolean useShutdown,
+ boolean useShutdownNow,
+ int threadCount,
+ int vmCount) {
super(vmCount);
+ this.enableAwaitTermination = enableAwaitTermination;
+ this.awaitTerminationTimeout = awaitTerminationTimeout;
+ this.awaitTerminationTimeUnit = awaitTerminationTimeUnit;
+ this.awaitTerminationBeforeShutdown = awaitTerminationBeforeShutdown;
+ this.useShutdown = useShutdown;
+ this.useShutdownNow = useShutdownNow;
+ this.threadCount = threadCount;
}
public ExecutorService getExecutorService() {
@@ -173,7 +235,9 @@
private void invokeBefore() throws Exception {
try {
- delegate.set(new ExecutorServiceRule());
+ delegate.set(new ExecutorServiceRule(enableAwaitTermination, awaitTerminationTimeout,
+ awaitTerminationTimeUnit, awaitTerminationBeforeShutdown, useShutdown, useShutdownNow,
+ threadCount));
delegate.get().before();
} catch (Throwable throwable) {
if (throwable instanceof Exception) {
@@ -186,4 +250,97 @@
private void invokeAfter() {
delegate.get().after();
}
+
+ public static class Builder {
+
+ private boolean enableAwaitTermination;
+ private long awaitTerminationTimeout;
+ private TimeUnit awaitTerminationTimeUnit = TimeUnit.NANOSECONDS;
+ private boolean awaitTerminationBeforeShutdown = true;
+ private boolean useShutdown;
+ private boolean useShutdownNow = true;
+ private int threadCount;
+ private int vmCount;
+
+ protected Builder() {
+ // nothing
+ }
+
+ /**
+ * Enables invocation of {@code awaitTermination} during {@code tearDown}. Default is disabled.
+ *
+ * @param timeout the maximum time to wait
+ * @param unit the time unit of the timeout argument
+ */
+ public Builder awaitTermination(long timeout, TimeUnit unit) {
+ enableAwaitTermination = true;
+ awaitTerminationTimeout = timeout;
+ awaitTerminationTimeUnit = unit;
+ return this;
+ }
+
+ /**
+ * Enables invocation of {@code shutdown} during {@code tearDown}. Default is disabled.
+ */
+ public Builder useShutdown() {
+ useShutdown = true;
+ useShutdownNow = false;
+ return this;
+ }
+
+ /**
+ * Enables invocation of {@code shutdownNow} during {@code tearDown}. Default is enabled.
+ */
+ public Builder useShutdownNow() {
+ useShutdown = false;
+ useShutdownNow = true;
+ return this;
+ }
+
+ /**
+ * Specifies invocation of {@code awaitTermination} before {@code shutdown} or
+ * {@code shutdownNow}.
+ */
+ public Builder awaitTerminationBeforeShutdown() {
+ awaitTerminationBeforeShutdown = true;
+ return this;
+ }
+
+ /**
+ * Specifies invocation of {@code awaitTermination} after {@code shutdown} or
+ * {@code shutdownNow}.
+ */
+ public Builder awaitTerminationAfterShutdown() {
+ awaitTerminationBeforeShutdown = false;
+ return this;
+ }
+
+ /**
+ * Specifies the number of threads in the pool. Creates fixed thread pool if > 0. Default is 0
+ * which means (non-fixed) cached thread pool.
+ *
+ * @param threadCount the number of threads in the pool
+ */
+ public Builder threadCount(int threadCount) {
+ this.threadCount = threadCount;
+ return this;
+ }
+
+ /**
+ * Specifies the number of DUnit VMs to startup.
+ *
+ * @param vmCount the number of DUnit VMs to startup
+ */
+ public Builder vmCount(int vmCount) {
+ this.vmCount = vmCount;
+ return this;
+ }
+
+ /**
+ * Builds the instance of {@code ExecutorServiceRule}.
+ */
+ public DistributedExecutorServiceRule build() {
+ return new DistributedExecutorServiceRule(this);
+ }
+ }
}
diff --git a/geode-dunit/src/main/resources/org/apache/geode/test/dunit/internal/sanctioned-geode-dunit-serializables.txt b/geode-dunit/src/main/resources/org/apache/geode/test/dunit/internal/sanctioned-geode-dunit-serializables.txt
index ff871f5..0221781 100644
--- a/geode-dunit/src/main/resources/org/apache/geode/test/dunit/internal/sanctioned-geode-dunit-serializables.txt
+++ b/geode-dunit/src/main/resources/org/apache/geode/test/dunit/internal/sanctioned-geode-dunit-serializables.txt
@@ -164,7 +164,7 @@
org/apache/geode/test/dunit/rules/DistributedDiskDirRule,false,invoker:org/apache/geode/test/dunit/rules/RemoteInvoker,temporaryFolder:org/apache/geode/test/junit/rules/serializable/SerializableTemporaryFolder,testClassName:java/lang/String,testName:org/apache/geode/test/junit/rules/serializable/SerializableTestName,vmCount:int,vmEventListener:org/apache/geode/test/dunit/VMEventListener
org/apache/geode/test/dunit/rules/DistributedDiskDirRule$InternalVMEventListener,false,this$0:org/apache/geode/test/dunit/rules/DistributedDiskDirRule
org/apache/geode/test/dunit/rules/DistributedErrorCollector,false,beforeBounceErrors:java/util/Map
-org/apache/geode/test/dunit/rules/DistributedExecutorServiceRule,false
+org/apache/geode/test/dunit/rules/DistributedExecutorServiceRule,false,awaitTerminationBeforeShutdown:boolean,awaitTerminationTimeUnit:java/util/concurrent/TimeUnit,awaitTerminationTimeout:long,enableAwaitTermination:boolean,threadCount:int,useShutdown:boolean,useShutdownNow:boolean
org/apache/geode/test/dunit/rules/DistributedExternalResource,false,invoker:org/apache/geode/test/dunit/rules/RemoteInvoker
org/apache/geode/test/dunit/rules/DistributedMap,false,controller:java/util/concurrent/atomic/AtomicReference,identity:int,initialEntries:java/util/Map
org/apache/geode/test/dunit/rules/DistributedReference,false,autoClose:java/util/concurrent/atomic/AtomicBoolean,identity:int
diff --git a/geode-junit/src/main/java/org/apache/geode/test/junit/rules/ExecutorServiceRule.java b/geode-junit/src/main/java/org/apache/geode/test/junit/rules/ExecutorServiceRule.java
index e3a6a67..abc9e42 100644
--- a/geode-junit/src/main/java/org/apache/geode/test/junit/rules/ExecutorServiceRule.java
+++ b/geode-junit/src/main/java/org/apache/geode/test/junit/rules/ExecutorServiceRule.java
@@ -94,6 +94,7 @@
protected final boolean awaitTerminationBeforeShutdown;
protected final boolean useShutdown;
protected final boolean useShutdownNow;
+ protected final int threadCount;
protected transient volatile DedicatedThreadFactory threadFactory;
protected transient volatile ExecutorService executor;
@@ -106,12 +107,13 @@
}
protected ExecutorServiceRule(Builder builder) {
- enableAwaitTermination = builder.enableAwaitTermination;
- awaitTerminationTimeout = builder.awaitTerminationTimeout;
- awaitTerminationTimeUnit = builder.awaitTerminationTimeUnit;
- awaitTerminationBeforeShutdown = builder.awaitTerminationBeforeShutdown;
- useShutdown = builder.useShutdown;
- useShutdownNow = builder.useShutdownNow;
+ this(builder.enableAwaitTermination,
+ builder.awaitTerminationTimeout,
+ builder.awaitTerminationTimeUnit,
+ builder.awaitTerminationBeforeShutdown,
+ builder.useShutdown,
+ builder.useShutdownNow,
+ builder.threadCount);
}
/**
@@ -119,18 +121,47 @@
* during {@code tearDown}.
*/
public ExecutorServiceRule() {
- enableAwaitTermination = false;
- awaitTerminationTimeout = 0;
- awaitTerminationTimeUnit = TimeUnit.NANOSECONDS;
- awaitTerminationBeforeShutdown = false;
- useShutdown = false;
- useShutdownNow = true;
+ this(false, 0, TimeUnit.NANOSECONDS, false, false, true, 0);
+ }
+
+ /**
+ * Constructs a {@code ExecutorServiceRule} which invokes {@code ExecutorService.shutdownNow()}
+ * during {@code tearDown}.
+ *
+ * @param threadCount The number of threads in the pool. Creates fixed thread pool if > 0; else
+ * creates cached thread pool.
+ */
+ public ExecutorServiceRule(int threadCount) {
+ this(false, 0, TimeUnit.NANOSECONDS, false, false, true, threadCount);
+ }
+
+ /**
+ * For invocation by {@code DistributedExecutorServiceRule} which needs to subclass another class.
+ */
+ public ExecutorServiceRule(boolean enableAwaitTermination,
+ long awaitTerminationTimeout,
+ TimeUnit awaitTerminationTimeUnit,
+ boolean awaitTerminationBeforeShutdown,
+ boolean useShutdown,
+ boolean useShutdownNow,
+ int threadCount) {
+ this.enableAwaitTermination = enableAwaitTermination;
+ this.awaitTerminationTimeout = awaitTerminationTimeout;
+ this.awaitTerminationTimeUnit = awaitTerminationTimeUnit;
+ this.awaitTerminationBeforeShutdown = awaitTerminationBeforeShutdown;
+ this.useShutdown = useShutdown;
+ this.useShutdownNow = useShutdownNow;
+ this.threadCount = threadCount;
}
@Override
public void before() {
threadFactory = new DedicatedThreadFactory();
- executor = Executors.newCachedThreadPool(threadFactory);
+ if (threadCount > 0) {
+ executor = Executors.newFixedThreadPool(threadCount, threadFactory);
+ } else {
+ executor = Executors.newCachedThreadPool(threadFactory);
+ }
}
@Override
@@ -375,6 +406,7 @@
protected boolean awaitTerminationBeforeShutdown = true;
protected boolean useShutdown;
protected boolean useShutdownNow = true;
+ protected int threadCount;
protected Builder() {
// nothing
@@ -430,6 +462,17 @@
}
/**
+ * Specifies the number of threads in the pool. Creates fixed thread pool if > 0. Default is 0
+ * which means (non-fixed) cached thread pool.
+ *
+ * @param threadCount the number of threads in the pool
+ */
+ public Builder threadCount(int threadCount) {
+ this.threadCount = threadCount;
+ return this;
+ }
+
+ /**
* Builds the instance of {@code ExecutorServiceRule}.
*/
public ExecutorServiceRule build() {
diff --git a/geode-junit/src/main/resources/org/apache/geode/test/junit/internal/sanctioned-geode-junit-serializables.txt b/geode-junit/src/main/resources/org/apache/geode/test/junit/internal/sanctioned-geode-junit-serializables.txt
index d286fc1..5229b19 100644
--- a/geode-junit/src/main/resources/org/apache/geode/test/junit/internal/sanctioned-geode-junit-serializables.txt
+++ b/geode-junit/src/main/resources/org/apache/geode/test/junit/internal/sanctioned-geode-junit-serializables.txt
@@ -76,7 +76,7 @@
org/apache/geode/test/concurrent/FileBasedCountDownLatch,false,dataFile:java/io/File,lockFile:java/io/File
org/apache/geode/test/junit/rules/CloseableReference,false,autoClose:java/util/concurrent/atomic/AtomicBoolean
org/apache/geode/test/junit/rules/ConditionalIgnoreRule,false
-org/apache/geode/test/junit/rules/ExecutorServiceRule,false,awaitTerminationBeforeShutdown:boolean,awaitTerminationTimeUnit:java/util/concurrent/TimeUnit,awaitTerminationTimeout:long,enableAwaitTermination:boolean,useShutdown:boolean,useShutdownNow:boolean
+org/apache/geode/test/junit/rules/ExecutorServiceRule,false,awaitTerminationBeforeShutdown:boolean,awaitTerminationTimeUnit:java/util/concurrent/TimeUnit,awaitTerminationTimeout:long,enableAwaitTermination:boolean,threadCount:int,useShutdown:boolean,useShutdownNow:boolean
org/apache/geode/test/junit/rules/IgnoreOnWindowsRule,false
org/apache/geode/test/junit/rules/IgnoreUntilRule,false
org/apache/geode/test/junit/rules/JarFileRule,false,className:java/lang/String,jarFile:java/io/File,jarName:java/lang/String,makeJarLarge:boolean