GEODE-8521: detect if a p2p reader thread is hung (#5763)

* AbstractExecutor can now be suspended and resumed.
This does nothing on the base class but on P2PReaderExecutorGroup
it turns monitoring on and off.
Also the monitor thread will now detect a zero startTime
and in that case set the startTime allowing threads being
monitored to not keep setting it themselves (which happens more often).

* the timeInterval and timeLimit that come from gemfire properties
will now always be the values configured on the DistributedSystem.
Previously the thread monitoring classes had their own instance of
DistributionConfigImpl which at least in some cases could be inconsistent
the the config on the DistributedSystem. Now these two config values
are passed in to the constructor.

* If an instance of ResourceManagerStats can not be found then
it will now only prevent the monitor from updating the "numThreadsStuck"
stat. Before it prevented it from doing any detection and logging.
The lazy initialization of the ResourceManagerStats is now cleaner and does
not require the integration test to explicitly call run().

Co-authored-by: Darrel Schneider <darrel@vmware.com>
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/monitoring/ThreadsMonitoringIntegrationTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/monitoring/ThreadsMonitoringIntegrationTest.java
index eeb219d..20f808c 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/internal/monitoring/ThreadsMonitoringIntegrationTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/monitoring/ThreadsMonitoringIntegrationTest.java
@@ -24,7 +24,6 @@
 
 import org.apache.geode.cache.CacheFactory;
 import org.apache.geode.distributed.ConfigurationProperties;
-import org.apache.geode.distributed.internal.DistributionConfigImpl;
 import org.apache.geode.distributed.internal.DistributionManager;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.monitoring.executor.AbstractExecutor;
@@ -56,70 +55,126 @@
     nonDefault = new Properties();
     nonDefault.put(ConfigurationProperties.MCAST_PORT, "0");
     nonDefault.put(ConfigurationProperties.LOCATORS, "");
+    nonDefault.put(ConfigurationProperties.THREAD_MONITOR_ENABLED, "true");
+    nonDefault.put(ConfigurationProperties.THREAD_MONITOR_TIME_LIMIT, "30000");
 
     cache = (InternalCache) new CacheFactory(nonDefault).create();
   }
 
-  /**
-   * Tests that in case no instance of internal distribution system exists dummy instance is used
-   */
   @Test
   public void testThreadsMonitoringWorkflow() {
 
-    ThreadsMonitoring threadMonitoring = null;
+    DistributionManager distributionManager = cache.getDistributionManager();
+    assertThat(distributionManager).isNotNull();
+    ThreadsMonitoring threadMonitoring = distributionManager.getThreadMonitoring();
+    assertThat(threadMonitoring).isNotNull();
+
+    assertThat(threadMonitoring).isInstanceOf(ThreadsMonitoringImpl.class);
+    ThreadsMonitoringImpl impl = ((ThreadsMonitoringImpl) threadMonitoring);
+
+    impl.getTimer().cancel();
+
+    assertThat(impl.getThreadsMonitoringProcess().mapValidation())
+        .describedAs("ThreadMonitor monitoring process map validation should be false.")
+        .isFalse();
+
+    threadMonitoring.startMonitor(ThreadsMonitoring.Mode.FunctionExecutor);
+
+    assertThat(impl.getThreadsMonitoringProcess().mapValidation())
+        .describedAs("ThreadMonitor monitoring process map validation should still be false.")
+        .isFalse();
+
+    AbstractExecutor abstractExecutorGroup =
+        impl.getMonitorMap().get(Thread.currentThread().getId());
+    abstractExecutorGroup.setStartTime(abstractExecutorGroup.getStartTime()
+        - cache.getInternalDistributedSystem().getConfig().getThreadMonitorTimeLimit() - 1);
+
+    assertThat(impl.getThreadsMonitoringProcess().mapValidation())
+        .describedAs("ThreadMonitor monitoring process map validation should now be true.")
+        .isTrue();
+    assertThat((impl.getThreadsMonitoringProcess().getResourceManagerStats().getNumThreadStuck()))
+        .describedAs("ThreadMonitor monitoring process should identify one stuck thread.")
+        .isEqualTo(1);
+
+    impl.getMonitorMap().put(abstractExecutorGroup.getThreadID() + 1, abstractExecutorGroup);
+    impl.getMonitorMap().put(abstractExecutorGroup.getThreadID() + 2, abstractExecutorGroup);
+    impl.getThreadsMonitoringProcess().mapValidation();
+
+    assertThat((impl.getThreadsMonitoringProcess().getResourceManagerStats().getNumThreadStuck()))
+        .describedAs("ThreadMonitor monitoring process should identify three stuck threads.")
+        .isEqualTo(3);
+
+    threadMonitoring.endMonitor();
+    impl.getThreadsMonitoringProcess().mapValidation();
+
+    assertThat((impl.getThreadsMonitoringProcess().getResourceManagerStats().getNumThreadStuck()))
+        .describedAs("ThreadMonitor monitoring process should identify two stuck threads.")
+        .isEqualTo(2);
+  }
+
+  @Test
+  public void verifySuspendResumeFunctionCorrectly() {
 
     DistributionManager distributionManager = cache.getDistributionManager();
-    if (distributionManager != null) {
-      threadMonitoring = distributionManager.getThreadMonitoring();
-    }
+    assertThat(distributionManager).isNotNull();
+    ThreadsMonitoring threadMonitoring = distributionManager.getThreadMonitoring();
+    assertThat(threadMonitoring).isNotNull();
+    final int monitorTimeLimit =
+        cache.getInternalDistributedSystem().getConfig().getThreadMonitorTimeLimit();
 
+    assertThat(threadMonitoring).isInstanceOf(ThreadsMonitoringImpl.class);
+    ThreadsMonitoringImpl impl = ((ThreadsMonitoringImpl) threadMonitoring);
 
-    DistributionConfigImpl distributionConfigImpl = new DistributionConfigImpl(nonDefault);
-    if (distributionConfigImpl.getThreadMonitorEnabled() && threadMonitoring != null) {
-      assertThat(threadMonitoring).isInstanceOf(ThreadsMonitoringImpl.class);
-      ThreadsMonitoringImpl impl = ((ThreadsMonitoringImpl) threadMonitoring);
+    impl.getTimer().cancel();
 
-      impl.getTimer().cancel();
+    assertThat(impl.getThreadsMonitoringProcess().mapValidation())
+        .describedAs("ThreadMonitor monitoring process map validation should be false.")
+        .isFalse();
 
-      // to initiate ResourceManagerStats
-      impl.getThreadsMonitoringProcess().run();
+    AbstractExecutor executor =
+        threadMonitoring.createAbstractExecutor(ThreadsMonitoring.Mode.P2PReaderExecutor);
 
-      assertThat(impl.getThreadsMonitoringProcess().mapValidation())
-          .describedAs("ThreadMonitor monitoring process map validation should be false.")
-          .isFalse();
+    assertThat(impl.getThreadsMonitoringProcess().mapValidation())
+        .describedAs("ThreadMonitor monitoring process map validation should still be false.")
+        .isFalse();
 
-      threadMonitoring.startMonitor(ThreadsMonitoring.Mode.FunctionExecutor);
+    threadMonitoring.register(executor);
+    assertThat(impl.getThreadsMonitoringProcess().mapValidation())
+        .describedAs("ThreadMonitor monitoring process map validation should still be false.")
+        .isFalse();
+    assertThat(executor.getStartTime()).isNotZero();
 
-      assertThat(impl.getThreadsMonitoringProcess().mapValidation())
-          .describedAs("ThreadMonitor monitoring process map validation should still be false.")
-          .isFalse();
+    executor.setStartTime(executor.getStartTime() - monitorTimeLimit - 1);
+    assertThat(impl.getThreadsMonitoringProcess().mapValidation())
+        .describedAs("ThreadMonitor monitoring process map validation should now be true.")
+        .isTrue();
+    assertThat((impl.getThreadsMonitoringProcess().getResourceManagerStats().getNumThreadStuck()))
+        .describedAs("ThreadMonitor monitoring process should identify one stuck thread.")
+        .isEqualTo(1);
 
-      AbstractExecutor abstractExecutorGroup =
-          impl.getMonitorMap().get(Thread.currentThread().getId());
-      abstractExecutorGroup.setStartTime(abstractExecutorGroup.getStartTime()
-          - distributionConfigImpl.getThreadMonitorTimeLimit() - 1);
+    executor.suspendMonitoring();
+    assertThat(impl.getThreadsMonitoringProcess().mapValidation())
+        .describedAs("ThreadMonitor monitoring process map validation should still be false.")
+        .isFalse();
 
-      assertThat(impl.getThreadsMonitoringProcess().mapValidation())
-          .describedAs("ThreadMonitor monitoring process map validation should now be true.")
-          .isTrue();
-      assertThat((impl.getThreadsMonitoringProcess().getResourceManagerStats().getNumThreadStuck()))
-          .describedAs("ThreadMonitor monitoring process should identify one stuck thread.")
-          .isEqualTo(1);
+    executor.resumeMonitoring();
+    assertThat(executor.getStartTime()).isZero();
+    assertThat(impl.getThreadsMonitoringProcess().mapValidation())
+        .describedAs("ThreadMonitor monitoring process map validation should still be false.")
+        .isFalse();
+    assertThat(executor.getStartTime()).isNotZero();
 
-      impl.getMonitorMap().put(abstractExecutorGroup.getThreadID() + 1, abstractExecutorGroup);
-      impl.getMonitorMap().put(abstractExecutorGroup.getThreadID() + 2, abstractExecutorGroup);
-      impl.getThreadsMonitoringProcess().mapValidation();
+    executor.setStartTime(executor.getStartTime() - monitorTimeLimit - 1);
+    assertThat(impl.getThreadsMonitoringProcess().mapValidation())
+        .describedAs("ThreadMonitor monitoring process map validation should now be true.")
+        .isTrue();
+    assertThat((impl.getThreadsMonitoringProcess().getResourceManagerStats().getNumThreadStuck()))
+        .describedAs("ThreadMonitor monitoring process should identify one stuck thread.")
+        .isEqualTo(1);
 
-      assertThat((impl.getThreadsMonitoringProcess().getResourceManagerStats().getNumThreadStuck()))
-          .describedAs("ThreadMonitor monitoring process should identify three stuck threads.")
-          .isEqualTo(3);
-
-      threadMonitoring.endMonitor();
-      impl.getThreadsMonitoringProcess().mapValidation();
-
-      assertThat((impl.getThreadsMonitoringProcess().getResourceManagerStats().getNumThreadStuck()))
-          .describedAs("ThreadMonitor monitoring process should identify two stuck threads.")
-          .isEqualTo(2);
-    }
+    impl.unregister(executor);
+    assertThat(impl.getThreadsMonitoringProcess().mapValidation())
+        .describedAs("ThreadMonitor monitoring process map validation should still be false.")
+        .isFalse();
   }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterOperationExecutors.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterOperationExecutors.java
index 375ab50..dfafc9b 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterOperationExecutors.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterOperationExecutors.java
@@ -179,7 +179,8 @@
 
     DistributionConfig config = system.getConfig();
 
-    threadMonitor = config.getThreadMonitorEnabled() ? new ThreadsMonitoringImpl(system)
+    threadMonitor = config.getThreadMonitorEnabled() ? new ThreadsMonitoringImpl(system,
+        config.getThreadMonitorInterval(), config.getThreadMonitorTimeLimit())
         : new ThreadsMonitoringImplDummy();
 
 
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java
index 29889e7..532ca01 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java
@@ -93,7 +93,8 @@
     DistributionConfig config = system.getConfig();
 
     if (config.getThreadMonitorEnabled()) {
-      this.threadMonitor = new ThreadsMonitoringImpl(system);
+      this.threadMonitor = new ThreadsMonitoringImpl(system, config.getThreadMonitorInterval(),
+          config.getThreadMonitorTimeLimit());
       logger.info("[ThreadsMonitor] New Monitor object and process were created.\n");
     } else {
       this.threadMonitor = new ThreadsMonitoringImplDummy();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/monitoring/ThreadsMonitoring.java b/geode-core/src/main/java/org/apache/geode/internal/monitoring/ThreadsMonitoring.java
index 11c4ed9..aa5ba6d 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/monitoring/ThreadsMonitoring.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/monitoring/ThreadsMonitoring.java
@@ -27,7 +27,8 @@
     SerialQueuedExecutor,
     OneTaskOnlyExecutor,
     ScheduledThreadExecutor,
-    AGSExecutor
+    AGSExecutor,
+    P2PReaderExecutor
   };
 
   Map<Long, AbstractExecutor> getMonitorMap();
@@ -38,19 +39,46 @@
   void close();
 
   /**
-   * Starting to monitor a new executor object.
+   * Start monitoring the calling thread.
    *
-   * @param mode the object executor group.
+   * @param mode describes the group the calling thread should be associated with.
    * @return true - if succeeded , false - if failed.
    */
   public boolean startMonitor(Mode mode);
 
   /**
-   * Ending the monitoring of an executor object.
+   * Stops monitoring the calling thread if it is currently being monitored.
    */
   public void endMonitor();
 
   /**
+   * Creates a new executor that is associated with the calling thread.
+   * Callers need to pass the returned executor to {@link #register(AbstractExecutor)}
+   * for this executor to be monitored.
+   *
+   * @param mode describes the group the calling thread should be associated with.
+   * @return the created {@link AbstractExecutor} instance.
+   */
+  public AbstractExecutor createAbstractExecutor(Mode mode);
+
+  /**
+   * Call to cause this thread monitor to start monitoring
+   * the given executor.
+   *
+   * @param executor the executor to monitor.
+   * @return true - if succeeded , false - if failed.
+   */
+  public boolean register(AbstractExecutor executor);
+
+  /**
+   * Call to cause this thread monitor to stop monitoring
+   * the given executor.
+   *
+   * @param executor the executor to stop monitoring.
+   */
+  public void unregister(AbstractExecutor executor);
+
+  /**
    * A long-running thread that may appear stuck should periodically update its "alive"
    * status by invoking this method
    */
diff --git a/geode-core/src/main/java/org/apache/geode/internal/monitoring/ThreadsMonitoringImpl.java b/geode-core/src/main/java/org/apache/geode/internal/monitoring/ThreadsMonitoringImpl.java
index 94f943c..1c0e49c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/monitoring/ThreadsMonitoringImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/monitoring/ThreadsMonitoringImpl.java
@@ -16,17 +16,17 @@
 package org.apache.geode.internal.monitoring;
 
 
-import java.util.Properties;
 import java.util.Timer;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
-import org.apache.geode.distributed.internal.DistributionConfigImpl;
+import org.apache.geode.annotations.VisibleForTesting;
 import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.internal.monitoring.executor.AbstractExecutor;
 import org.apache.geode.internal.monitoring.executor.FunctionExecutionPooledExecutorGroup;
 import org.apache.geode.internal.monitoring.executor.GatewaySenderEventProcessorGroup;
 import org.apache.geode.internal.monitoring.executor.OneTaskOnlyExecutorGroup;
+import org.apache.geode.internal.monitoring.executor.P2PReaderExecutorGroup;
 import org.apache.geode.internal.monitoring.executor.PooledExecutorGroup;
 import org.apache.geode.internal.monitoring.executor.ScheduledThreadPoolExecutorWKAGroup;
 import org.apache.geode.internal.monitoring.executor.SerialQueuedExecutorGroup;
@@ -38,20 +38,28 @@
   /** Monitors the health of the entire distributed system */
   private ThreadsMonitoringProcess tmProcess = null;
 
-  private final Properties nonDefault = new Properties();
-  private final DistributionConfigImpl distributionConfigImpl =
-      new DistributionConfigImpl(nonDefault);
-
-  private final Timer timer =
-      new Timer("ThreadsMonitor", true);
+  private final Timer timer;
 
   /** Is this ThreadsMonitoringImpl closed?? */
   private boolean isClosed = true;
 
-  public ThreadsMonitoringImpl(InternalDistributedSystem iDistributedSystem) {
+  public ThreadsMonitoringImpl(InternalDistributedSystem iDistributedSystem, int timeIntervalMillis,
+      int timeLimitMillis) {
+    this(iDistributedSystem, timeIntervalMillis, timeLimitMillis, true);
+  }
+
+  @VisibleForTesting
+  ThreadsMonitoringImpl(InternalDistributedSystem iDistributedSystem, int timeIntervalMillis,
+      int timeLimitMillis, boolean startThread) {
     this.monitorMap = new ConcurrentHashMap<>();
     this.isClosed = false;
-    setThreadsMonitoringProcess(iDistributedSystem);
+    if (startThread) {
+      timer = new Timer("ThreadsMonitor", true);
+      tmProcess = new ThreadsMonitoringProcess(this, iDistributedSystem, timeLimitMillis);
+      timer.schedule(tmProcess, 0, timeIntervalMillis);
+    } else {
+      timer = null;
+    }
   }
 
   @Override
@@ -69,19 +77,13 @@
       return;
 
     isClosed = true;
-    if (tmProcess != null) {
+    if (timer != null) {
       this.timer.cancel();
       this.tmProcess = null;
     }
     this.monitorMap.clear();
   }
 
-  /** Starts a new {@link org.apache.geode.internal.monitoring.ThreadsMonitoringProcess} */
-  private void setThreadsMonitoringProcess(InternalDistributedSystem iDistributedSystem) {
-    this.tmProcess = new ThreadsMonitoringProcess(this, iDistributedSystem);
-    this.timer.schedule(tmProcess, 0, distributionConfigImpl.getThreadMonitorInterval());
-  }
-
   public ThreadsMonitoringProcess getThreadsMonitoringProcess() {
     return this.tmProcess;
   }
@@ -96,39 +98,68 @@
 
   @Override
   public boolean startMonitor(Mode mode) {
-    AbstractExecutor absExtgroup;
-    switch (mode) {
-      case FunctionExecutor:
-        absExtgroup = new FunctionExecutionPooledExecutorGroup(this);
-        break;
-      case PooledExecutor:
-        absExtgroup = new PooledExecutorGroup(this);
-        break;
-      case SerialQueuedExecutor:
-        absExtgroup = new SerialQueuedExecutorGroup(this);
-        break;
-      case OneTaskOnlyExecutor:
-        absExtgroup = new OneTaskOnlyExecutorGroup(this);
-        break;
-      case ScheduledThreadExecutor:
-        absExtgroup = new ScheduledThreadPoolExecutorWKAGroup(this);
-        break;
-      case AGSExecutor:
-        absExtgroup = new GatewaySenderEventProcessorGroup(this);
-        break;
-      default:
-        return false;
-    }
-    this.monitorMap.put(Thread.currentThread().getId(), absExtgroup);
-    return true;
+    AbstractExecutor executor = createAbstractExecutor(mode);
+    executor.setStartTime(System.currentTimeMillis());
+    return register(executor);
   }
 
   @Override
   public void endMonitor() {
-    this.monitorMap.remove(Thread.currentThread().getId());
+    monitorMap.remove(Thread.currentThread().getId());
   }
 
-  public Timer getTimer() {
+  @VisibleForTesting
+  boolean isMonitoring() {
+    AbstractExecutor executor = monitorMap.get(Thread.currentThread().getId());
+    if (executor == null) {
+      return false;
+    }
+    return !executor.isMonitoringSuspended();
+  }
+
+  @Override
+  public AbstractExecutor createAbstractExecutor(Mode mode) {
+    switch (mode) {
+      case FunctionExecutor:
+        return new FunctionExecutionPooledExecutorGroup();
+      case PooledExecutor:
+        return new PooledExecutorGroup();
+      case SerialQueuedExecutor:
+        return new SerialQueuedExecutorGroup();
+      case OneTaskOnlyExecutor:
+        return new OneTaskOnlyExecutorGroup();
+      case ScheduledThreadExecutor:
+        return new ScheduledThreadPoolExecutorWKAGroup();
+      case AGSExecutor:
+        return new GatewaySenderEventProcessorGroup();
+      case P2PReaderExecutor:
+        return new P2PReaderExecutorGroup();
+      default:
+        throw new IllegalStateException("Unhandled mode=" + mode);
+    }
+  }
+
+  @Override
+  public boolean register(AbstractExecutor executor) {
+    monitorMap.put(executor.getThreadID(), executor);
+    return true;
+  }
+
+  @Override
+  public void unregister(AbstractExecutor executor) {
+    monitorMap.remove(executor.getThreadID());
+  }
+
+  @VisibleForTesting
+  boolean isMonitoring(AbstractExecutor executor) {
+    if (executor.isMonitoringSuspended()) {
+      return false;
+    }
+    return monitorMap.containsKey(executor.getThreadID());
+  }
+
+  @VisibleForTesting
+  Timer getTimer() {
     return this.timer;
   }
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/monitoring/ThreadsMonitoringImplDummy.java b/geode-core/src/main/java/org/apache/geode/internal/monitoring/ThreadsMonitoringImplDummy.java
index a2acd93..e2c5f26 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/monitoring/ThreadsMonitoringImplDummy.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/monitoring/ThreadsMonitoringImplDummy.java
@@ -33,6 +33,21 @@
   public void endMonitor() {}
 
   @Override
+  public AbstractExecutor createAbstractExecutor(Mode mode) {
+    return null;
+  }
+
+  @Override
+  public boolean register(AbstractExecutor executor) {
+    return true;
+  }
+
+  @Override
+  public void unregister(AbstractExecutor executor) {
+
+  }
+
+  @Override
   public void updateThreadStatus() {}
 
   @Override
diff --git a/geode-core/src/main/java/org/apache/geode/internal/monitoring/ThreadsMonitoringProcess.java b/geode-core/src/main/java/org/apache/geode/internal/monitoring/ThreadsMonitoringProcess.java
index 85a5d94..3e1ea78 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/monitoring/ThreadsMonitoringProcess.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/monitoring/ThreadsMonitoringProcess.java
@@ -15,14 +15,12 @@
 
 package org.apache.geode.internal.monitoring;
 
-import java.util.Map.Entry;
-import java.util.Properties;
 import java.util.TimerTask;
 
 import org.apache.logging.log4j.Logger;
 
+import org.apache.geode.annotations.VisibleForTesting;
 import org.apache.geode.cache.CacheClosedException;
-import org.apache.geode.distributed.internal.DistributionConfigImpl;
 import org.apache.geode.distributed.internal.DistributionManager;
 import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.internal.cache.InternalCache;
@@ -33,73 +31,86 @@
 
 public class ThreadsMonitoringProcess extends TimerTask {
 
-  private final ThreadsMonitoring threadsMonitoring;
-  private ResourceManagerStats resourceManagerStats = null;
   private static final Logger logger = LogService.getLogger();
-  private final int timeLimit;
+
+  private final ThreadsMonitoring threadsMonitoring;
+  private final int timeLimitMillis;
   private final InternalDistributedSystem internalDistributedSystem;
 
-  private final Properties nonDefault = new Properties();
-  private final DistributionConfigImpl distributionConfigImpl =
-      new DistributionConfigImpl(nonDefault);
+  private ResourceManagerStats resourceManagerStats = null;
 
   protected ThreadsMonitoringProcess(ThreadsMonitoring tMonitoring,
-      InternalDistributedSystem iDistributedSystem) {
-    this.timeLimit = this.distributionConfigImpl.getThreadMonitorTimeLimit();
+      InternalDistributedSystem iDistributedSystem, int timeLimitMillis) {
+    this.timeLimitMillis = timeLimitMillis;
     this.threadsMonitoring = tMonitoring;
     this.internalDistributedSystem = iDistributedSystem;
   }
 
+  @VisibleForTesting
+  /**
+   * Returns true if a stuck thread was detected
+   */
   public boolean mapValidation() {
-    boolean isStuck = false;
     int numOfStuck = 0;
-    for (Entry<Long, AbstractExecutor> entry1 : this.threadsMonitoring.getMonitorMap().entrySet()) {
-      logger.trace("Checking thread {}", entry1.getKey());
-      long currentTime = System.currentTimeMillis();
-      long delta = currentTime - entry1.getValue().getStartTime();
-      if (delta >= this.timeLimit) {
-        isStuck = true;
+    for (AbstractExecutor executor : threadsMonitoring.getMonitorMap().values()) {
+      if (executor.isMonitoringSuspended()) {
+        continue;
+      }
+      final long startTime = executor.getStartTime();
+      final long currentTime = System.currentTimeMillis();
+      if (startTime == 0) {
+        executor.setStartTime(currentTime);
+        continue;
+      }
+      long threadId = executor.getThreadID();
+      logger.trace("Checking thread {}", threadId);
+      long delta = currentTime - startTime;
+      if (delta >= timeLimitMillis) {
         numOfStuck++;
-        logger.warn("Thread {} (0x{}) is stuck", entry1.getKey(),
-            Long.toHexString(entry1.getKey()));
-        entry1.getValue().handleExpiry(delta);
+        logger.warn("Thread {} (0x{}) is stuck", threadId, Long.toHexString(threadId));
+        executor.handleExpiry(delta);
       }
     }
-    if (!isStuck) {
-      if (this.resourceManagerStats != null)
-        this.resourceManagerStats.setNumThreadStuck(0);
+    updateNumThreadStuckStatistic(numOfStuck);
+    if (numOfStuck == 0) {
       logger.trace("There are no stuck threads in the system");
-      return false;
+    } else if (numOfStuck != 1) {
+      logger.warn("There are {} stuck threads in this node", numOfStuck);
     } else {
-      if (this.resourceManagerStats != null)
-        this.resourceManagerStats.setNumThreadStuck(numOfStuck);
-      if (numOfStuck != 1) {
-        logger.warn("There are {} stuck threads in this node", numOfStuck);
-      } else {
-        logger.warn("There is 1 stuck thread in this node");
-      }
-      return true;
+      logger.warn("There is 1 stuck thread in this node");
+    }
+    return numOfStuck != 0;
+  }
+
+  private void updateNumThreadStuckStatistic(int numOfStuck) {
+    ResourceManagerStats stats = getResourceManagerStats();
+    if (stats != null) {
+      stats.setNumThreadStuck(numOfStuck);
     }
   }
 
   @Override
   public void run() {
-    if (this.resourceManagerStats == null) {
-      try {
-        if (this.internalDistributedSystem == null || !this.internalDistributedSystem.isConnected())
-          return;
-        DistributionManager distributionManager =
-            this.internalDistributedSystem.getDistributionManager();
-        InternalCache cache = distributionManager.getExistingCache();
-        this.resourceManagerStats = cache.getInternalResourceManager().getStats();
-      } catch (CacheClosedException e1) {
-        logger.trace("No cache exists yet - process will run on next iteration");
-      }
-    } else
-      mapValidation();
+    mapValidation();
   }
 
+  @VisibleForTesting
   public ResourceManagerStats getResourceManagerStats() {
-    return this.resourceManagerStats;
+    ResourceManagerStats result = resourceManagerStats;
+    if (result == null) {
+      try {
+        if (internalDistributedSystem == null || !internalDistributedSystem.isConnected()) {
+          return null;
+        }
+        DistributionManager distributionManager =
+            internalDistributedSystem.getDistributionManager();
+        InternalCache cache = distributionManager.getExistingCache();
+        result = cache.getInternalResourceManager().getStats();
+        resourceManagerStats = result;
+      } catch (CacheClosedException e1) {
+        logger.trace("could not update statistic since cache is closed");
+      }
+    }
+    return result;
   }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/AbstractExecutor.java b/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/AbstractExecutor.java
index 4beba18..a89f6dc 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/AbstractExecutor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/AbstractExecutor.java
@@ -21,7 +21,7 @@
 
 import org.apache.logging.log4j.Logger;
 
-import org.apache.geode.internal.monitoring.ThreadsMonitoring;
+import org.apache.geode.annotations.VisibleForTesting;
 import org.apache.geode.logging.internal.log4j.api.LogService;
 
 public abstract class AbstractExecutor {
@@ -29,19 +29,19 @@
   private static final int THREAD_DUMP_DEPTH = 40;
   private static final Logger logger = LogService.getLogger();
   public static final String LOCK_OWNER_THREAD_STACK = "Lock owner thread stack";
-  private long threadID;
-  private String groupName;
+  private final long threadID;
+  private final String groupName;
   private short numIterationsStuck;
-  private long startTime;
+  private volatile long startTime;
 
-  public AbstractExecutor(ThreadsMonitoring tMonitoring) {
-    this.startTime = System.currentTimeMillis();
-    this.numIterationsStuck = 0;
-    this.threadID = Thread.currentThread().getId();
+  public AbstractExecutor(String groupName) {
+    this(groupName, Thread.currentThread().getId());
   }
 
-  public AbstractExecutor(ThreadsMonitoring tMonitoring, long threadID) {
-    this.startTime = System.currentTimeMillis();
+  @VisibleForTesting
+  AbstractExecutor(String groupName, long threadID) {
+    this.groupName = groupName;
+    this.startTime = 0;
     this.numIterationsStuck = 0;
     this.threadID = threadID;
   }
@@ -132,12 +132,16 @@
     return this.groupName;
   }
 
-  public void setGroupName(String groupName) {
-    this.groupName = groupName;
-  }
-
   public long getThreadID() {
     return this.threadID;
   }
 
+  public void suspendMonitoring() {}
+
+  public void resumeMonitoring() {}
+
+  public boolean isMonitoringSuspended() {
+    return false;
+  }
+
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/FunctionExecutionPooledExecutorGroup.java b/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/FunctionExecutionPooledExecutorGroup.java
index 181c1c4..c2c7bbd 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/FunctionExecutionPooledExecutorGroup.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/FunctionExecutionPooledExecutorGroup.java
@@ -14,16 +14,13 @@
  */
 package org.apache.geode.internal.monitoring.executor;
 
-import org.apache.geode.internal.monitoring.ThreadsMonitoring;
-
 public class FunctionExecutionPooledExecutorGroup extends AbstractExecutor {
 
   public static final String GROUPNAME = "FunctionExecutionPooledExecutor";
 
 
-  public FunctionExecutionPooledExecutorGroup(ThreadsMonitoring tMonitoring) {
-    super(tMonitoring);
-    setGroupName(GROUPNAME);
+  public FunctionExecutionPooledExecutorGroup() {
+    super(GROUPNAME);
   }
 
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/GatewaySenderEventProcessorGroup.java b/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/GatewaySenderEventProcessorGroup.java
index 99b64ad..e7dee9e 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/GatewaySenderEventProcessorGroup.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/GatewaySenderEventProcessorGroup.java
@@ -14,15 +14,12 @@
  */
 package org.apache.geode.internal.monitoring.executor;
 
-import org.apache.geode.internal.monitoring.ThreadsMonitoring;
-
 public class GatewaySenderEventProcessorGroup extends AbstractExecutor {
 
   public static final String GROUPNAME = "GatewaySenderEventProcessor";
 
-  public GatewaySenderEventProcessorGroup(ThreadsMonitoring tMonitoring) {
-    super(tMonitoring);
-    setGroupName(GROUPNAME);
+  public GatewaySenderEventProcessorGroup() {
+    super(GROUPNAME);
   }
 
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/OneTaskOnlyExecutorGroup.java b/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/OneTaskOnlyExecutorGroup.java
index 11d2c38..80ecdb0 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/OneTaskOnlyExecutorGroup.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/OneTaskOnlyExecutorGroup.java
@@ -14,15 +14,12 @@
  */
 package org.apache.geode.internal.monitoring.executor;
 
-import org.apache.geode.internal.monitoring.ThreadsMonitoring;
-
 public class OneTaskOnlyExecutorGroup extends AbstractExecutor {
 
   public static final String GROUPNAME = "OneTaskOnlyExecutor";
 
-  public OneTaskOnlyExecutorGroup(ThreadsMonitoring tMonitoring) {
-    super(tMonitoring);
-    setGroupName(GROUPNAME);
+  public OneTaskOnlyExecutorGroup() {
+    super(GROUPNAME);
   }
 
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/P2PReaderExecutorGroup.java b/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/P2PReaderExecutorGroup.java
new file mode 100644
index 0000000..6ee9909
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/P2PReaderExecutorGroup.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.monitoring.executor;
+
+public class P2PReaderExecutorGroup extends AbstractExecutor {
+
+  public static final String GROUP_NAME = "P2PReaderExecutor";
+
+  private volatile boolean suspended;
+
+  public P2PReaderExecutorGroup() {
+    super(GROUP_NAME);
+  }
+
+  @Override
+  public void suspendMonitoring() {
+    suspended = true;
+  }
+
+  @Override
+  public void resumeMonitoring() {
+    setStartTime(0);
+    suspended = false;
+  }
+
+  @Override
+  public boolean isMonitoringSuspended() {
+    return suspended;
+  }
+
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/PooledExecutorGroup.java b/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/PooledExecutorGroup.java
index 4ecacef..9ecc275 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/PooledExecutorGroup.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/PooledExecutorGroup.java
@@ -14,19 +14,18 @@
  */
 package org.apache.geode.internal.monitoring.executor;
 
-import org.apache.geode.internal.monitoring.ThreadsMonitoring;
+import org.apache.geode.annotations.VisibleForTesting;
 
 public class PooledExecutorGroup extends AbstractExecutor {
 
   public static final String GROUPNAME = "PooledExecutorWithDMStats";
 
-  public PooledExecutorGroup(ThreadsMonitoring tMonitoring) {
-    super(tMonitoring);
-    setGroupName(GROUPNAME);
+  public PooledExecutorGroup() {
+    super(GROUPNAME);
   }
 
-  public PooledExecutorGroup(ThreadsMonitoring tMonitoring, long threadID) {
-    super(tMonitoring, threadID);
-    setGroupName(GROUPNAME);
+  @VisibleForTesting
+  public PooledExecutorGroup(long threadID) {
+    super(GROUPNAME, threadID);
   }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/ScheduledThreadPoolExecutorWKAGroup.java b/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/ScheduledThreadPoolExecutorWKAGroup.java
index b2dfcd9..515af11 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/ScheduledThreadPoolExecutorWKAGroup.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/ScheduledThreadPoolExecutorWKAGroup.java
@@ -14,16 +14,13 @@
  */
 package org.apache.geode.internal.monitoring.executor;
 
-import org.apache.geode.internal.monitoring.ThreadsMonitoring;
-
 public class ScheduledThreadPoolExecutorWKAGroup extends AbstractExecutor {
 
   public static final String GROUPNAME = "ScheduledThreadPoolExecutorWithKeepAlive";
 
 
-  public ScheduledThreadPoolExecutorWKAGroup(ThreadsMonitoring tMonitoring) {
-    super(tMonitoring);
-    setGroupName(GROUPNAME);
+  public ScheduledThreadPoolExecutorWKAGroup() {
+    super(GROUPNAME);
   }
 
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/SerialQueuedExecutorGroup.java b/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/SerialQueuedExecutorGroup.java
index e579345..8211712 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/SerialQueuedExecutorGroup.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/SerialQueuedExecutorGroup.java
@@ -14,15 +14,12 @@
  */
 package org.apache.geode.internal.monitoring.executor;
 
-import org.apache.geode.internal.monitoring.ThreadsMonitoring;
-
 public class SerialQueuedExecutorGroup extends AbstractExecutor {
 
   public static final String GROUPNAME = "SerialQueuedExecutorWithDMStats";
 
-  public SerialQueuedExecutorGroup(ThreadsMonitoring tMonitoring) {
-    super(tMonitoring);
-    setGroupName(GROUPNAME);
+  public SerialQueuedExecutorGroup() {
+    super(GROUPNAME);
   }
 
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
index 8fcaae1..6d4c675 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
@@ -18,6 +18,7 @@
 import static java.lang.ThreadLocal.withInitial;
 import static org.apache.geode.distributed.ConfigurationProperties.SECURITY_PEER_AUTH_INIT;
 import static org.apache.geode.distributed.internal.DistributionConfigImpl.SECURITY_SYSTEM_PREFIX;
+import static org.apache.geode.internal.monitoring.ThreadsMonitoring.Mode.P2PReaderExecutor;
 import static org.apache.geode.util.internal.GeodeGlossary.GEMFIRE_PREFIX;
 
 import java.io.DataInput;
@@ -77,6 +78,8 @@
 import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.SystemTimer;
 import org.apache.geode.internal.SystemTimer.SystemTimerTask;
+import org.apache.geode.internal.monitoring.ThreadsMonitoring;
+import org.apache.geode.internal.monitoring.executor.AbstractExecutor;
 import org.apache.geode.internal.net.BufferPool;
 import org.apache.geode.internal.net.ByteBufferSharing;
 import org.apache.geode.internal.net.NioFilter;
@@ -508,6 +511,10 @@
     }
   }
 
+  private ThreadsMonitoring getThreadMonitoring() {
+    return conduit.getDM().getThreadMonitoring();
+  }
+
   public boolean isSharedResource() {
     return sharedResource;
   }
@@ -1580,11 +1587,17 @@
         logger.debug("Starting {} on {}", p2pReaderName(), socket);
       }
     }
+
     // we should not change the state of the connection if we are a handshake reader thread
     // as there is a race between this thread and the application thread doing direct ack
     boolean handshakeHasBeenRead = false;
     // if we're using SSL/TLS the input buffer may already have data to process
     boolean skipInitialRead = getInputBuffer().position() > 0;
+    final ThreadsMonitoring threadMonitoring = getThreadMonitoring();
+    final AbstractExecutor threadMonitorExecutor =
+        threadMonitoring.createAbstractExecutor(P2PReaderExecutor);
+    threadMonitorExecutor.suspendMonitoring();
+    threadMonitoring.register(threadMonitorExecutor);
     try {
       for (boolean isInitialRead = true;;) {
         if (stopped) {
@@ -1635,7 +1648,7 @@
             }
             return;
           }
-          processInputBuffer();
+          processInputBuffer(threadMonitorExecutor);
 
           if (!handshakeHasBeenRead && !isReceiver && (handshakeRead || handshakeCancelled)) {
             if (logger.isDebugEnabled()) {
@@ -1710,6 +1723,7 @@
         }
       }
     } finally {
+      threadMonitoring.unregister(threadMonitorExecutor);
       hasResidualReaderThread = false;
       if (!handshakeHasBeenRead || (sharedResource && !asyncMode)) {
         synchronized (stateLock) {
@@ -2748,7 +2762,8 @@
    * processes the current NIO buffer. If there are complete messages in the buffer, they are
    * deserialized and passed to TCPConduit for further processing
    */
-  private void processInputBuffer() throws ConnectionException, IOException {
+  private void processInputBuffer(AbstractExecutor threadMonitorExecutor)
+      throws ConnectionException, IOException {
     inputBuffer.flip();
 
     try (final ByteBufferSharing sharedBuffer = ioFilter.unwrap(inputBuffer)) {
@@ -2779,7 +2794,7 @@
 
             if (handshakeRead) {
               try {
-                readMessage(peerDataBuffer);
+                readMessage(peerDataBuffer, threadMonitorExecutor);
               } catch (SerializationException e) {
                 logger.info("input buffer startPos {} oldLimit {}", startPos, oldLimit);
                 throw e;
@@ -2949,7 +2964,7 @@
     return false;
   }
 
-  private void readMessage(ByteBuffer peerDataBuffer) {
+  private void readMessage(ByteBuffer peerDataBuffer, AbstractExecutor threadMonitorExecutor) {
     if (messageType == NORMAL_MSG_TYPE) {
       owner.getConduit().getStats().incMessagesBeingReceived(true, messageLength);
       try (ByteBufferInputStream bbis =
@@ -2975,7 +2990,7 @@
               bbis.available());
         }
         try {
-          if (!dispatchMessage(msg, messageLength, directAck)) {
+          if (!dispatchMessage(msg, messageLength, directAck, threadMonitorExecutor)) {
             directAck = false;
           }
         } catch (MemberShunnedException e) {
@@ -3095,7 +3110,7 @@
       }
       if (msg != null) {
         try {
-          if (!dispatchMessage(msg, msgLength, directAck)) {
+          if (!dispatchMessage(msg, msgLength, directAck, threadMonitorExecutor)) {
             directAck = false;
           }
         } catch (MemberShunnedException e) {
@@ -3239,8 +3254,10 @@
     }
   }
 
-  private boolean dispatchMessage(DistributionMessage msg, int bytesRead, boolean directAck)
+  private boolean dispatchMessage(DistributionMessage msg, int bytesRead, boolean directAck,
+      AbstractExecutor threadMonitorExecutor)
       throws MemberShunnedException {
+    threadMonitorExecutor.resumeMonitoring();
     try {
       msg.setDoDecMessagesBeingReceived(true);
       if (directAck) {
@@ -3251,6 +3268,7 @@
       owner.getConduit().messageReceived(this, msg, bytesRead);
       return true;
     } finally {
+      threadMonitorExecutor.suspendMonitoring();
       if (msg.containsRegionContentChange()) {
         messagesReceived++;
       }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/monitoring/ThreadsMonitoringImplJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/monitoring/ThreadsMonitoringImplJUnitTest.java
index 648b803..7840fbb 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/monitoring/ThreadsMonitoringImplJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/monitoring/ThreadsMonitoringImplJUnitTest.java
@@ -14,10 +14,12 @@
  */
 package org.apache.geode.internal.monitoring;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
@@ -27,6 +29,7 @@
 
 import org.apache.geode.internal.monitoring.ThreadsMonitoring.Mode;
 import org.apache.geode.internal.monitoring.executor.AbstractExecutor;
+import org.apache.geode.internal.monitoring.executor.FunctionExecutionPooledExecutorGroup;
 
 /**
  * Contains simple tests for the {@link org.apache.geode.internal.monitoring.ThreadsMonitoringImpl}.
@@ -39,7 +42,7 @@
 
   @Before
   public void before() {
-    threadsMonitoringImpl = new ThreadsMonitoringImpl(null);
+    threadsMonitoringImpl = new ThreadsMonitoringImpl(null, 100000, 0, false);
   }
 
   @After
@@ -58,6 +61,59 @@
     assertTrue(threadsMonitoringImpl.startMonitor(Mode.OneTaskOnlyExecutor));
     assertTrue(threadsMonitoringImpl.startMonitor(Mode.ScheduledThreadExecutor));
     assertTrue(threadsMonitoringImpl.startMonitor(Mode.AGSExecutor));
+    assertTrue(threadsMonitoringImpl.startMonitor(Mode.P2PReaderExecutor));
+  }
+
+  @Test
+  public void verifyMonitorLifeCycle() {
+    assertFalse(threadsMonitoringImpl.isMonitoring());
+    threadsMonitoringImpl.startMonitor(Mode.FunctionExecutor);
+    assertTrue(threadsMonitoringImpl.isMonitoring());
+    threadsMonitoringImpl.endMonitor();
+    assertFalse(threadsMonitoringImpl.isMonitoring());
+  }
+
+  @Test
+  public void verifyExecutorMonitoringLifeCycle() {
+    AbstractExecutor executor =
+        threadsMonitoringImpl.createAbstractExecutor(Mode.P2PReaderExecutor);
+    assertThat(threadsMonitoringImpl.isMonitoring(executor)).isFalse();
+    threadsMonitoringImpl.register(executor);
+    assertThat(threadsMonitoringImpl.isMonitoring(executor)).isTrue();
+    executor.suspendMonitoring();
+    assertThat(threadsMonitoringImpl.isMonitoring(executor)).isFalse();
+    executor.resumeMonitoring();
+    assertThat(threadsMonitoringImpl.isMonitoring(executor)).isTrue();
+    threadsMonitoringImpl.unregister(executor);
+    assertThat(threadsMonitoringImpl.isMonitoring(executor)).isFalse();
+    threadsMonitoringImpl.register(executor);
+    assertThat(threadsMonitoringImpl.isMonitoring(executor)).isTrue();
+    threadsMonitoringImpl.unregister(executor);
+    assertThat(threadsMonitoringImpl.isMonitoring(executor)).isFalse();
+  }
+
+  @Test
+  public void createAbstractExecutorIsAssociatedWithCallingThread() {
+    AbstractExecutor executor = threadsMonitoringImpl.createAbstractExecutor(Mode.FunctionExecutor);
+    assertThat(executor.getThreadID()).isEqualTo(Thread.currentThread().getId());
+  }
+
+  @Test
+  public void createAbstractExecutorDoesNotSetStartTime() {
+    AbstractExecutor executor = threadsMonitoringImpl.createAbstractExecutor(Mode.FunctionExecutor);
+    assertThat(executor.getStartTime()).isEqualTo(0);
+  }
+
+  @Test
+  public void createAbstractExecutorSetsNumIterationsStuckToZero() {
+    AbstractExecutor executor = threadsMonitoringImpl.createAbstractExecutor(Mode.FunctionExecutor);
+    assertThat(executor.getNumIterationsStuck()).isEqualTo((short) 0);
+  }
+
+  @Test
+  public void createAbstractExecutorSetsExpectedGroupName() {
+    AbstractExecutor executor = threadsMonitoringImpl.createAbstractExecutor(Mode.FunctionExecutor);
+    assertThat(executor.getGroupName()).isEqualTo(FunctionExecutionPooledExecutorGroup.GROUPNAME);
   }
 
   /**
@@ -65,24 +121,28 @@
    */
   @Test
   public void testClosure() {
-    assertTrue(threadsMonitoringImpl.getThreadsMonitoringProcess() != null);
-    assertFalse(threadsMonitoringImpl.isClosed());
-    threadsMonitoringImpl.close();
-    assertTrue(threadsMonitoringImpl.isClosed());
-    assertFalse(threadsMonitoringImpl.getThreadsMonitoringProcess() != null);
+    ThreadsMonitoringImpl liveMonitor = new ThreadsMonitoringImpl(null, 100000, 0, true);
+    assertTrue(liveMonitor.getThreadsMonitoringProcess() != null);
+    assertFalse(liveMonitor.isClosed());
+    liveMonitor.close();
+    assertTrue(liveMonitor.isClosed());
+    assertFalse(liveMonitor.getThreadsMonitoringProcess() != null);
   }
 
   @Test
-  public void updateThreadStatus() {
+  public void updateThreadStatusCallsSetStartTime() {
     AbstractExecutor executor = mock(AbstractExecutor.class);
     long threadId = Thread.currentThread().getId();
 
     threadsMonitoringImpl.getMonitorMap().put(threadId, executor);
     threadsMonitoringImpl.updateThreadStatus();
-
-    // also test the case where there is no AbstractExcecutor present
-    threadsMonitoringImpl.getMonitorMap().remove(threadId);
-    threadsMonitoringImpl.updateThreadStatus();
     verify(executor, times(1)).setStartTime(any(Long.class));
   }
+
+  @Test
+  public void updateThreadStatusWithoutExecutorInMapDoesNotCallSetStartTime() {
+    AbstractExecutor executor = mock(AbstractExecutor.class);
+    threadsMonitoringImpl.updateThreadStatus();
+    verify(executor, never()).setStartTime(any(Long.class));
+  }
 }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/monitoring/ThreadsMonitoringJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/monitoring/ThreadsMonitoringJUnitTest.java
index 6ac4c24..3827308 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/monitoring/ThreadsMonitoringJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/monitoring/ThreadsMonitoringJUnitTest.java
@@ -34,11 +34,12 @@
     SerialQueuedExecutor,
     OneTaskOnlyExecutor,
     ScheduledThreadExecutor,
-    AGSExecutor
+    AGSExecutor,
+    P2PReaderExecutor
   };
 
 
-  public final int numberOfElements = 6;
+  public final int numberOfElements = 7;
   private static final Logger logger = LogService.getLogger();
 
   /**
diff --git a/geode-core/src/test/java/org/apache/geode/internal/monitoring/ThreadsMonitoringProcessJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/monitoring/ThreadsMonitoringProcessJUnitTest.java
index a5e6563..b3c2117 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/monitoring/ThreadsMonitoringProcessJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/monitoring/ThreadsMonitoringProcessJUnitTest.java
@@ -17,12 +17,9 @@
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
-import java.util.Properties;
-
 import org.junit.Before;
 import org.junit.Test;
 
-import org.apache.geode.distributed.internal.DistributionConfigImpl;
 import org.apache.geode.internal.monitoring.ThreadsMonitoring.Mode;
 import org.apache.geode.internal.monitoring.executor.AbstractExecutor;
 import org.apache.geode.internal.monitoring.executor.PooledExecutorGroup;
@@ -34,11 +31,13 @@
  */
 public class ThreadsMonitoringProcessJUnitTest {
 
+  private static final int TIME_LIMIT_MILLIS = 1000;
+
   private ThreadsMonitoringImpl threadsMonitoringImpl;
 
   @Before
   public void before() {
-    threadsMonitoringImpl = new ThreadsMonitoringImpl(null);
+    threadsMonitoringImpl = new ThreadsMonitoringImpl(null, 100000, TIME_LIMIT_MILLIS);
   }
 
   /**
@@ -47,14 +46,10 @@
   @Test
   public void testThreadIsStuck() {
 
-    final Properties nonDefault = new Properties();
-    final DistributionConfigImpl distributionConfigImpl = new DistributionConfigImpl(nonDefault);
     final long threadID = 123456;
 
-    int timeLimit = distributionConfigImpl.getThreadMonitorTimeLimit();
-
-    AbstractExecutor absExtgroup = new PooledExecutorGroup(threadsMonitoringImpl);
-    absExtgroup.setStartTime(absExtgroup.getStartTime() - timeLimit - 1);
+    AbstractExecutor absExtgroup = new PooledExecutorGroup();
+    absExtgroup.setStartTime(absExtgroup.getStartTime() - TIME_LIMIT_MILLIS - 1);
 
     threadsMonitoringImpl.getMonitorMap().put(threadID, absExtgroup);
 
@@ -65,14 +60,10 @@
 
   @Test
   public void monitorHandlesDefunctThread() {
-    final Properties nonDefault = new Properties();
-    final DistributionConfigImpl distributionConfigImpl = new DistributionConfigImpl(nonDefault);
     final long threadID = Long.MAX_VALUE;
 
-    int timeLimit = distributionConfigImpl.getThreadMonitorTimeLimit();
-
-    AbstractExecutor absExtgroup = new PooledExecutorGroup(threadsMonitoringImpl, threadID);
-    absExtgroup.setStartTime(absExtgroup.getStartTime() - timeLimit - 1);
+    AbstractExecutor absExtgroup = new PooledExecutorGroup(threadID);
+    absExtgroup.setStartTime(absExtgroup.getStartTime() - TIME_LIMIT_MILLIS - 1);
 
     threadsMonitoringImpl.getMonitorMap().put(threadID, absExtgroup);
 
diff --git a/geode-core/src/test/java/org/apache/geode/internal/monitoring/executor/AbstractExecutorGroupJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/monitoring/executor/AbstractExecutorGroupJUnitTest.java
index 4ed32b8..f5ee7f3 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/monitoring/executor/AbstractExecutorGroupJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/monitoring/executor/AbstractExecutorGroupJUnitTest.java
@@ -31,7 +31,7 @@
 public class AbstractExecutorGroupJUnitTest {
 
   private final AbstractExecutor abstractExecutorGroup =
-      new FunctionExecutionPooledExecutorGroup(null);
+      new FunctionExecutionPooledExecutorGroup();
 
   private static final long timeoutInMilliseconds = GeodeAwaitility.getTimeout().toMillis();
 
@@ -91,7 +91,7 @@
     blockedThread.start();
     await().until(() -> blockedThreadWaiting[0]);
     try {
-      AbstractExecutor executor = new AbstractExecutor(null, blockedThread.getId()) {
+      AbstractExecutor executor = new AbstractExecutor("testGroup", blockedThread.getId()) {
         @Override
         public void handleExpiry(long stuckTime) {
           // no-op
diff --git a/geode-core/src/test/java/org/apache/geode/internal/monitoring/executor/FunctionExecutionPooledExecutorGroupJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/monitoring/executor/FunctionExecutionPooledExecutorGroupJUnitTest.java
index 5a8feb1..60b401b 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/monitoring/executor/FunctionExecutionPooledExecutorGroupJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/monitoring/executor/FunctionExecutionPooledExecutorGroupJUnitTest.java
@@ -36,7 +36,7 @@
   @Test
   public void testVerifyGroupName() {
     AbstractExecutor functionExecutionPooledExecutorGroup =
-        new FunctionExecutionPooledExecutorGroup(null);
+        new FunctionExecutionPooledExecutorGroup();
     assertTrue(functionExecutionPooledExecutorGroup.getGroupName().equals(GROUPNAME));
   }
 }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/monitoring/executor/GatewaySenderEventProcessorGroupJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/monitoring/executor/GatewaySenderEventProcessorGroupJUnitTest.java
index 1f5f0ff..bade204 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/monitoring/executor/GatewaySenderEventProcessorGroupJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/monitoring/executor/GatewaySenderEventProcessorGroupJUnitTest.java
@@ -36,7 +36,7 @@
   @Test
   public void testVerifyGroupName() {
     AbstractExecutor gatewaySenderEventProcessorGroup =
-        new GatewaySenderEventProcessorGroup(null);
+        new GatewaySenderEventProcessorGroup();
     assertTrue(gatewaySenderEventProcessorGroup.getGroupName().equals(GROUPNAME));
   }
 }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/monitoring/executor/OneTaskOnlyExecutorGroupJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/monitoring/executor/OneTaskOnlyExecutorGroupJUnitTest.java
index 8a30e15..99370a4 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/monitoring/executor/OneTaskOnlyExecutorGroupJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/monitoring/executor/OneTaskOnlyExecutorGroupJUnitTest.java
@@ -34,7 +34,7 @@
    */
   @Test
   public void testVerifyGroupName() {
-    AbstractExecutor oneTaskOnlyExecutorGroup = new OneTaskOnlyExecutorGroup(null);
+    AbstractExecutor oneTaskOnlyExecutorGroup = new OneTaskOnlyExecutorGroup();
     assertTrue(oneTaskOnlyExecutorGroup.getGroupName().equals(GROUPNAME));
   }
 }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/monitoring/executor/P2PReaderExecutorGroupTest.java b/geode-core/src/test/java/org/apache/geode/internal/monitoring/executor/P2PReaderExecutorGroupTest.java
new file mode 100644
index 0000000..c266c4c
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/monitoring/executor/P2PReaderExecutorGroupTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.monitoring.executor;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.junit.Test;
+
+public class P2PReaderExecutorGroupTest {
+
+  @Test
+  public void testVerifyGroupName() {
+    assertThat(new P2PReaderExecutorGroup().getGroupName())
+        .isEqualTo(P2PReaderExecutorGroup.GROUP_NAME);
+  }
+
+  @Test
+  public void verifySuspendLifecycle() {
+    P2PReaderExecutorGroup executor = new P2PReaderExecutorGroup();
+    assertThat(executor.isMonitoringSuspended()).isFalse();
+    executor.suspendMonitoring();
+    assertThat(executor.isMonitoringSuspended()).isTrue();
+    executor.suspendMonitoring();
+    assertThat(executor.isMonitoringSuspended()).isTrue();
+    executor.resumeMonitoring();
+    assertThat(executor.isMonitoringSuspended()).isFalse();
+    executor.resumeMonitoring();
+    assertThat(executor.isMonitoringSuspended()).isFalse();
+    executor.suspendMonitoring();
+    assertThat(executor.isMonitoringSuspended()).isTrue();
+  }
+
+  @Test
+  public void verifyResumeClearsStartTime() {
+    P2PReaderExecutorGroup executor = new P2PReaderExecutorGroup();
+    executor.setStartTime(1);
+    assertThat(executor.getStartTime()).isEqualTo(1);
+    executor.resumeMonitoring();
+    assertThat(executor.getStartTime()).isEqualTo(0);
+  }
+}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/monitoring/executor/PooledExecutorGroupJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/monitoring/executor/PooledExecutorGroupJUnitTest.java
index a93cde6..5083278 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/monitoring/executor/PooledExecutorGroupJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/monitoring/executor/PooledExecutorGroupJUnitTest.java
@@ -34,7 +34,7 @@
    */
   @Test
   public void testVerifyGroupName() {
-    AbstractExecutor pooledExecutorGroup = new PooledExecutorGroup(null);
+    AbstractExecutor pooledExecutorGroup = new PooledExecutorGroup();
     assertTrue(pooledExecutorGroup.getGroupName().equals(GROUPNAME));
   }
 }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/monitoring/executor/ScheduledThreadPoolExecutorWKAGroupJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/monitoring/executor/ScheduledThreadPoolExecutorWKAGroupJUnitTest.java
index d067a15..6b6040c 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/monitoring/executor/ScheduledThreadPoolExecutorWKAGroupJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/monitoring/executor/ScheduledThreadPoolExecutorWKAGroupJUnitTest.java
@@ -36,7 +36,7 @@
   @Test
   public void testVerifyGroupName() {
     AbstractExecutor scheduledThreadPoolExecutorWKAGroup =
-        new ScheduledThreadPoolExecutorWKAGroup(null);
+        new ScheduledThreadPoolExecutorWKAGroup();
     assertTrue(scheduledThreadPoolExecutorWKAGroup.getGroupName().equals(GROUPNAME));
   }
 }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/monitoring/executor/SerialQueuedExecutorGroupJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/monitoring/executor/SerialQueuedExecutorGroupJUnitTest.java
index 47ccca6..e0bcba8 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/monitoring/executor/SerialQueuedExecutorGroupJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/monitoring/executor/SerialQueuedExecutorGroupJUnitTest.java
@@ -34,7 +34,7 @@
    */
   @Test
   public void testVerifyGroupName() {
-    AbstractExecutor serialQueuedExecutorGroup = new SerialQueuedExecutorGroup(null);
+    AbstractExecutor serialQueuedExecutorGroup = new SerialQueuedExecutorGroup();
     assertTrue(serialQueuedExecutorGroup.getGroupName().equals(GROUPNAME));
   }
 }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTest.java b/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTest.java
index c064afb..50cd1e7 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTest.java
@@ -16,6 +16,7 @@
 
 import static org.apache.geode.internal.inet.LocalHostUtil.getLocalHost;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.isNull;
 import static org.mockito.Mockito.mock;
@@ -41,6 +42,8 @@
 import org.apache.geode.distributed.internal.DistributionConfig;
 import org.apache.geode.distributed.internal.DistributionManager;
 import org.apache.geode.distributed.internal.DistributionMessage;
+import org.apache.geode.internal.monitoring.ThreadsMonitoring;
+import org.apache.geode.internal.monitoring.executor.AbstractExecutor;
 import org.apache.geode.internal.net.BufferPool;
 import org.apache.geode.internal.net.SocketCloser;
 import org.apache.geode.test.junit.categories.MembershipTest;
@@ -75,6 +78,8 @@
     CancelCriterion stopper = mock(CancelCriterion.class);
     SocketCloser socketCloser = mock(SocketCloser.class);
     TCPConduit tcpConduit = mock(TCPConduit.class);
+    ThreadsMonitoring threadMonitoring = mock(ThreadsMonitoring.class);
+    AbstractExecutor abstractExecutor = mock(AbstractExecutor.class);
 
     when(connectionTable.getBufferPool()).thenReturn(new BufferPool(dmStats));
     when(connectionTable.getConduit()).thenReturn(tcpConduit);
@@ -86,6 +91,8 @@
     when(tcpConduit.getDM()).thenReturn(distributionManager);
     when(tcpConduit.getSocketId()).thenReturn(new InetSocketAddress(getLocalHost(), 10337));
     when(tcpConduit.getStats()).thenReturn(dmStats);
+    when(distributionManager.getThreadMonitoring()).thenReturn(threadMonitoring);
+    when(threadMonitoring.createAbstractExecutor(any())).thenReturn(abstractExecutor);
 
     SocketChannel channel = SocketChannel.open();