GEODE-7312: modify the ThreadMonitor to print the stack of a blocking thread

Log a thread trace for a thread that's blocking a "stuck" thread.

This will help a lot when a user experiences hung operations.  Prior to
this change we needed to request thread dumps for servers and that was
usually not possible to obtain because the servers had already been
terminated & restarted.  This change puts the relevant thread dumps in
the server's log file, which is much easier for folks to gather after
the fact.
diff --git a/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/AbstractExecutor.java b/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/AbstractExecutor.java
index 8f98926..0de7fa6 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/AbstractExecutor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/AbstractExecutor.java
@@ -28,6 +28,7 @@
 
   private static final int THREAD_DUMP_DEPTH = 40;
   private static final Logger logger = LogService.getLogger();
+  public static final String LOCK_OWNER_THREAD_STACK = "Lock owner thread stack";
   private long threadID;
   private String groupName;
   private short numIterationsStuck;
@@ -47,10 +48,10 @@
 
   public void handleExpiry(long stuckTime) {
     this.incNumIterationsStuck();
-    logger.warn(handleLogMessage(stuckTime));
+    logger.warn(createThreadReport(stuckTime));
   }
 
-  private String handleLogMessage(long stuckTime) {
+  String createThreadReport(long stuckTime) {
 
     DateFormat dateFormat = new SimpleDateFormat("dd MMM yyyy HH:mm:ss zzz");
 
@@ -58,42 +59,57 @@
         ManagementFactory.getThreadMXBean().getThreadInfo(this.threadID, THREAD_DUMP_DEPTH);
     boolean logThreadDetails = (thread != null);
 
-    StringBuilder strb = new StringBuilder();
+    StringBuilder stringBuilder = new StringBuilder();
+    final String lineSeparator = System.lineSeparator();
 
-    strb.append("Thread <").append(this.threadID).append("> (0x")
+    stringBuilder.append("Thread <").append(this.threadID).append("> (0x")
         .append(Long.toHexString(this.threadID)).append(") that was executed at <")
         .append(dateFormat.format(this.getStartTime())).append("> has been stuck for <")
         .append((float) stuckTime / 1000)
         .append(" seconds> and number of thread monitor iteration <")
-        .append(this.numIterationsStuck).append("> ").append(System.lineSeparator());
+        .append(this.numIterationsStuck).append("> ").append(lineSeparator);
     if (logThreadDetails) {
-      strb.append("Thread Name <").append(thread.getThreadName()).append(">")
+      stringBuilder.append("Thread Name <").append(thread.getThreadName()).append(">")
           .append(" state <").append(thread.getThreadState())
-          .append(">").append(System.lineSeparator());
+          .append(">").append(lineSeparator);
 
       if (thread.getLockName() != null)
-        strb.append("Waiting on <").append(thread.getLockName()).append(">")
-            .append(System.lineSeparator());
+        stringBuilder.append("Waiting on <").append(thread.getLockName()).append(">")
+            .append(lineSeparator);
 
       if (thread.getLockOwnerName() != null)
-        strb.append("Owned By <").append(thread.getLockOwnerName()).append("> with ID <")
-            .append(thread.getLockOwnerId()).append(">").append(System.lineSeparator());
+        stringBuilder.append("Owned By <").append(thread.getLockOwnerName()).append("> with ID <")
+            .append(thread.getLockOwnerId()).append(">").append(lineSeparator);
     }
 
 
-    strb.append("Executor Group <").append(groupName).append(">").append(System.lineSeparator())
+    stringBuilder.append("Executor Group <").append(groupName).append(">").append(
+        lineSeparator)
         .append("Monitored metric <ResourceManagerStats.numThreadsStuck>")
-        .append(System.lineSeparator());
+        .append(lineSeparator);
 
     if (logThreadDetails) {
-      strb.append("Thread Stack:").append(System.lineSeparator());
-      for (int i = 0; i < thread.getStackTrace().length; i++) {
-        String row = thread.getStackTrace()[i].toString();
-        strb.append(row).append(System.lineSeparator());
+      writeThreadStack(thread, "Thread stack:", stringBuilder);
+    }
+
+    if (logThreadDetails && thread.getLockOwnerName() != null) {
+      ThreadInfo lockOwnerThread = ManagementFactory.getThreadMXBean()
+          .getThreadInfo(thread.getLockOwnerId(), THREAD_DUMP_DEPTH);
+      if (lockOwnerThread != null) {
+        writeThreadStack(lockOwnerThread, LOCK_OWNER_THREAD_STACK, stringBuilder);
       }
     }
 
-    return strb.toString();
+    return stringBuilder.toString();
+  }
+
+  private void writeThreadStack(ThreadInfo thread, String header, StringBuilder strb) {
+    final String lineSeparator = System.lineSeparator();
+    strb.append(header).append(lineSeparator);
+    for (int i = 0; i < thread.getStackTrace().length; i++) {
+      String row = thread.getStackTrace()[i].toString();
+      strb.append(row).append(lineSeparator);
+    }
   }
 
   public long getStartTime() {
diff --git a/geode-core/src/test/java/org/apache/geode/internal/monitoring/executor/AbstractExecutorGroupJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/monitoring/executor/AbstractExecutorGroupJUnitTest.java
index de57c32..ece6c95 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/monitoring/executor/AbstractExecutorGroupJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/monitoring/executor/AbstractExecutorGroupJUnitTest.java
@@ -18,8 +18,10 @@
 
 import org.junit.Test;
 
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+
 /**
- * Contains simple tests for the {@link ThreadMonitoringUtils}.
+ * Contains simple tests for the {@link AbstractExecutor}.
  *
  *
  * @since Geode 1.5
@@ -29,6 +31,8 @@
   private final AbstractExecutor abstractExecutorGroup =
       new FunctionExecutionPooledExecutorGroup(null);
 
+  private static final long timeoutInMilliseconds = GeodeAwaitility.getTimeout().getValueInMS();
+
   @Test
   public void testInitializationValues() {
     assertTrue(abstractExecutorGroup.getStartTime() <= System.currentTimeMillis());
@@ -41,4 +45,63 @@
     abstractExecutorGroup.handleExpiry(12);
     assertTrue(abstractExecutorGroup.getNumIterationsStuck() == 1);
   }
+
+  /**
+   * If a thread is blocked by another thread we want to see the other thread's
+   * stack in a "stuck thread" report. This test creates such a thread and
+   * generates a "stuck thread" report to make sure the report on the other thread
+   * is included.
+   */
+  @Test
+  public void lockOwnerThreadStackIsReported() throws InterruptedException {
+    final Object syncObject = new Object();
+    final Object releaseObject = new Object();
+    final boolean[] blockingThreadWaiting = new boolean[1];
+    final boolean[] blockedThreadWaiting = new boolean[1];
+    Thread blockingThread = new Thread("blocking thread") {
+      public void run() {
+        synchronized (syncObject) {
+          synchronized (releaseObject) {
+            try {
+              blockingThreadWaiting[0] = true;
+              releaseObject.wait(timeoutInMilliseconds);
+            } catch (InterruptedException e) {
+              return;
+            }
+          }
+        }
+      }
+    };
+    Thread blockedThread = new Thread("blocked thread") {
+      public void run() {
+        blockedThreadWaiting[0] = true;
+        synchronized (syncObject) {
+          try {
+            syncObject.wait(timeoutInMilliseconds);
+          } catch (InterruptedException e) {
+            return;
+          }
+        }
+      }
+    };
+    blockingThread.start();
+    GeodeAwaitility.await().until(() -> blockingThreadWaiting[0]);
+    blockedThread.start();
+    GeodeAwaitility.await().until(() -> blockedThreadWaiting[0]);
+    try {
+      AbstractExecutor executor = new AbstractExecutor(null, blockedThread.getId()) {
+        @Override
+        public void handleExpiry(long stuckTime) {
+          // no-op
+        }
+      };
+      String threadReport = executor.createThreadReport(60000);
+      assertTrue(threadReport.contains(AbstractExecutor.LOCK_OWNER_THREAD_STACK));
+    } finally {
+      blockingThread.interrupt();
+      blockedThread.interrupt();
+      blockingThread.join(timeoutInMilliseconds);
+      blockedThread.join(timeoutInMilliseconds);
+    }
+  }
 }