GEODE-8683: Honor maximum-time-between-pings in gateway receiver (#5701)

The maximum-time-between-pings set when creating a gateway receiver
was not honored because the ClientHealthMonitor, that is the singleton
class monitoring all clients supported just one value for maximum
time between pings for all clients. This value is set
when the server in which the receiver is running is started
and when the gateway receiver provides a different value
it is ignored.

With this fix, it is allowed to have different values
for maximum-time-between-clients for different clients
as the value is not taken from the ClientHealthMonitor.
Each client can have its own value.
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Acceptor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Acceptor.java
index d53541c..2982549 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Acceptor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Acceptor.java
@@ -105,4 +105,6 @@
   void unregisterServerConnection(ServerConnection serverConnection);
 
   void decClientServerConnectionCount();
+
+  int getMaximumTimeBetweenPings();
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
index c2d9615..1dd4cb7 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
@@ -122,6 +122,7 @@
   private final CacheServerStats stats;
   private final int maxConnections;
   private final int maxThreads;
+  private final int maximumTimeBetweenPings;
 
   private final ExecutorService pool;
   /**
@@ -636,6 +637,7 @@
     this.socketBufferSize = socketBufferSize;
 
     // Create the singleton ClientHealthMonitor
+    this.maximumTimeBetweenPings = maximumTimeBetweenPings;
     healthMonitor = clientHealthMonitorProvider.get(internalCache, maximumTimeBetweenPings,
         clientNotifier.getStats());
 
@@ -1865,6 +1867,11 @@
     releaseCommBuffer(Message.setTLCommBuffer(null));
   }
 
+  @Override
+  public int getMaximumTimeBetweenPings() {
+    return maximumTimeBetweenPings;
+  }
+
   private static class ClientQueueInitializerTask implements Runnable {
     private final Socket socket;
     private final boolean isPrimaryServerToClient;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java
index 29dd180..45e6cc4 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java
@@ -15,6 +15,7 @@
 
 package org.apache.geode.internal.cache.tier.sockets;
 
+
 import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -69,6 +70,12 @@
       new ConcurrentHashMap<>();
 
   /**
+   * The map of known clients and maximum time between pings.
+   */
+  private ConcurrentMap<ClientProxyMembershipID, Integer> clientMaximumTimeBetweenPings =
+      new ConcurrentHashMap<>();
+
+  /**
    * THe GemFire <code>Cache</code>
    */
   private final InternalCache cache;
@@ -77,7 +84,7 @@
     return maximumTimeBetweenPings;
   }
 
-  private final int maximumTimeBetweenPings;
+  private volatile int maximumTimeBetweenPings;
 
   /**
    * A thread that validates client connections
@@ -191,12 +198,19 @@
     refCount = 0;
   }
 
+  public void registerClient(ClientProxyMembershipID proxyID) {
+    registerClient(proxyID, maximumTimeBetweenPings);
+  }
+
   /**
    * Registers a new client to be monitored.
    *
    * @param proxyID The id of the client to be registered
    */
-  public void registerClient(ClientProxyMembershipID proxyID) {
+  public void registerClient(ClientProxyMembershipID proxyID, int maximumTimeBetweenPings) {
+    if (!clientMaximumTimeBetweenPings.containsKey(proxyID)) {
+      clientMaximumTimeBetweenPings.putIfAbsent(proxyID, maximumTimeBetweenPings);
+    }
     if (!clientHeartbeats.containsKey(proxyID)) {
       if (null == clientHeartbeats.putIfAbsent(proxyID,
           new AtomicLong(System.currentTimeMillis()))) {
@@ -233,6 +247,7 @@
       }
       expireTXStates(proxyID);
     }
+    clientMaximumTimeBetweenPings.remove(proxyID);
   }
 
   /**
@@ -349,7 +364,7 @@
 
     AtomicLong heartbeat = clientHeartbeats.get(proxyID);
     if (null == heartbeat) {
-      registerClient(proxyID);
+      registerClient(proxyID, getMaximumTimeBetweenPings(proxyID));
     } else {
       heartbeat.set(System.currentTimeMillis());
     }
@@ -581,6 +596,7 @@
    *
    * @param cache The GemFire <code>Cache</code>
    * @param maximumTimeBetweenPings The maximum time allowed between pings before determining the
+   *        client has died and interrupting its sockets
    */
   protected static synchronized void createInstance(InternalCache cache,
       int maximumTimeBetweenPings, CacheClientNotifierStats stats) {
@@ -597,6 +613,7 @@
    *
    * @param cache The GemFire <code>Cache</code>
    * @param maximumTimeBetweenPings The maximum time allowed between pings before determining the
+   *        client has died and interrupting its sockets
    */
   private ClientHealthMonitor(InternalCache cache, int maximumTimeBetweenPings,
       CacheClientNotifierStats stats) {
@@ -660,6 +677,10 @@
     return getNumberOfClientsAtOrAboveVersion(KnownVersion.GFE_61) > 0;
   }
 
+  private int getMaximumTimeBetweenPings(ClientProxyMembershipID proxyID) {
+    return clientMaximumTimeBetweenPings.getOrDefault(proxyID, maximumTimeBetweenPings);
+  }
+
   /**
    * Interface for changing the heartbeat timeout behavior in the ClientHealthMonitorThread, should
    * only be used for testing
@@ -786,8 +807,9 @@
                     (currentTime - latestHeartbeat), proxyID);
               }
 
+              int maximumTimeBetweenPingsForClient = getMaximumTimeBetweenPings(proxyID);
               if (checkHeartbeat.timedOut(currentTime, latestHeartbeat,
-                  _maximumTimeBetweenPings)) {
+                  maximumTimeBetweenPingsForClient)) {
                 // This client has been idle for too long. Determine whether
                 // any of its ServerConnection threads are currently processing
                 // a message. If so, let it go. If not, disconnect it.
@@ -795,7 +817,8 @@
                   if (cleanupClientThreads(proxyID, true)) {
                     logger.warn(
                         "Monitoring client with member id {}. It had been {} ms since the latest heartbeat. Max interval is {}. Terminated client.",
-                        entry.getKey(), currentTime - latestHeartbeat, _maximumTimeBetweenPings);
+                        entry.getKey(), currentTime - latestHeartbeat,
+                        maximumTimeBetweenPingsForClient);
                   }
                 } else {
                   if (logger.isDebugEnabled()) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
index 471bba4..4f1553f 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
@@ -634,7 +634,7 @@
         chmRegistered = true;
       }
       if (registerClient) {
-        chm.registerClient(proxyId);
+        chm.registerClient(proxyId, acceptor.getMaximumTimeBetweenPings());
       }
       serverConnectionCollection = chm.addConnection(proxyId, this);
       acceptor.getConnectionListener().connectionOpened(registerClient, communicationMode);
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitorTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitorTest.java
index c2a35ea..043b985 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitorTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitorTest.java
@@ -63,6 +63,7 @@
     ClientProxyMembershipID mockId = mock(ClientProxyMembershipID.class);
     ServerConnection mockConnection = mock(ServerConnection.class);
 
+    clientHealthMonitor.registerClient(mockId);
     clientHealthMonitor.addConnection(mockId, mockConnection);
     clientHealthMonitor.receivedPing(mockId);
     clientHealthMonitor.testUseCustomHeartbeatCheck((a, b, c) -> true); // Fail all heartbeats
@@ -86,6 +87,7 @@
     ClientProxyMembershipID mockId = mock(ClientProxyMembershipID.class);
     ServerConnection mockConnection = mock(ServerConnection.class);
 
+    clientHealthMonitor.registerClient(mockId);
     clientHealthMonitor.addConnection(mockId, mockConnection);
     clientHealthMonitor.receivedPing(mockId);
 
@@ -149,6 +151,7 @@
   @Test
   public void receivedPingNewClientRegistersWithCurrentTimeAndIncrementsStat() {
     ClientProxyMembershipID mockId = mock(ClientProxyMembershipID.class);
+    clientHealthMonitor.registerClient(mockId);
     clientHealthMonitor.receivedPing(mockId);
     assertThat(clientHealthMonitor.getClientHeartbeats().get(mockId)).isNotNull()
         .isLessThanOrEqualTo(System.currentTimeMillis());
@@ -158,6 +161,7 @@
   @Test
   public void receivedPingExistingClientUpdatesTimeOnly() throws InterruptedException {
     ClientProxyMembershipID mockId = mock(ClientProxyMembershipID.class);
+    clientHealthMonitor.registerClient(mockId);
     clientHealthMonitor.receivedPing(mockId);
     Long expectedTime = clientHealthMonitor.getClientHeartbeats().get(mockId);
 
diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
index 8446885..620b50c 100644
--- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
+++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
@@ -142,6 +142,7 @@
 import org.apache.geode.internal.cache.InternalCacheBuilder;
 import org.apache.geode.internal.cache.InternalRegion;
 import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.PoolStats;
 import org.apache.geode.internal.cache.RegionQueue;
 import org.apache.geode.internal.cache.execute.data.CustId;
 import org.apache.geode.internal.cache.execute.data.Customer;
@@ -1254,6 +1255,16 @@
     return stats;
   }
 
+  public static int getGatewaySenderPoolDisconnects(String senderId) {
+    AbstractGatewaySender sender =
+        (AbstractGatewaySender) CacheFactory.getAnyInstance().getGatewaySender(senderId);
+    assertNotNull(sender);
+
+    PoolStats poolStats = sender.getProxy().getStats();
+
+    return poolStats.getDisConnects();
+  }
+
   protected static int getTotalBucketQueueSize(PartitionedRegion prQ, boolean isPrimary) {
     int size = 0;
     if (prQ != null) {
@@ -2053,18 +2064,30 @@
     }
   }
 
-  public static void createReceiverInVMs(VM... vms) {
+  public static void createReceiverInVMs(int maximumTimeBetweenPings, VM... vms) {
     for (VM vm : vms) {
-      vm.invoke(() -> createReceiver());
+      vm.invoke(() -> createReceiverWithMaximumTimeBetweenPings(maximumTimeBetweenPings));
     }
   }
 
+
+  public static void createReceiverInVMs(VM... vms) {
+    createReceiverInVMs(-1, vms);
+  }
+
   public static int createReceiver() {
+    return createReceiverWithMaximumTimeBetweenPings(-1);
+  }
+
+  public static int createReceiverWithMaximumTimeBetweenPings(int maximumTimeBetweenPings) {
     GatewayReceiverFactory fact = cache.createGatewayReceiverFactory();
     int port = AvailablePortHelper.getRandomAvailableTCPPort();
     fact.setStartPort(port);
     fact.setEndPort(port);
     fact.setManualStart(true);
+    if (maximumTimeBetweenPings > 0) {
+      fact.setMaximumTimeBetweenPings(maximumTimeBetweenPings);
+    }
     GatewayReceiver receiver = fact.create();
     try {
       receiver.start();
@@ -2149,6 +2172,10 @@
   }
 
   public static int createServer(int locPort) {
+    return createServer(locPort, -1);
+  }
+
+  public static int createServer(int locPort, int maximumTimeBetweenPings) {
     WANTestBase test = new WANTestBase();
     Properties props = test.getDistributedSystemProperties();
     props.setProperty(MCAST_PORT, "0");
@@ -2160,6 +2187,9 @@
     int port = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
     server.setPort(port);
     server.setHostnameForClients("localhost");
+    if (maximumTimeBetweenPings > 0) {
+      server.setMaximumTimeBetweenPings(maximumTimeBetweenPings);
+    }
     try {
       server.start();
     } catch (IOException e) {
diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
index 7ff1c8d..d41dc6a 100644
--- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
+++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
@@ -17,6 +17,7 @@
 import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
 import static org.apache.geode.test.dunit.IgnoredException.addIgnoredException;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 
 import java.util.ArrayList;
@@ -28,6 +29,10 @@
 import org.junit.experimental.categories.Category;
 
 import org.apache.geode.cache.Region;
+import org.apache.geode.cache.client.ClientCache;
+import org.apache.geode.cache.client.ClientCacheFactory;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.client.internal.PoolImpl;
 import org.apache.geode.internal.cache.execute.data.CustId;
 import org.apache.geode.internal.cache.execute.data.Customer;
 import org.apache.geode.internal.cache.execute.data.Order;
@@ -1095,7 +1100,54 @@
 
   }
 
+  @Test
+  public void testMaximumTimeBetweenPingsInGatewayReceiverIsHonored() {
+    Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
 
+    vm2.invoke(() -> createServer(nyPort));
+
+    // Set a maximum time between pings lower than the time between pings sent at the sender (5000)
+    // so that the receiver will not receive the ping on time and will close the connection.
+    int maximumTimeBetweenPingsInGatewayReceiver = 4000;
+    createReceiverInVMs(maximumTimeBetweenPingsInGatewayReceiver, vm2);
+    createReceiverPR(vm2, 0);
+
+    int maximumTimeBetweenPingsInServer = 60000;
+    vm4.invoke(() -> createServer(lnPort, maximumTimeBetweenPingsInServer));
+    vm4.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true));
+    createSenderPRInVM(0, vm4);
+    String senderId = "ln";
+    startSenderInVMs(senderId, vm4);
+
+    // Send some puts to start the connections from the sender to the receiver
+    vm4.invoke(() -> WANTestBase.doPuts(testName, 2));
+
+    // Create client to check if connections are later closed.
+    ClientCache clientCache = new ClientCacheFactory()
+        .addPoolLocator("localhost", lnPort)
+        .setPoolLoadConditioningInterval(-1)
+        .setPoolIdleTimeout(-1)
+        .setPoolPingInterval(5000)
+        .create();
+    Region clientRegion =
+        clientCache.<String, String>createClientRegionFactory(ClientRegionShortcut.PROXY)
+            .create(testName);
+    for (long i = 0; i < 2; i++) {
+      clientRegion.put(i, "Value_" + i);
+    }
+
+    // Wait more than maximum-time-between-pings in the gateway receiver so that connections are
+    // closed.
+    try {
+      Thread.sleep(maximumTimeBetweenPingsInGatewayReceiver + 2000);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+
+    assertNotEquals(0, (int) vm4.invoke(() -> getGatewaySenderPoolDisconnects(senderId)));
+    assertEquals(0, ((PoolImpl) clientCache.getDefaultPool()).getStats().getDisConnects());
+  }
 
   protected Map putKeyValues() {
     final Map keyValues = new HashMap();