YARN-10642. Race condition: AsyncDispatcher can get stuck by the changes introduced in YARN-8995. Contributed by zhengchenyu.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java
index 79ad464..f9deab0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java
@@ -20,11 +20,11 @@
 
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.stream.Collectors;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -268,11 +268,16 @@
   }
 
   class GenericEventHandler implements EventHandler<Event> {
-    private void printEventQueueDetails(BlockingQueue<Event> queue) {
-      Map<Enum, Long> counterMap = eventQueue.stream().
-              collect(Collectors.
-                      groupingBy(e -> e.getType(), Collectors.counting())
-              );
+    private void printEventQueueDetails() {
+      Iterator<Event> iterator = eventQueue.iterator();
+      Map<Enum, Long> counterMap = new HashMap<>();
+      while (iterator.hasNext()) {
+        Enum eventType = iterator.next().getType();
+        if (!counterMap.containsKey(eventType)) {
+          counterMap.put(eventType, 0L);
+        }
+        counterMap.put(eventType, counterMap.get(eventType) + 1);
+      }
       for (Map.Entry<Enum, Long> entry : counterMap.entrySet()) {
         long num = entry.getValue();
         LOG.info("Event type: " + entry.getKey()
@@ -295,7 +300,7 @@
       if (qSize != 0 && qSize % detailsInterval == 0
               && lastEventDetailsQueueSizeLogged != qSize) {
         lastEventDetailsQueueSizeLogged = qSize;
-        printEventQueueDetails(eventQueue);
+        printEventQueueDetails();
         printTrigger = true;
       }
       int remCapacity = eventQueue.remainingCapacity();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java
index 762e228..55ddd12 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java
@@ -97,12 +97,23 @@
   }
 
   private static class TestHandler implements EventHandler<Event> {
+
+    private long sleepTime = 1500;
+
+    TestHandler() {
+    }
+
+    TestHandler(long sleepTime) {
+      this.sleepTime = sleepTime;
+    }
+
     @Override
     public void handle(Event event) {
       try {
         // As long as 10000 events queued
-        Thread.sleep(1500);
-      } catch (InterruptedException e) {}
+        Thread.sleep(this.sleepTime);
+      } catch (InterruptedException e) {
+      }
     }
   }
 
@@ -170,11 +181,54 @@
       //Make sure more than one event to take
       verify(log, atLeastOnce()).
               info("Latest dispatch event type: TestEventType");
-      dispatcher.stop();
     } finally {
       //... restore logger object
       logger.set(null, oldLog);
+      dispatcher.stop();
     }
   }
+
+  //Test print dispatcher details when the blocking queue is heavy
+  @Test(timeout = 60000)
+  public void testPrintDispatcherEventDetailsAvoidDeadLoop() throws Exception {
+    for (int i = 0; i < 5; i++) {
+      testPrintDispatcherEventDetailsAvoidDeadLoopInternal();
+    }
+  }
+
+  public void testPrintDispatcherEventDetailsAvoidDeadLoopInternal()
+      throws Exception {
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.setInt(YarnConfiguration.
+        YARN_DISPATCHER_PRINT_EVENTS_INFO_THRESHOLD, 10);
+    Logger log = mock(Logger.class);
+    AsyncDispatcher dispatcher = new AsyncDispatcher();
+    dispatcher.init(conf);
+
+    Field logger = AsyncDispatcher.class.getDeclaredField("LOG");
+    logger.setAccessible(true);
+    Field modifiers = Field.class.getDeclaredField("modifiers");
+    modifiers.setAccessible(true);
+    modifiers.setInt(logger, logger.getModifiers() & ~Modifier.FINAL);
+    Object oldLog = logger.get(null);
+
+    try {
+      logger.set(null, log);
+      dispatcher.register(TestEnum.class, new TestHandler(0));
+      dispatcher.start();
+
+      for (int i = 0; i < 10000; ++i) {
+        Event event = mock(Event.class);
+        when(event.getType()).thenReturn(TestEnum.TestEventType);
+        dispatcher.getEventHandler().handle(event);
+      }
+      Thread.sleep(3000);
+    } finally {
+      //... restore logger object
+      logger.set(null, oldLog);
+      dispatcher.stop();
+    }
+  }
+
 }