GEODE-10380: use waitingThreadPool to notify dispatcher at re_auth (#7801)
(cherry picked from commit b3fef2a9989ecb5897325a7a84377a8ac7d30028)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
index bf57daa..57ffcfc 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
@@ -26,6 +26,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@@ -727,7 +728,11 @@
if (_messageDispatcher == null) {
return;
}
- _messageDispatcher.notifyReAuthentication();
+
+ // use another thread to do the notification so that the server operation won't be blocked
+ ExecutorService threadPool =
+ _cache.getDistributionManager().getExecutors().getWaitingThreadPool();
+ threadPool.submit(() -> _messageDispatcher.notifyReAuthentication());
}
/**
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxyTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxyTest.java
index 3b4c63f..9b150be 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxyTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxyTest.java
@@ -20,7 +20,9 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
@@ -30,12 +32,18 @@
import java.net.InetAddress;
import java.net.Socket;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.shiro.subject.Subject;
import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
+import org.mockito.stubbing.Answer;
import org.apache.geode.StatisticsFactory;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.OperationExecutors;
import org.apache.geode.internal.cache.Conflatable;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.tier.sockets.CacheClientProxy.CacheClientProxyStatsFactory;
@@ -43,6 +51,7 @@
import org.apache.geode.internal.security.SecurityService;
import org.apache.geode.internal.serialization.KnownVersion;
import org.apache.geode.internal.statistics.StatisticsClock;
+import org.apache.geode.test.junit.rules.ExecutorServiceRule;
public class CacheClientProxyTest {
private CacheClientProxy proxyWithSingleUser;
@@ -71,6 +80,7 @@
when(socket.getInetAddress()).thenReturn(inetAddress);
when(notifier.getAcceptorStats()).thenReturn(stats);
id = mock(ClientProxyMembershipID.class);
+ when(id.getDurableId()).thenReturn("proxy_id");
version = KnownVersion.TEST_VERSION;
securityService = mock(SecurityService.class);
subject = mock(Subject.class);
@@ -175,4 +185,37 @@
verify(subject, never()).logout();
verify(clientUserAuths, times(1)).cleanup(anyBoolean());
}
+
+ @Rule
+ public ExecutorServiceRule executorService = new ExecutorServiceRule();
+
+ @Test
+ public void notifyReAuthenticationIsNotBlocked() {
+ CacheClientProxy spy = spy(proxyWithSingleUser);
+ MessageDispatcher dispatcher = mock(MessageDispatcher.class);
+ doReturn(dispatcher).when(spy).createMessageDispatcher(any());
+ spy.initializeMessageDispatcher();
+ DistributionManager manager = mock(DistributionManager.class);
+ OperationExecutors executors = mock(OperationExecutors.class);
+ ExecutorService executor = executorService.getExecutorService();
+ when(cache.getDistributionManager()).thenReturn(manager);
+ when(manager.getExecutors()).thenReturn(executors);
+ when(executors.getWaitingThreadPool()).thenReturn(executor);
+
+ AtomicBoolean updated = new AtomicBoolean(false);
+
+ // simulating a blocked message dispatcher when notify reauth
+ doAnswer((Answer<Void>) invocation -> {
+ while (!updated.get()) {
+ Thread.sleep(200);
+ }
+ return null;
+ }).when(dispatcher).notifyReAuthentication();
+
+ // proxy.notifyReauthentication won't be blocked
+ spy.notifyReAuthentication();
+ assertThat(updated.get()).isFalse();
+ }
+
+
}