GEODE-10068: Make WanCopyRegionFunctionService thread pool configurab… (#7424)
* GEODE-10068: Make WanCopyRegionFunctionService thread pool configurable through property
* GEODE-10068: Change name of property and add test case
* GEODE-10068: Update after more review comments
diff --git a/geode-core/src/main/java/org/apache/geode/internal/lang/SystemPropertyHelper.java b/geode-core/src/main/java/org/apache/geode/internal/lang/SystemPropertyHelper.java
index 01bdb67..3efb25a 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/lang/SystemPropertyHelper.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/lang/SystemPropertyHelper.java
@@ -114,6 +114,14 @@
public static final String RE_AUTHENTICATE_WAIT_TIME = "reauthenticate.wait.time";
/**
+ * Maximum number of concurrent executions in a server of wan-copy region commands.
+ * Once the maximum number is reached, subsequent executions will be halted until
+ * a thread for any of the ongoing executions is released.
+ */
+ public static final String WAN_COPY_REGION_MAX_CONCURRENT_THREADS =
+ "geode.wan.copy-region.max-threads";
+
+ /**
* As of Geode 1.4.0, a region set operation will be in a transaction even if it is the first
* operation in the transaction.
*
diff --git a/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/WanCopyRegionFunctionService.java b/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/WanCopyRegionFunctionService.java
index 79000b4..a8592a3 100644
--- a/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/WanCopyRegionFunctionService.java
+++ b/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/WanCopyRegionFunctionService.java
@@ -25,12 +25,17 @@
import org.apache.geode.cache.Cache;
import org.apache.geode.internal.cache.CacheService;
+import org.apache.geode.internal.lang.SystemProperty;
+import org.apache.geode.internal.lang.SystemPropertyHelper;
import org.apache.geode.logging.internal.executors.LoggingExecutors;
import org.apache.geode.management.internal.beans.CacheServiceMBeanBase;
import org.apache.geode.management.internal.functions.CliFunctionResult;
public class WanCopyRegionFunctionService implements CacheService {
+ private static final String WAN_COPY_REGION_FUNCTION_EXECUTION_PROCESSOR_THREAD_PREFIX =
+ "WAN Copy Region Function Execution Processor";
+
private volatile ExecutorService wanCopyRegionFunctionExecutionPool;
/**
@@ -41,11 +46,15 @@
@Override
public boolean init(Cache cache) {
- String WAN_COPY_REGION_FUNCTION_EXECUTION_PROCESSOR_THREAD_PREFIX =
- "WAN Copy Region Function Execution Processor";
- int WAN_COPY_REGION_FUNCTION_MAX_CONCURRENT_THREADS = 10;
+ int maxConcurrentThreads = SystemProperty
+ .getProductIntegerProperty(
+ SystemPropertyHelper.WAN_COPY_REGION_MAX_CONCURRENT_THREADS, 10);
+ return init(maxConcurrentThreads);
+ }
+
+ boolean init(int maxConcurrentThreads) {
wanCopyRegionFunctionExecutionPool = LoggingExecutors
- .newFixedThreadPool(WAN_COPY_REGION_FUNCTION_MAX_CONCURRENT_THREADS,
+ .newFixedThreadPool(maxConcurrentThreads,
WAN_COPY_REGION_FUNCTION_EXECUTION_PROCESSOR_THREAD_PREFIX, true);
return true;
}
diff --git a/geode-wan/src/test/java/org/apache/geode/cache/wan/internal/WanCopyRegionFunctionServiceTest.java b/geode-wan/src/test/java/org/apache/geode/cache/wan/internal/WanCopyRegionFunctionServiceTest.java
index 864aa63..1b5a898 100644
--- a/geode-wan/src/test/java/org/apache/geode/cache/wan/internal/WanCopyRegionFunctionServiceTest.java
+++ b/geode-wan/src/test/java/org/apache/geode/cache/wan/internal/WanCopyRegionFunctionServiceTest.java
@@ -23,9 +23,10 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.management.internal.functions.CliFunctionResult;
@@ -36,7 +37,7 @@
private WanCopyRegionFunctionService service;
private final InternalCache cache = mock(InternalCache.class);
- @Before
+ @BeforeEach
public void setUp() throws Exception {
service = new WanCopyRegionFunctionService();
service.init(cache);
@@ -158,7 +159,7 @@
int executions = 5;
CountDownLatch latch = new CountDownLatch(executions);
for (int i = 0; i < executions; i++) {
- Callable<CliFunctionResult> firstExecution = () -> {
+ Callable<CliFunctionResult> execution = () -> {
latch.await(GeodeAwaitility.getTimeout().getSeconds(), TimeUnit.SECONDS);
return null;
};
@@ -167,7 +168,7 @@
CompletableFuture
.supplyAsync(() -> {
try {
- return service.execute(firstExecution, regionName, "mySender1");
+ return service.execute(execution, regionName, "mySender1");
} catch (Exception e) {
return null;
}
@@ -183,4 +184,47 @@
latch.countDown();
}
}
+
+ @Test
+ public void concurrentExecutionsDoesNotExceedMaxConcurrentExecutions() {
+ int maxConcurrentExecutions = 2;
+ service.init(maxConcurrentExecutions);
+
+ int executions = 4;
+ CountDownLatch latch = new CountDownLatch(executions);
+ AtomicInteger concurrentExecutions = new AtomicInteger(0);
+ for (int i = 0; i < executions; i++) {
+ Callable<CliFunctionResult> execution = () -> {
+ concurrentExecutions.incrementAndGet();
+ latch.await(GeodeAwaitility.getTimeout().getSeconds(), TimeUnit.SECONDS);
+ concurrentExecutions.decrementAndGet();
+ return null;
+ };
+
+ final String regionName = String.valueOf(i);
+ CompletableFuture
+ .supplyAsync(() -> {
+ try {
+ return service.execute(execution, regionName, "mySender1");
+ } catch (Exception e) {
+ return null;
+ }
+ });
+ }
+
+ // Wait for the functions to start execution
+ await().untilAsserted(
+ () -> assertThat(service.getNumberOfCurrentExecutions()).isEqualTo(executions));
+
+ // Make sure concurrent executions does not exceed the maximum
+ assertThat(concurrentExecutions.get()).isEqualTo(maxConcurrentExecutions);
+
+ // End executions
+ for (int i = 0; i < executions; i++) {
+ latch.countDown();
+ }
+
+ await().untilAsserted(() -> assertThat(concurrentExecutions.get()).isEqualTo(0));
+ }
+
}