TEZ-3726. Clean up DeletionTracker's reflection instantiation and provide ContainerLauncher with dagComplete() functionality (kshukla)
diff --git a/TEZ-3334-CHANGES.txt b/TEZ-3334-CHANGES.txt
index a2afb15..8924915 100644
--- a/TEZ-3334-CHANGES.txt
+++ b/TEZ-3334-CHANGES.txt
@@ -4,6 +4,7 @@
INCOMPATIBLE CHANGES:
ALL CHANGES:
+ TEZ-3726. Clean up DeletionTracker's reflection instantiation and provide ContainerLauncher with dagComplete() functionality
TEZ-3725. Cleanup http connections and other unnecessary fields in DAG Deletion tracker classes.
TEZ-3705. Modify DeletionTracker and deletion threads to be initialized only if enabled for tez_shuffle
TEZ-3685. ShuffleHandler completedInputSet off-by-one error
diff --git a/tez-common/src/main/java/org/apache/tez/common/DagContainerLauncher.java b/tez-common/src/main/java/org/apache/tez/common/DagContainerLauncher.java
new file mode 100644
index 0000000..e3bd385
--- /dev/null
+++ b/tez-common/src/main/java/org/apache/tez/common/DagContainerLauncher.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.common;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.tez.common.security.JobTokenSecretManager;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.serviceplugins.api.ContainerLauncher;
+import org.apache.tez.serviceplugins.api.ContainerLauncherContext;
+
+/**
+ * Plugin base class that lets custom container launchers support cleanup of DAG-level directories
+ * upon DAG completion in session mode. The directories are created by the Tez Shuffle Handler
+ * (tez_shuffle). A typical implementation of the dagComplete() method would contain logic to send
+ * HTTP request(s) for DAG deletion to the nodes that support this auxiliary service.
+ */
+@Public
+@Unstable
+public abstract class DagContainerLauncher extends ContainerLauncher {
+
+ public DagContainerLauncher(ContainerLauncherContext containerLauncherContext) {
+ super(containerLauncherContext);
+ }
+
+ public abstract void dagComplete(TezDAGID dag, JobTokenSecretManager jobTokenSecretManager);
+}
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherWrapper.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherWrapper.java
index c70ab10..8ecac14 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherWrapper.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherWrapper.java
@@ -14,6 +14,7 @@
package org.apache.tez.dag.app.launcher;
+import org.apache.tez.common.DagContainerLauncher;
import org.apache.tez.common.security.JobTokenSecretManager;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.serviceplugins.api.ContainerLaunchRequest;
@@ -41,11 +42,8 @@
}
public void dagComplete(TezDAGID dag, JobTokenSecretManager jobTokenSecretManager) {
- if (real instanceof TezContainerLauncherImpl) {
- ((TezContainerLauncherImpl)real).dagComplete(dag, jobTokenSecretManager);
- }
- if (real instanceof LocalContainerLauncher) {
- ((LocalContainerLauncher)real).dagComplete(dag, jobTokenSecretManager);
+ if (real instanceof DagContainerLauncher) {
+ ((DagContainerLauncher)real).dagComplete(dag, jobTokenSecretManager);
}
}
}
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DeletionTrackerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DeletionTrackerImpl.java
index b7583ae..52b6347 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DeletionTrackerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DeletionTrackerImpl.java
@@ -19,6 +19,7 @@
package org.apache.tez.dag.app.launcher;
+import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
@@ -38,12 +39,11 @@
public class DeletionTrackerImpl extends DeletionTracker {
private static final Logger LOG = LoggerFactory.getLogger(DeletionTrackerImpl.class);
- private Map<NodeId, Integer> nodeIdShufflePortMap;
+ private Map<NodeId, Integer> nodeIdShufflePortMap = new HashMap<NodeId, Integer>();
private ExecutorService dagCleanupService;
- public DeletionTrackerImpl(Map<NodeId, Integer> nodeIdShufflePortMap, Configuration conf) {
+ public DeletionTrackerImpl(Configuration conf) {
super(conf);
- this.nodeIdShufflePortMap = nodeIdShufflePortMap;
this.dagCleanupService = new ThreadPoolExecutor(0, conf.getInt(TezConfiguration.TEZ_AM_DAG_CLEANUP_THREAD_COUNT_LIMIT,
TezConfiguration.TEZ_AM_DAG_CLEANUP_THREAD_COUNT_LIMIT_DEFAULT), 10,
TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
index 4793bd7..d50b49e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
@@ -44,7 +44,7 @@
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.tez.common.DagContainerLauncher;
import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.records.TezDAGID;
@@ -53,7 +53,6 @@
import org.apache.tez.runtime.library.common.TezRuntimeUtils;
import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
import org.apache.tez.serviceplugins.api.ContainerLaunchRequest;
-import org.apache.tez.serviceplugins.api.ContainerLauncher;
import org.apache.tez.serviceplugins.api.ContainerLauncherContext;
import org.apache.tez.serviceplugins.api.ContainerStopRequest;
import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
@@ -82,7 +81,7 @@
* Since all (sub)tasks share the same local directory, they must be executed
* sequentially in order to avoid creating/deleting the same files/dirs.
*/
-public class LocalContainerLauncher extends ContainerLauncher {
+public class LocalContainerLauncher extends DagContainerLauncher {
private static final Logger LOG = LoggerFactory.getLogger(LocalContainerLauncher.class);
@@ -162,9 +161,7 @@
String deletionTrackerClassName = conf.get(TezConfiguration.TEZ_AM_DELETION_TRACKER_CLASS,
TezConfiguration.TEZ_AM_DELETION_TRACKER_CLASS_DEFAULT);
deletionTracker = ReflectionUtils.createClazzInstance(
- deletionTrackerClassName, new Class[]{
- Map.class, Configuration.class},
- new Object[]{new HashMap<NodeId, Integer>(), conf});
+ deletionTrackerClassName, new Class[]{Configuration.class}, new Object[]{conf});
}
}
@@ -408,6 +405,7 @@
}
}
+ @Override
public void dagComplete(TezDAGID dag, JobTokenSecretManager jobTokenSecretManager) {
if (deletionTracker != null) {
deletionTracker.dagComplete(dag, jobTokenSecretManager);
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java
index 922575f..67fc4ed 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java
@@ -34,7 +34,7 @@
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.hadoop.io.DataInputByteBuffer;
-import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.tez.common.DagContainerLauncher;
import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.common.TezUtils;
import org.apache.tez.common.security.JobTokenSecretManager;
@@ -44,7 +44,6 @@
import org.apache.tez.runtime.library.common.TezRuntimeUtils;
import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
import org.apache.tez.serviceplugins.api.ContainerLaunchRequest;
-import org.apache.tez.serviceplugins.api.ContainerLauncher;
import org.apache.tez.serviceplugins.api.ContainerLauncherContext;
import org.apache.tez.serviceplugins.api.ContainerStopRequest;
import org.slf4j.Logger;
@@ -75,7 +74,7 @@
/**
* This class is responsible for launching of containers.
*/
-public class TezContainerLauncherImpl extends ContainerLauncher {
+public class TezContainerLauncherImpl extends DagContainerLauncher {
// TODO Ensure the same thread is used to launch / stop the same container. Or - ensure event ordering.
static final Logger LOG = LoggerFactory.getLogger(TezContainerLauncherImpl.class);
@@ -340,9 +339,7 @@
String deletionTrackerClassName = conf.get(TezConfiguration.TEZ_AM_DELETION_TRACKER_CLASS,
TezConfiguration.TEZ_AM_DELETION_TRACKER_CLASS_DEFAULT);
deletionTracker = ReflectionUtils.createClazzInstance(
- deletionTrackerClassName, new Class[]{
- Map.class, Configuration.class},
- new Object[]{new HashMap<NodeId, Integer>(), conf});
+ deletionTrackerClassName, new Class[]{Configuration.class}, new Object[]{conf});
}
}
@@ -444,6 +441,7 @@
}
}
+ @Override
public void dagComplete(TezDAGID dag, JobTokenSecretManager jobTokenSecretManager) {
if (deletionTracker != null) {
deletionTracker.dagComplete(dag, jobTokenSecretManager);