TEZ-4204: Data race in RootInputInitializerManager (Mustafa Iman via Ashutosh Chauhan)

Signed-off-by: Laszlo Bodor <bodorlaszlo0202@gmail.com>
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
index 5ce0050..9194c1d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
@@ -25,6 +25,7 @@
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -33,6 +34,8 @@
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.annotation.Nullable;
@@ -104,36 +107,65 @@
   }
 
 
-  public void runInputInitializers(
-          List<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> inputs, List<TezEvent> pendingInitializerEvents) {
-
-    executor.submit(() -> createAndStartInitializing(inputs, pendingInitializerEvents));
+  public void runInputInitializers(List<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> inputs,
+                                   List<TezEvent> pendingInitializerEvents) {
+    List<InitializerWrapper> initWrappers = createInitializerWrappers(inputs);
+    if (!initWrappers.isEmpty()) {
+      executor.submit(() -> createAndStartInitializing(pendingInitializerEvents, initWrappers));
+    }
   }
 
-  private void createAndStartInitializing(List<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> inputs, List<TezEvent> pendingInitializerEvents) {
+  /**
+   * Create input wrappers for all inputs in parallel.
+   *
+   * @param inputs
+   * @return
+   */
+  private List<InitializerWrapper> createInitializerWrappers(
+          List<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> inputs) {
     String current = null;
+    final List<InitializerWrapper> result = Collections.synchronizedList(new ArrayList<>());
     try {
-      List<InitializerWrapper> result = new ArrayList<>();
+      final List<Future<Void>> fResults = new ArrayList<>();
       for (RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> each : inputs) {
         current = each.getName();
-        InitializerWrapper initializer = createInitializerWrapper(each);
-        initializerMap.put(each.getName(), initializer);
-        registerPendingVertex(each, initializer);
-        result.add(initializer);
+        fResults.add(executor.submit(() -> {
+          InitializerWrapper initializer = createInitializerWrapper(each);
+          initializerMap.put(each.getName(), initializer);
+          registerPendingVertex(each, initializer);
+          result.add(initializer);
+          return null;
+        }));
       }
-      handleInitializerEvents(pendingInitializerEvents);
-      pendingInitializerEvents.clear();
-      for (InitializerWrapper inputWrapper : result) {
-        executor.submit(() -> runInitializerAndProcessResult(inputWrapper));
+      for(Future<Void> f : fResults) {
+        f.get();
       }
-    } catch (Throwable t) {
-      VertexImpl vertexImpl = (VertexImpl) vertex;
-      String msg = "Fail to create InputInitializerManager, " + ExceptionUtils.getStackTrace(t);
-      LOG.info(msg);
-      vertexImpl.finished(FAILED, VertexTerminationCause.INIT_FAILURE, msg);
-      eventHandler.handle(new VertexEventRootInputFailed(vertex.getVertexId(), current,
-              new AMUserCodeException(AMUserCodeException.Source.InputInitializer, t)));
+    } catch (InterruptedException | ExecutionException t) {
+      failVertex(t, current);
+    }
+    return result;
+  }
 
+  void failVertex(Throwable t, String inputName) {
+    VertexImpl vertexImpl = (VertexImpl) vertex;
+    String msg = "Fail to create InputInitializerManager, " + ExceptionUtils.getStackTrace(t);
+    LOG.info(msg);
+    vertexImpl.finished(FAILED, VertexTerminationCause.INIT_FAILURE, msg);
+    eventHandler.handle(new VertexEventRootInputFailed(vertex.getVertexId(), inputName,
+        new AMUserCodeException(AMUserCodeException.Source.InputInitializer, t)));
+  }
+
+  /**
+   * Start initializers in parallel.
+   *
+   * @param pendingInitializerEvents
+   * @param result
+   */
+  private void createAndStartInitializing(List<TezEvent> pendingInitializerEvents, List<InitializerWrapper> result) {
+    handleInitializerEvents(pendingInitializerEvents);
+    pendingInitializerEvents.clear();
+    for (InitializerWrapper inputWrapper : result) {
+      executor.submit(() -> runInitializerAndProcessResult(inputWrapper));
     }
   }