YARN-10642. Race condition: AsyncDispatcher can get stuck by the changes introduced in YARN-8995. Contributed by zhengchenyu.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java
index 79ad464..f9deab0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java
@@ -20,11 +20,11 @@
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -268,11 +268,16 @@
}
class GenericEventHandler implements EventHandler<Event> {
- private void printEventQueueDetails(BlockingQueue<Event> queue) {
- Map<Enum, Long> counterMap = eventQueue.stream().
- collect(Collectors.
- groupingBy(e -> e.getType(), Collectors.counting())
- );
+ private void printEventQueueDetails() {
+ Iterator<Event> iterator = eventQueue.iterator();
+ Map<Enum, Long> counterMap = new HashMap<>();
+ while (iterator.hasNext()) {
+ Enum eventType = iterator.next().getType();
+ if (!counterMap.containsKey(eventType)) {
+ counterMap.put(eventType, 0L);
+ }
+ counterMap.put(eventType, counterMap.get(eventType) + 1);
+ }
for (Map.Entry<Enum, Long> entry : counterMap.entrySet()) {
long num = entry.getValue();
LOG.info("Event type: " + entry.getKey()
@@ -295,7 +300,7 @@
if (qSize != 0 && qSize % detailsInterval == 0
&& lastEventDetailsQueueSizeLogged != qSize) {
lastEventDetailsQueueSizeLogged = qSize;
- printEventQueueDetails(eventQueue);
+ printEventQueueDetails();
printTrigger = true;
}
int remCapacity = eventQueue.remainingCapacity();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java
index 762e228..55ddd12 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java
@@ -97,12 +97,23 @@
}
private static class TestHandler implements EventHandler<Event> {
+
+ private long sleepTime = 1500;
+
+ TestHandler() {
+ }
+
+ TestHandler(long sleepTime) {
+ this.sleepTime = sleepTime;
+ }
+
@Override
public void handle(Event event) {
try {
// As long as 10000 events queued
- Thread.sleep(1500);
- } catch (InterruptedException e) {}
+ Thread.sleep(this.sleepTime);
+ } catch (InterruptedException e) {
+ }
}
}
@@ -170,11 +181,54 @@
//Make sure more than one event to take
verify(log, atLeastOnce()).
info("Latest dispatch event type: TestEventType");
- dispatcher.stop();
} finally {
//... restore logger object
logger.set(null, oldLog);
+ dispatcher.stop();
}
}
+
+ // Test that printing event details cannot dead-loop under heavy queue load
+ @Test(timeout = 60000)
+ public void testPrintDispatcherEventDetailsAvoidDeadLoop() throws Exception {
+ for (int i = 0; i < 5; i++) {
+ testPrintDispatcherEventDetailsAvoidDeadLoopInternal();
+ }
+ }
+
+ public void testPrintDispatcherEventDetailsAvoidDeadLoopInternal()
+ throws Exception {
+ YarnConfiguration conf = new YarnConfiguration();
+ conf.setInt(YarnConfiguration.
+ YARN_DISPATCHER_PRINT_EVENTS_INFO_THRESHOLD, 10);
+ Logger log = mock(Logger.class);
+ AsyncDispatcher dispatcher = new AsyncDispatcher();
+ dispatcher.init(conf);
+
+ Field logger = AsyncDispatcher.class.getDeclaredField("LOG");
+ logger.setAccessible(true);
+ Field modifiers = Field.class.getDeclaredField("modifiers");
+ modifiers.setAccessible(true);
+ modifiers.setInt(logger, logger.getModifiers() & ~Modifier.FINAL);
+ Object oldLog = logger.get(null);
+
+ try {
+ logger.set(null, log);
+ dispatcher.register(TestEnum.class, new TestHandler(0));
+ dispatcher.start();
+
+ for (int i = 0; i < 10000; ++i) {
+ Event event = mock(Event.class);
+ when(event.getType()).thenReturn(TestEnum.TestEventType);
+ dispatcher.getEventHandler().handle(event);
+ }
+ Thread.sleep(3000);
+ } finally {
+ //... restore logger object
+ logger.set(null, oldLog);
+ dispatcher.stop();
+ }
+ }
+
}