diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Acceptor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Acceptor.java
index d53541c..2982549 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Acceptor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Acceptor.java
@@ -105,4 +105,6 @@
   void unregisterServerConnection(ServerConnection serverConnection);
 
   void decClientServerConnectionCount();
+
+  int getMaximumTimeBetweenPings();
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
index c2d9615..1dd4cb7 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
@@ -122,6 +122,7 @@
   private final CacheServerStats stats;
   private final int maxConnections;
   private final int maxThreads;
+  private final int maximumTimeBetweenPings;
 
   private final ExecutorService pool;
   /**
@@ -636,6 +637,7 @@
     this.socketBufferSize = socketBufferSize;
 
     // Create the singleton ClientHealthMonitor
+    this.maximumTimeBetweenPings = maximumTimeBetweenPings;
     healthMonitor = clientHealthMonitorProvider.get(internalCache, maximumTimeBetweenPings,
         clientNotifier.getStats());
 
@@ -1865,6 +1867,11 @@
     releaseCommBuffer(Message.setTLCommBuffer(null));
   }
 
+  @Override
+  public int getMaximumTimeBetweenPings() {
+    return maximumTimeBetweenPings;
+  }
+
   private static class ClientQueueInitializerTask implements Runnable {
     private final Socket socket;
     private final boolean isPrimaryServerToClient;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java
index 29dd180..45e6cc4 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java
@@ -15,6 +15,7 @@
 
 package org.apache.geode.internal.cache.tier.sockets;
 
+
 import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -69,6 +70,12 @@
       new ConcurrentHashMap<>();
 
   /**
+   * The map of known clients and maximum time between pings.
+   */
+  private ConcurrentMap<ClientProxyMembershipID, Integer> clientMaximumTimeBetweenPings =
+      new ConcurrentHashMap<>();
+
+  /**
    * THe GemFire <code>Cache</code>
    */
   private final InternalCache cache;
@@ -77,7 +84,7 @@
     return maximumTimeBetweenPings;
   }
 
-  private final int maximumTimeBetweenPings;
+  private volatile int maximumTimeBetweenPings;
 
   /**
    * A thread that validates client connections
@@ -191,12 +198,19 @@
     refCount = 0;
   }
 
+  public void registerClient(ClientProxyMembershipID proxyID) {
+    registerClient(proxyID, maximumTimeBetweenPings);
+  }
+
   /**
    * Registers a new client to be monitored.
    *
    * @param proxyID The id of the client to be registered
    */
-  public void registerClient(ClientProxyMembershipID proxyID) {
+  public void registerClient(ClientProxyMembershipID proxyID, int maximumTimeBetweenPings) {
+    if (!clientMaximumTimeBetweenPings.containsKey(proxyID)) {
+      clientMaximumTimeBetweenPings.putIfAbsent(proxyID, maximumTimeBetweenPings);
+    }
     if (!clientHeartbeats.containsKey(proxyID)) {
       if (null == clientHeartbeats.putIfAbsent(proxyID,
           new AtomicLong(System.currentTimeMillis()))) {
@@ -233,6 +247,7 @@
       }
       expireTXStates(proxyID);
     }
+    clientMaximumTimeBetweenPings.remove(proxyID);
   }
 
   /**
@@ -349,7 +364,7 @@
 
     AtomicLong heartbeat = clientHeartbeats.get(proxyID);
     if (null == heartbeat) {
-      registerClient(proxyID);
+      registerClient(proxyID, getMaximumTimeBetweenPings(proxyID));
     } else {
       heartbeat.set(System.currentTimeMillis());
     }
@@ -581,6 +596,7 @@
    *
    * @param cache The GemFire <code>Cache</code>
    * @param maximumTimeBetweenPings The maximum time allowed between pings before determining the
+   *        client has died and interrupting its sockets
    */
   protected static synchronized void createInstance(InternalCache cache,
       int maximumTimeBetweenPings, CacheClientNotifierStats stats) {
@@ -597,6 +613,7 @@
    *
    * @param cache The GemFire <code>Cache</code>
    * @param maximumTimeBetweenPings The maximum time allowed between pings before determining the
+   *        client has died and interrupting its sockets
    */
   private ClientHealthMonitor(InternalCache cache, int maximumTimeBetweenPings,
       CacheClientNotifierStats stats) {
@@ -660,6 +677,10 @@
     return getNumberOfClientsAtOrAboveVersion(KnownVersion.GFE_61) > 0;
   }
 
+  private int getMaximumTimeBetweenPings(ClientProxyMembershipID proxyID) {
+    return clientMaximumTimeBetweenPings.getOrDefault(proxyID, maximumTimeBetweenPings);
+  }
+
   /**
    * Interface for changing the heartbeat timeout behavior in the ClientHealthMonitorThread, should
    * only be used for testing
@@ -786,8 +807,9 @@
                     (currentTime - latestHeartbeat), proxyID);
               }
 
+              int maximumTimeBetweenPingsForClient = getMaximumTimeBetweenPings(proxyID);
               if (checkHeartbeat.timedOut(currentTime, latestHeartbeat,
-                  _maximumTimeBetweenPings)) {
+                  maximumTimeBetweenPingsForClient)) {
                 // This client has been idle for too long. Determine whether
                 // any of its ServerConnection threads are currently processing
                 // a message. If so, let it go. If not, disconnect it.
@@ -795,7 +817,8 @@
                   if (cleanupClientThreads(proxyID, true)) {
                     logger.warn(
                         "Monitoring client with member id {}. It had been {} ms since the latest heartbeat. Max interval is {}. Terminated client.",
-                        entry.getKey(), currentTime - latestHeartbeat, _maximumTimeBetweenPings);
+                        entry.getKey(), currentTime - latestHeartbeat,
+                        maximumTimeBetweenPingsForClient);
                   }
                 } else {
                   if (logger.isDebugEnabled()) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
index 471bba4..4f1553f 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
@@ -634,7 +634,7 @@
         chmRegistered = true;
       }
       if (registerClient) {
-        chm.registerClient(proxyId);
+        chm.registerClient(proxyId, acceptor.getMaximumTimeBetweenPings());
       }
       serverConnectionCollection = chm.addConnection(proxyId, this);
       acceptor.getConnectionListener().connectionOpened(registerClient, communicationMode);
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitorTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitorTest.java
index c2a35ea..043b985 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitorTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitorTest.java
@@ -63,6 +63,7 @@
     ClientProxyMembershipID mockId = mock(ClientProxyMembershipID.class);
     ServerConnection mockConnection = mock(ServerConnection.class);
 
+    clientHealthMonitor.registerClient(mockId);
     clientHealthMonitor.addConnection(mockId, mockConnection);
     clientHealthMonitor.receivedPing(mockId);
     clientHealthMonitor.testUseCustomHeartbeatCheck((a, b, c) -> true); // Fail all heartbeats
@@ -86,6 +87,7 @@
     ClientProxyMembershipID mockId = mock(ClientProxyMembershipID.class);
     ServerConnection mockConnection = mock(ServerConnection.class);
 
+    clientHealthMonitor.registerClient(mockId);
     clientHealthMonitor.addConnection(mockId, mockConnection);
     clientHealthMonitor.receivedPing(mockId);
 
@@ -149,6 +151,7 @@
   @Test
   public void receivedPingNewClientRegistersWithCurrentTimeAndIncrementsStat() {
     ClientProxyMembershipID mockId = mock(ClientProxyMembershipID.class);
+    clientHealthMonitor.registerClient(mockId);
     clientHealthMonitor.receivedPing(mockId);
     assertThat(clientHealthMonitor.getClientHeartbeats().get(mockId)).isNotNull()
         .isLessThanOrEqualTo(System.currentTimeMillis());
@@ -158,6 +161,7 @@
   @Test
   public void receivedPingExistingClientUpdatesTimeOnly() throws InterruptedException {
     ClientProxyMembershipID mockId = mock(ClientProxyMembershipID.class);
+    clientHealthMonitor.registerClient(mockId);
     clientHealthMonitor.receivedPing(mockId);
     Long expectedTime = clientHealthMonitor.getClientHeartbeats().get(mockId);
 
diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
index 8446885..620b50c 100644
--- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
+++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
@@ -142,6 +142,7 @@
 import org.apache.geode.internal.cache.InternalCacheBuilder;
 import org.apache.geode.internal.cache.InternalRegion;
 import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.PoolStats;
 import org.apache.geode.internal.cache.RegionQueue;
 import org.apache.geode.internal.cache.execute.data.CustId;
 import org.apache.geode.internal.cache.execute.data.Customer;
@@ -1254,6 +1255,16 @@
     return stats;
   }
 
+  public static int getGatewaySenderPoolDisconnects(String senderId) {
+    AbstractGatewaySender sender =
+        (AbstractGatewaySender) CacheFactory.getAnyInstance().getGatewaySender(senderId);
+    assertNotNull(sender);
+
+    PoolStats poolStats = sender.getProxy().getStats();
+
+    return poolStats.getDisConnects();
+  }
+
   protected static int getTotalBucketQueueSize(PartitionedRegion prQ, boolean isPrimary) {
     int size = 0;
     if (prQ != null) {
@@ -2053,18 +2064,30 @@
     }
   }
 
-  public static void createReceiverInVMs(VM... vms) {
+  public static void createReceiverInVMs(int maximumTimeBetweenPings, VM... vms) {
     for (VM vm : vms) {
-      vm.invoke(() -> createReceiver());
+      vm.invoke(() -> createReceiverWithMaximumTimeBetweenPings(maximumTimeBetweenPings));
     }
   }
 
+
+  public static void createReceiverInVMs(VM... vms) {
+    createReceiverInVMs(-1, vms);
+  }
+
   public static int createReceiver() {
+    return createReceiverWithMaximumTimeBetweenPings(-1);
+  }
+
+  public static int createReceiverWithMaximumTimeBetweenPings(int maximumTimeBetweenPings) {
     GatewayReceiverFactory fact = cache.createGatewayReceiverFactory();
     int port = AvailablePortHelper.getRandomAvailableTCPPort();
     fact.setStartPort(port);
     fact.setEndPort(port);
     fact.setManualStart(true);
+    if (maximumTimeBetweenPings > 0) {
+      fact.setMaximumTimeBetweenPings(maximumTimeBetweenPings);
+    }
     GatewayReceiver receiver = fact.create();
     try {
       receiver.start();
@@ -2149,6 +2172,10 @@
   }
 
   public static int createServer(int locPort) {
+    return createServer(locPort, -1);
+  }
+
+  public static int createServer(int locPort, int maximumTimeBetweenPings) {
     WANTestBase test = new WANTestBase();
     Properties props = test.getDistributedSystemProperties();
     props.setProperty(MCAST_PORT, "0");
@@ -2160,6 +2187,9 @@
     int port = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
     server.setPort(port);
     server.setHostnameForClients("localhost");
+    if (maximumTimeBetweenPings > 0) {
+      server.setMaximumTimeBetweenPings(maximumTimeBetweenPings);
+    }
     try {
       server.start();
     } catch (IOException e) {
diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
index 7ff1c8d..d41dc6a 100644
--- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
+++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
@@ -17,6 +17,7 @@
 import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
 import static org.apache.geode.test.dunit.IgnoredException.addIgnoredException;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 
 import java.util.ArrayList;
@@ -28,6 +29,10 @@
 import org.junit.experimental.categories.Category;
 
 import org.apache.geode.cache.Region;
+import org.apache.geode.cache.client.ClientCache;
+import org.apache.geode.cache.client.ClientCacheFactory;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.client.internal.PoolImpl;
 import org.apache.geode.internal.cache.execute.data.CustId;
 import org.apache.geode.internal.cache.execute.data.Customer;
 import org.apache.geode.internal.cache.execute.data.Order;
@@ -1095,7 +1100,54 @@
 
   }
 
+  @Test
+  public void testMaximumTimeBetweenPingsInGatewayReceiverIsHonored() {
+    Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
 
+    vm2.invoke(() -> createServer(nyPort));
+
+    // Set a maximum time between pings lower than the time between pings sent at the sender (5000)
+    // so that the receiver will not receive the ping on time and will close the connection.
+    int maximumTimeBetweenPingsInGatewayReceiver = 4000;
+    createReceiverInVMs(maximumTimeBetweenPingsInGatewayReceiver, vm2);
+    createReceiverPR(vm2, 0);
+
+    int maximumTimeBetweenPingsInServer = 60000;
+    vm4.invoke(() -> createServer(lnPort, maximumTimeBetweenPingsInServer));
+    vm4.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true));
+    createSenderPRInVM(0, vm4);
+    String senderId = "ln";
+    startSenderInVMs(senderId, vm4);
+
+    // Send some puts to start the connections from the sender to the receiver
+    vm4.invoke(() -> WANTestBase.doPuts(testName, 2));
+
+    // Create client to check if connections are later closed.
+    ClientCache clientCache = new ClientCacheFactory()
+        .addPoolLocator("localhost", lnPort)
+        .setPoolLoadConditioningInterval(-1)
+        .setPoolIdleTimeout(-1)
+        .setPoolPingInterval(5000)
+        .create();
+    Region clientRegion =
+        clientCache.<String, String>createClientRegionFactory(ClientRegionShortcut.PROXY)
+            .create(testName);
+    for (long i = 0; i < 2; i++) {
+      clientRegion.put(i, "Value_" + i);
+    }
+
+    // Wait more than maximum-time-between-pings in the gateway receiver so that connections are
+    // closed.
+    try {
+      Thread.sleep(maximumTimeBetweenPingsInGatewayReceiver + 2000);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+
+    assertNotEquals(0, (int) vm4.invoke(() -> getGatewaySenderPoolDisconnects(senderId)));
+    assertEquals(0, ((PoolImpl) clientCache.getDefaultPool()).getStats().getDisConnects());
+  }
 
   protected Map putKeyValues() {
     final Map keyValues = new HashMap();
