[REEF-1656] Implement proper logging of threads still running at the end of the REEF process

This work is towards proper job shutdown for "REEF as a library"
[REEF-1561](https://issues.apache.org/jira/browse/REEF-1561) project

  * Use `ThreadLogger` for all thread logging and remove duplicate code
  * sort threads in log by name and thread group
  * Print thread group, status, activity, and other info
  * minor fixes in logging

JIRA:
  [REEF-1656](https://issues.apache.org/jira/browse/REEF-1656)

Pull Request:
  This closes #1171
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/REEFEnvironment.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/REEFEnvironment.java
index b997347..ceeb4d1 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/REEFEnvironment.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/REEFEnvironment.java
@@ -31,6 +31,7 @@
 import org.apache.reef.tang.exceptions.InjectionException;
 import org.apache.reef.util.EnvironmentUtils;
 import org.apache.reef.util.REEFVersion;
+import org.apache.reef.util.ThreadLogger;
 import org.apache.reef.wake.profiler.WakeProfiler;
 import org.apache.reef.wake.time.Clock;
 
@@ -53,6 +54,8 @@
 
   private static final Logger LOG = Logger.getLogger(REEFEnvironment.class.getName());
 
+  private static final String CLASS_NAME = REEFEnvironment.class.getCanonicalName();
+
   private static final Tang TANG = Tang.Factory.getTang();
 
   /** Main event loop of current REEF component (Driver or Evaluator). */
@@ -131,7 +134,7 @@
   @SuppressWarnings("checkstyle:illegalcatch") // Catch throwable to feed it to error handler
   public void close() {
 
-    LOG.log(Level.FINER, "Closing REEF Environment - start");
+    LOG.entering(CLASS_NAME, "close");
 
     try {
       this.clock.close();
@@ -150,7 +153,9 @@
       }
     }
 
-    LOG.log(Level.FINER, "Closing REEF Environment - end");
+    ThreadLogger.logThreads(LOG, Level.FINEST, "Threads running after REEFEnvironment.close():");
+
+    LOG.exiting(CLASS_NAME, "close");
   }
 
   /**
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/REEFLauncher.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/REEFLauncher.java
index 74de099..0b542e3 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/REEFLauncher.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/REEFLauncher.java
@@ -185,15 +185,9 @@
 
     LOG.log(Level.INFO, "Exiting REEFLauncher.main()");
 
-    if (LOG.isLoggable(Level.FINEST)) {
-      LOG.log(Level.FINEST, ThreadLogger.getFormattedThreadList("Threads running after REEFLauncher.close():"));
-    }
-
     System.exit(0);
 
-    if (LOG.isLoggable(Level.FINEST)) {
-      LOG.log(Level.FINEST, ThreadLogger.getFormattedThreadList("Threads running after System.exit():"));
-    }
+    ThreadLogger.logThreads(LOG, Level.FINEST, "Threads running after System.exit():");
   }
 
   /**
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/util/ThreadLogger.java b/lang/java/reef-common/src/main/java/org/apache/reef/util/ThreadLogger.java
index 104b468..1e4a790 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/util/ThreadLogger.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/util/ThreadLogger.java
@@ -22,6 +22,7 @@
 import java.lang.management.MonitorInfo;
 import java.lang.management.ThreadInfo;
 import java.util.Map;
+import java.util.TreeMap;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -56,7 +57,10 @@
   public static void logThreads(
       final Logger logger, final Level level, final String prefix,
       final String threadPrefix, final String stackElementPrefix) {
-    logger.log(level, getFormattedThreadList(prefix, threadPrefix, stackElementPrefix));
+
+    if (logger.isLoggable(level)) {
+      logger.log(level, getFormattedThreadList(prefix, threadPrefix, stackElementPrefix));
+    }
   }
 
   /**
@@ -69,13 +73,31 @@
    */
   public static String getFormattedThreadList(
       final String prefix, final String threadPrefix, final String stackElementPrefix) {
-    final StringBuilder message = new StringBuilder(prefix);
+
+    // Sort by thread name
+    final TreeMap<String, StackTraceElement[]> threadNames = new TreeMap<>();
     for (final Map.Entry<Thread, StackTraceElement[]> entry : Thread.getAllStackTraces().entrySet()) {
-      message.append(threadPrefix).append("Thread '").append(entry.getKey().getName()).append("':");
-      for (final StackTraceElement element : entry.getValue()) {
-        message.append(stackElementPrefix).append(element.toString());
+
+      final Thread t = entry.getKey();
+      final String tg = t.getThreadGroup() == null ? null : t.getThreadGroup().getName();
+
+      if (!"system".equals(tg)) {
+        threadNames.put(String.format("TG %s THREAD %s :: %s, %s, %s, %s",
+            tg, t.getName(), t.getState(),
+            t.isAlive() ? "Alive" : "NOT alive",
+            t.isInterrupted() ? "Interrupted" : "NOT interrupted",
+            t.isDaemon() ? "Daemon" : "NOT daemon"), entry.getValue());
       }
     }
+
+    final StringBuilder message = new StringBuilder(prefix);
+    for (final Map.Entry<String, StackTraceElement[]> entry : threadNames.entrySet()) {
+      message.append(threadPrefix).append(entry.getKey());
+      for (final StackTraceElement element : entry.getValue()) {
+        message.append(stackElementPrefix).append(element);
+      }
+    }
+
     return message.toString();
   }
 
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/RuntimeClock.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/RuntimeClock.java
index 523bce3..238e7bb 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/RuntimeClock.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/RuntimeClock.java
@@ -212,7 +212,7 @@
     synchronized (this.schedule) {
 
       if (this.isClosed) {
-        LOG.log(Level.FINEST, "Clock has already been closed");
+        LOG.exiting(CLASS_NAME, "close", "Clock has already been closed");
         return;
       }
 
@@ -270,24 +270,6 @@
   }
 
   /**
-   * Logs the currently running threads.
-   * @param level Log level used to write the entry.
-   * @param prefix put before the comma-separated list of threads
-   */
-  private static void logThreads(final Level level, final String prefix) {
-
-    if (LOG.isLoggable(level)) {
-
-      final StringBuilder sb = new StringBuilder(prefix);
-      for (final Thread t : Thread.getAllStackTraces().keySet()) {
-        sb.append(t.getName()).append(", ");
-      }
-
-      LOG.log(level, sb.toString());
-    }
-  }
-
-  /**
    * Main event loop.
    * Set up the event handlers, and go into event loop that polls the schedule and process events in it.
    */
@@ -382,8 +364,6 @@
       this.handlers.onNext(new RuntimeStop(this.timer.getCurrent(), e));
 
     } finally {
-
-      logThreads(Level.FINE, "Threads running after exiting the clock main loop: ");
       LOG.log(Level.FINE, "Runtime clock exit");
     }