GEODE-10380: use waitingThreadPool to notify dispatcher at re_auth (#7801)


(cherry picked from commit b3fef2a9989ecb5897325a7a84377a8ac7d30028)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
index bf57daa..57ffcfc 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
@@ -26,6 +26,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
@@ -727,7 +728,11 @@
     if (_messageDispatcher == null) {
       return;
     }
-    _messageDispatcher.notifyReAuthentication();
+
+    // use another thread to do the notification so that the server operation won't be blocked
+    ExecutorService threadPool =
+        _cache.getDistributionManager().getExecutors().getWaitingThreadPool();
+    threadPool.submit(() -> _messageDispatcher.notifyReAuthentication());
   }
 
   /**
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxyTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxyTest.java
index 3b4c63f..9b150be 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxyTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxyTest.java
@@ -20,7 +20,9 @@
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
@@ -30,12 +32,18 @@
 
 import java.net.InetAddress;
 import java.net.Socket;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.shiro.subject.Subject;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.mockito.stubbing.Answer;
 
 import org.apache.geode.StatisticsFactory;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.OperationExecutors;
 import org.apache.geode.internal.cache.Conflatable;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.tier.sockets.CacheClientProxy.CacheClientProxyStatsFactory;
@@ -43,6 +51,7 @@
 import org.apache.geode.internal.security.SecurityService;
 import org.apache.geode.internal.serialization.KnownVersion;
 import org.apache.geode.internal.statistics.StatisticsClock;
+import org.apache.geode.test.junit.rules.ExecutorServiceRule;
 
 public class CacheClientProxyTest {
   private CacheClientProxy proxyWithSingleUser;
@@ -71,6 +80,7 @@
     when(socket.getInetAddress()).thenReturn(inetAddress);
     when(notifier.getAcceptorStats()).thenReturn(stats);
     id = mock(ClientProxyMembershipID.class);
+    when(id.getDurableId()).thenReturn("proxy_id");
     version = KnownVersion.TEST_VERSION;
     securityService = mock(SecurityService.class);
     subject = mock(Subject.class);
@@ -175,4 +185,37 @@
     verify(subject, never()).logout();
     verify(clientUserAuths, times(1)).cleanup(anyBoolean());
   }
+
+  @Rule
+  public ExecutorServiceRule executorService = new ExecutorServiceRule();
+
+  @Test
+  public void notifyReAuthenticationIsNotBlocked() {
+    CacheClientProxy spy = spy(proxyWithSingleUser);
+    MessageDispatcher dispatcher = mock(MessageDispatcher.class);
+    doReturn(dispatcher).when(spy).createMessageDispatcher(any());
+    spy.initializeMessageDispatcher();
+    DistributionManager manager = mock(DistributionManager.class);
+    OperationExecutors executors = mock(OperationExecutors.class);
+    ExecutorService executor = executorService.getExecutorService();
+    when(cache.getDistributionManager()).thenReturn(manager);
+    when(manager.getExecutors()).thenReturn(executors);
+    when(executors.getWaitingThreadPool()).thenReturn(executor);
+
+    AtomicBoolean updated = new AtomicBoolean(false);
+
+    // simulate a dispatcher whose notifyReAuthentication() blocks until 'updated' becomes true
+    doAnswer((Answer<Void>) invocation -> {
+      while (!updated.get()) {
+        Thread.sleep(200);
+      }
+      return null;
+    }).when(dispatcher).notifyReAuthentication();
+
+    // proxy.notifyReAuthentication() returns even though the dispatcher stub is still blocked
+    spy.notifyReAuthentication();
+    assertThat(updated.get()).isFalse();
+  }
+
+
 }