TEZ-3726. Clean up DeletionTracker's reflection instantiation and provide ContainerLauncher with dagComplete() functionality (kshukla)
diff --git a/TEZ-3334-CHANGES.txt b/TEZ-3334-CHANGES.txt
index a2afb15..8924915 100644
--- a/TEZ-3334-CHANGES.txt
+++ b/TEZ-3334-CHANGES.txt
@@ -4,6 +4,7 @@
 INCOMPATIBLE CHANGES:
 
 ALL CHANGES:
+  TEZ-3726. Clean up DeletionTracker's reflection instantiation and provide ContainerLauncher with dagComplete() functionality
   TEZ-3725. Cleanup http connections and other unnecessary fields in DAG Deletion tracker classes.
   TEZ-3705. Modify DeletionTracker and deletion threads to be initialized only if enabled for tez_shuffle
   TEZ-3685. ShuffleHandler completedInputSet off-by-one error
diff --git a/tez-common/src/main/java/org/apache/tez/common/DagContainerLauncher.java b/tez-common/src/main/java/org/apache/tez/common/DagContainerLauncher.java
new file mode 100644
index 0000000..e3bd385
--- /dev/null
+++ b/tez-common/src/main/java/org/apache/tez/common/DagContainerLauncher.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.common;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.tez.common.security.JobTokenSecretManager;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.serviceplugins.api.ContainerLauncher;
+import org.apache.tez.serviceplugins.api.ContainerLauncherContext;
+
+/**
+ * Plugin to allow custom container launchers to be written to launch containers that want to
+ * support cleanup of DAG level directories upon DAG completion in session mode. The directories are created by
+ * the Tez Shuffle Handler (tez_shuffle). A typical implementation of dagComplete() method would contain logic to send
+ * http request(s) for dag deletion to the nodes that support this auxiliary service.
+ */
+@Public
+@Unstable
+public abstract class DagContainerLauncher extends ContainerLauncher {
+
+  public DagContainerLauncher(ContainerLauncherContext containerLauncherContext) {
+    super(containerLauncherContext);
+  }
+
+  public abstract void dagComplete(TezDAGID dag, JobTokenSecretManager jobTokenSecretManager);
+}
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherWrapper.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherWrapper.java
index c70ab10..8ecac14 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherWrapper.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherWrapper.java
@@ -14,6 +14,7 @@
 
 package org.apache.tez.dag.app.launcher;
 
+import org.apache.tez.common.DagContainerLauncher;
 import org.apache.tez.common.security.JobTokenSecretManager;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.serviceplugins.api.ContainerLaunchRequest;
@@ -41,11 +42,8 @@
   }
 
   public void dagComplete(TezDAGID dag, JobTokenSecretManager jobTokenSecretManager) {
-    if (real instanceof TezContainerLauncherImpl) {
-      ((TezContainerLauncherImpl)real).dagComplete(dag, jobTokenSecretManager);
-    }
-    if (real instanceof LocalContainerLauncher) {
-      ((LocalContainerLauncher)real).dagComplete(dag, jobTokenSecretManager);
+    if (real instanceof DagContainerLauncher) {
+      ((DagContainerLauncher)real).dagComplete(dag, jobTokenSecretManager);
     }
   }
 }
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DeletionTrackerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DeletionTrackerImpl.java
index b7583ae..52b6347 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DeletionTrackerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DeletionTrackerImpl.java
@@ -19,6 +19,7 @@
 package org.apache.tez.dag.app.launcher;
 
 
+import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -38,12 +39,11 @@
 
 public class DeletionTrackerImpl extends DeletionTracker {
   private static final Logger LOG = LoggerFactory.getLogger(DeletionTrackerImpl.class);
-  private Map<NodeId, Integer> nodeIdShufflePortMap;
+  private Map<NodeId, Integer> nodeIdShufflePortMap = new HashMap<NodeId, Integer>();
   private ExecutorService dagCleanupService;
 
-  public DeletionTrackerImpl(Map<NodeId, Integer> nodeIdShufflePortMap, Configuration conf) {
+  public DeletionTrackerImpl(Configuration conf) {
     super(conf);
-    this.nodeIdShufflePortMap = nodeIdShufflePortMap;
     this.dagCleanupService = new ThreadPoolExecutor(0, conf.getInt(TezConfiguration.TEZ_AM_DAG_CLEANUP_THREAD_COUNT_LIMIT,
         TezConfiguration.TEZ_AM_DAG_CLEANUP_THREAD_COUNT_LIMIT_DEFAULT), 10,
         TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
index 4793bd7..d50b49e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
@@ -44,7 +44,7 @@
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
-import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.tez.common.DagContainerLauncher;
 import org.apache.tez.common.ReflectionUtils;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.records.TezDAGID;
@@ -53,7 +53,6 @@
 import org.apache.tez.runtime.library.common.TezRuntimeUtils;
 import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
 import org.apache.tez.serviceplugins.api.ContainerLaunchRequest;
-import org.apache.tez.serviceplugins.api.ContainerLauncher;
 import org.apache.tez.serviceplugins.api.ContainerLauncherContext;
 import org.apache.tez.serviceplugins.api.ContainerStopRequest;
 import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
@@ -82,7 +81,7 @@
  * Since all (sub)tasks share the same local directory, they must be executed
  * sequentially in order to avoid creating/deleting the same files/dirs.
  */
-public class LocalContainerLauncher extends ContainerLauncher {
+public class LocalContainerLauncher extends DagContainerLauncher {
 
   private static final Logger LOG = LoggerFactory.getLogger(LocalContainerLauncher.class);
 
@@ -162,9 +161,7 @@
       String deletionTrackerClassName = conf.get(TezConfiguration.TEZ_AM_DELETION_TRACKER_CLASS,
           TezConfiguration.TEZ_AM_DELETION_TRACKER_CLASS_DEFAULT);
       deletionTracker = ReflectionUtils.createClazzInstance(
-          deletionTrackerClassName, new Class[]{
-              Map.class, Configuration.class},
-          new Object[]{new HashMap<NodeId, Integer>(), conf});
+          deletionTrackerClassName, new Class[]{Configuration.class}, new Object[]{conf});
     }
   }
 
@@ -408,6 +405,7 @@
     }
   }
 
+  @Override
   public void dagComplete(TezDAGID dag, JobTokenSecretManager jobTokenSecretManager) {
     if (deletionTracker != null) {
       deletionTracker.dagComplete(dag, jobTokenSecretManager);
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java
index 922575f..67fc4ed 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java
@@ -34,7 +34,7 @@
 
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.hadoop.io.DataInputByteBuffer;
-import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.tez.common.DagContainerLauncher;
 import org.apache.tez.common.ReflectionUtils;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.common.security.JobTokenSecretManager;
@@ -44,7 +44,6 @@
 import org.apache.tez.runtime.library.common.TezRuntimeUtils;
 import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
 import org.apache.tez.serviceplugins.api.ContainerLaunchRequest;
-import org.apache.tez.serviceplugins.api.ContainerLauncher;
 import org.apache.tez.serviceplugins.api.ContainerLauncherContext;
 import org.apache.tez.serviceplugins.api.ContainerStopRequest;
 import org.slf4j.Logger;
@@ -75,7 +74,7 @@
 /**
  * This class is responsible for launching of containers.
  */
-public class TezContainerLauncherImpl extends ContainerLauncher {
+public class TezContainerLauncherImpl extends DagContainerLauncher {
 
   // TODO Ensure the same thread is used to launch / stop the same container. Or - ensure event ordering.
   static final Logger LOG = LoggerFactory.getLogger(TezContainerLauncherImpl.class);
@@ -340,9 +339,7 @@
       String deletionTrackerClassName = conf.get(TezConfiguration.TEZ_AM_DELETION_TRACKER_CLASS,
           TezConfiguration.TEZ_AM_DELETION_TRACKER_CLASS_DEFAULT);
       deletionTracker = ReflectionUtils.createClazzInstance(
-          deletionTrackerClassName, new Class[]{
-              Map.class, Configuration.class},
-          new Object[]{new HashMap<NodeId, Integer>(), conf});
+          deletionTrackerClassName, new Class[]{Configuration.class}, new Object[]{conf});
     }
   }
 
@@ -444,6 +441,7 @@
     }
   }
 
+  @Override
   public void dagComplete(TezDAGID dag, JobTokenSecretManager jobTokenSecretManager) {
     if (deletionTracker != null) {
       deletionTracker.dagComplete(dag, jobTokenSecretManager);