TEZ-4204: Data race in RootInputInitializerManager (Mustafa Iman via Ashutosh Chauhan)
Signed-off-by: Laszlo Bodor <bodorlaszlo0202@gmail.com>
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
index 5ce0050..9194c1d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
@@ -25,6 +25,7 @@
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
@@ -33,6 +34,8 @@
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
@@ -104,36 +107,65 @@
}
- public void runInputInitializers(
- List<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> inputs, List<TezEvent> pendingInitializerEvents) {
-
- executor.submit(() -> createAndStartInitializing(inputs, pendingInitializerEvents));
+ public void runInputInitializers(List<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> inputs,
+ List<TezEvent> pendingInitializerEvents) {
+ List<InitializerWrapper> initWrappers = createInitializerWrappers(inputs);
+ if (!initWrappers.isEmpty()) {
+ executor.submit(() -> createAndStartInitializing(pendingInitializerEvents, initWrappers));
+ }
}
- private void createAndStartInitializing(List<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> inputs, List<TezEvent> pendingInitializerEvents) {
+ /**
+ * Create input wrappers for all inputs in parallel.
+ *
+ * @param inputs
+ * @return
+ */
+ private List<InitializerWrapper> createInitializerWrappers(
+ List<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> inputs) {
String current = null;
+ final List<InitializerWrapper> result = Collections.synchronizedList(new ArrayList<>());
try {
- List<InitializerWrapper> result = new ArrayList<>();
+ final List<Future<Void>> fResults = new ArrayList<>();
for (RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> each : inputs) {
current = each.getName();
- InitializerWrapper initializer = createInitializerWrapper(each);
- initializerMap.put(each.getName(), initializer);
- registerPendingVertex(each, initializer);
- result.add(initializer);
+ fResults.add(executor.submit(() -> {
+ InitializerWrapper initializer = createInitializerWrapper(each);
+ initializerMap.put(each.getName(), initializer);
+ registerPendingVertex(each, initializer);
+ result.add(initializer);
+ return null;
+ }));
}
- handleInitializerEvents(pendingInitializerEvents);
- pendingInitializerEvents.clear();
- for (InitializerWrapper inputWrapper : result) {
- executor.submit(() -> runInitializerAndProcessResult(inputWrapper));
+ for(Future<Void> f : fResults) {
+ f.get();
}
- } catch (Throwable t) {
- VertexImpl vertexImpl = (VertexImpl) vertex;
- String msg = "Fail to create InputInitializerManager, " + ExceptionUtils.getStackTrace(t);
- LOG.info(msg);
- vertexImpl.finished(FAILED, VertexTerminationCause.INIT_FAILURE, msg);
- eventHandler.handle(new VertexEventRootInputFailed(vertex.getVertexId(), current,
- new AMUserCodeException(AMUserCodeException.Source.InputInitializer, t)));
+ } catch (InterruptedException | ExecutionException t) {
+ failVertex(t, current);
+ }
+ return result;
+ }
+ void failVertex(Throwable t, String inputName) {
+ VertexImpl vertexImpl = (VertexImpl) vertex;
+ String msg = "Fail to create InputInitializerManager, " + ExceptionUtils.getStackTrace(t);
+ LOG.info(msg);
+ vertexImpl.finished(FAILED, VertexTerminationCause.INIT_FAILURE, msg);
+ eventHandler.handle(new VertexEventRootInputFailed(vertex.getVertexId(), inputName,
+ new AMUserCodeException(AMUserCodeException.Source.InputInitializer, t)));
+ }
+
+ /**
+ * Start initializers in parallel.
+ *
+ * @param pendingInitializerEvents
+ * @param result
+ */
+ private void createAndStartInitializing(List<TezEvent> pendingInitializerEvents, List<InitializerWrapper> result) {
+ handleInitializerEvents(pendingInitializerEvents);
+ pendingInitializerEvents.clear();
+ for (InitializerWrapper inputWrapper : result) {
+ executor.submit(() -> runInitializerAndProcessResult(inputWrapper));
}
}