MAPREDUCE-1523. Making Mumak work with Capacity-Scheduler (Anirban Das via mahadev)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/mapreduce/trunk@984533 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 89a924d..45371d1 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -112,6 +112,9 @@
TestTrackerDistributedCacheManagerWithLinuxTaskController. (Devaraj Das
via amareshwari)
+ MAPREDUCE-1523. Making Mumak work with Capacity-Scheduler (Anirban Das
+ via mahadev)
+
OPTIMIZATIONS
MAPREDUCE-1354. Enhancements to JobTracker for better performance and
diff --git a/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobInitializationPoller.java b/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobInitializationPoller.java
index 23389ce..e2178fe 100644
--- a/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobInitializationPoller.java
+++ b/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobInitializationPoller.java
@@ -634,4 +634,12 @@
Set<JobID> getInitializedJobList() {
return initializedJobs.keySet();
}
+
+ public HashMap<String, JobInitializationThread> getThreadsToQueueMap() {
+ return threadsToQueueMap;
+ }
+
+ public long getSleepInterval(){
+ return sleepInterval;
+ }
}
diff --git a/src/contrib/mumak/bin/mumak.sh b/src/contrib/mumak/bin/mumak.sh
index e657d4a..ed1ecf9 100644
--- a/src/contrib/mumak/bin/mumak.sh
+++ b/src/contrib/mumak/bin/mumak.sh
@@ -172,7 +172,7 @@
echo "Usage: $script [--config dir] trace.json topology.json"
}
-if [ $# != 2 ]; then
+if [ $# <= 2 ]; then
print_usage
exit
fi
diff --git a/src/contrib/mumak/build.xml b/src/contrib/mumak/build.xml
index 5fecedd..2e0e344 100644
--- a/src/contrib/mumak/build.xml
+++ b/src/contrib/mumak/build.xml
@@ -32,6 +32,11 @@
<import file="../build-contrib.xml"/>
<property name="mumak.stamp.file" value="${build.dir}/mumak.uptodate.stamp"/>
+ <path id="mumak-classpath">
+ <pathelement location="${hadoop.root}/build/contrib/capacity-scheduler/classes"/>
+ <path refid="contrib-classpath"/>
+ </path>
+
<target name="compile-java-sources" depends="init, ivy-retrieve-common" unless="skip.contrib">
<echo message="contrib: ${name}"/>
<javac
@@ -41,7 +46,7 @@
destdir="${build.classes}"
debug="${javac.debug}"
deprecation="${javac.deprecation}">
- <classpath refid="contrib-classpath"/>
+ <classpath refid="mumak-classpath"/>
</javac>
</target>
@@ -59,7 +64,7 @@
<echo message="Start weaving aspects in place"/>
<iajc
encoding="${build.encoding}"
- srcdir="${hadoop.root}/src/java/;${hadoop.root}/build/src/;${src.dir}"
+ srcdir="${hadoop.root}/src/java/;${hadoop.root}/src/contrib/capacity-scheduler/src/java/;${hadoop.root}/build/src/;${src.dir}"
includes="org/apache/hadoop/**/*.java, org/apache/hadoop/**/*.aj"
destDir="${build.classes}"
debug="${javac.debug}"
@@ -67,7 +72,7 @@
source="${javac.version}"
fork="yes"
deprecation="${javac.deprecation}">
- <classpath refid="contrib-classpath"/>
+ <classpath refid="mumak-classpath"/>
</iajc>
<touch file="${mumak.stamp.file}" mkdirs="true" />
<echo message="Weaving of aspects is finished"/>
diff --git a/src/contrib/mumak/conf/mumak.xml b/src/contrib/mumak/conf/mumak.xml
index 55b07e9..ac96840 100644
--- a/src/contrib/mumak/conf/mumak.xml
+++ b/src/contrib/mumak/conf/mumak.xml
@@ -34,4 +34,11 @@
node-local</description>
</property>
+<property>
+ <name>mumak.job-submission.policy</name>
+ <value>REPLAY</value>
+ <description>Job submission policy
+ </description>
+</property>
+
</configuration>
diff --git a/src/contrib/mumak/src/java/org/apache/hadoop/mapred/JobInitializationPollerAspects.aj b/src/contrib/mumak/src/java/org/apache/hadoop/mapred/JobInitializationPollerAspects.aj
new file mode 100644
index 0000000..57cb76f
--- /dev/null
+++ b/src/contrib/mumak/src/java/org/apache/hadoop/mapred/JobInitializationPollerAspects.aj
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+/**
+ * This aspect is used for NOT starting the job initialization threads of the capacity scheduler.
+ * We schedule the body of these threads manually from the SimulatorJobTracker according to
+ * simulation time.
+ */
+
+public aspect JobInitializationPollerAspects {
+
+ pointcut overrideInitializationThreadStarts () :
+ (target (JobInitializationPoller) ||
+ target (JobInitializationPoller.JobInitializationThread)) &&
+ call (public void start());
+
+ void around() : overrideInitializationThreadStarts () {
+ // no-op
+ }
+
+
+}
diff --git a/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorCSJobInitializationThread.java b/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorCSJobInitializationThread.java
new file mode 100644
index 0000000..c758486
--- /dev/null
+++ b/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorCSJobInitializationThread.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.JobInitializationPoller.JobInitializationThread;
+
+public class SimulatorCSJobInitializationThread implements SimulatorEventListener {
+
+ long lastCalled;
+ CapacityTaskScheduler taskScheduler;
+ JobInitializationPoller jobPoller;
+ private final String queue;
+ final long sleepInterval;
+ /** The log object to send our messages to; only used for debugging. */
+ private static final Log LOG = LogFactory.getLog(SimulatorCSJobInitializationThread.class);
+
+ public SimulatorCSJobInitializationThread(TaskScheduler taskScheduler,
+ String queue) {
+ this.taskScheduler = (CapacityTaskScheduler) taskScheduler;
+ jobPoller = this.taskScheduler.getInitializationPoller();
+ sleepInterval = jobPoller.getSleepInterval();
+ this.queue = queue;
+ }
+
+ @Override
+ public List<SimulatorEvent> accept(SimulatorEvent event) throws IOException {
+
+ SimulatorThreadWakeUpEvent e;
+ if(event instanceof SimulatorThreadWakeUpEvent) {
+ e = (SimulatorThreadWakeUpEvent) event;
+ }
+ else {
+ throw new IOException("Received an unexpected type of event in " +
+ "SimThrdCapSchedJobInit");
+ }
+ jobPoller.cleanUpInitializedJobsList();
+ jobPoller.selectJobsToInitialize();
+ JobInitializationThread thread =
+ jobPoller.getThreadsToQueueMap().get(this.queue);
+ thread.initializeJobs();
+ lastCalled = e.getTimeStamp();
+ List<SimulatorEvent> returnEvents =
+ Collections.<SimulatorEvent>singletonList(
+ new SimulatorThreadWakeUpEvent(this,
+ lastCalled + sleepInterval));
+ return returnEvents;
+ }
+
+ @Override
+ public List<SimulatorEvent> init(long when) throws IOException {
+
+ return Collections.<SimulatorEvent>singletonList(
+ new SimulatorThreadWakeUpEvent(this,
+ when + sleepInterval));
+ }
+
+}
diff --git a/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorEngine.java b/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorEngine.java
index db286c9..7a295ae 100644
--- a/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorEngine.java
+++ b/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorEngine.java
@@ -21,10 +21,15 @@
import java.io.PrintStream;
import java.util.List;
import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.Map;
import java.util.Iterator;
import java.util.Random;
import java.util.regex.Pattern;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
@@ -68,6 +73,13 @@
boolean shutdown = false;
long terminateTime = Long.MAX_VALUE;
long currentTime;
+ /** The HashSet for storing all the simulated threads useful for
+ * job initialization for capacity scheduler.
+ */
+ HashSet<SimulatorCSJobInitializationThread> threadSet;
+ /** The log object to send our messages to; only used for debugging. */
+ private static final Log LOG = LogFactory.getLog(SimulatorEngine.class);
+
/**
* Master random seed read from the configuration file, if present.
* It is (only) used for creating sub seeds for all the random number
@@ -137,13 +149,13 @@
}
/**
- * Initiate components in the simulation.
- * @throws InterruptedException
- * @throws IOException if trace or topology files cannot be open
+ * Creates the configuration for mumak simulation. This is kept modular mostly for
+ * testing purposes. so that the standard configuration can be modified before passing
+ * it to the init() function.
+ * @return JobConf: the configuration for the SimulatorJobTracker
*/
- @SuppressWarnings("deprecation")
- void init() throws InterruptedException, IOException {
+ JobConf createMumakConf() {
JobConf jobConf = new JobConf(getConf());
jobConf.setClass("topology.node.switch.mapping.impl",
StaticMapping.class, DNSToSwitchMapping.class);
@@ -157,7 +169,30 @@
jobConf.setUser("mumak");
jobConf.set("mapred.system.dir",
jobConf.get("hadoop.log.dir", "/tmp/hadoop-"+jobConf.getUser()) + "/mapred/system");
- jobConf.set("mapred.jobtracker.taskScheduler", JobQueueTaskScheduler.class.getName());
+
+ return jobConf;
+ }
+
+ /**
+ * Initialize components in the simulation.
+ * @throws InterruptedException
+ * @throws IOException if trace or topology files cannot be opened.
+ */
+ void init() throws InterruptedException, IOException {
+
+ JobConf jobConf = createMumakConf();
+ init(jobConf);
+ }
+
+ /**
+ * Initiate components in the simulation. The JobConf is
+ * create separately and passed to the init().
+ * @param JobConf: The configuration for the jobtracker.
+ * @throws InterruptedException
+ * @throws IOException if trace or topology files cannot be opened.
+ */
+ @SuppressWarnings("deprecation")
+ void init(JobConf jobConf) throws InterruptedException, IOException {
FileSystem lfs = FileSystem.getLocal(getConf());
Path logPath =
@@ -212,11 +247,40 @@
jc = new SimulatorJobClient(jt, jobStoryProducer, submissionPolicy);
queue.addAll(jc.init(firstJobStartTime));
+ //if the taskScheduler is CapacityTaskScheduler start off the JobInitialization
+ //threads too
+ if (jobConf.get("mapred.jobtracker.taskScheduler").equals
+ (CapacityTaskScheduler.class.getName())) {
+ LOG.info("CapacityScheduler used: starting simulatorThreads");
+ startSimulatorThreadsCapSched(now);
+ }
terminateTime = getTimeProperty(jobConf, "mumak.terminate.time",
Long.MAX_VALUE);
}
/**
+ * In this function, we collect the set of leaf queues from JobTracker, and
+ * for each of them creates a simulated thread that performs the same
+ * check as JobInitializationPoller.JobInitializationThread in Capacity Scheduler.
+ * @param now
+ * @throws IOException
+ */
+ private void startSimulatorThreadsCapSched(long now) throws IOException {
+
+ Set<String> queueNames = jt.getQueueManager().getLeafQueueNames();
+ TaskScheduler taskScheduler = jt.getTaskScheduler();
+ threadSet = new HashSet<SimulatorCSJobInitializationThread>();
+ // We create a different thread for each queue and hold a
+ //reference to each of them
+ for (String jobQueue: queueNames) {
+ SimulatorCSJobInitializationThread capThread =
+ new SimulatorCSJobInitializationThread(taskScheduler,jobQueue);
+ threadSet.add(capThread);
+ queue.addAll(capThread.init(now));
+ }
+ }
+
+ /**
* The main loop of the simulation. First call init() to get objects ready,
* then go into the main loop, where {@link SimulatorEvent}s are handled removed from
* the {@link SimulatorEventQueue}, and new {@link SimulatorEvent}s are created and inserted
diff --git a/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobTracker.java b/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobTracker.java
index 1b02930..f640056 100644
--- a/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobTracker.java
+++ b/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobTracker.java
@@ -197,6 +197,24 @@
SimulatorJobInProgress job = new SimulatorJobInProgress(jobId, jobSubmitDir, this,
this.conf,
jobStory);
+ // Check whether the queue information provided is valid
+ try {
+ checkQueueValidity(job);
+ } catch(IOException ioe) {
+ LOG.warn("Queue given for job " + job.getJobID() + " is not valid:"
+ + ioe);
+ throw ioe;
+ }
+
+ // Check the job if it cannot run in the cluster because of invalid memory
+ // requirements.
+ try {
+ checkMemoryRequirements(job);
+ } catch (IOException ioe) {
+ LOG.warn("Exception in checking Memory requirements of jobId: " + jobId
+ + ioe);
+ //throw ioe;
+ }
return addJob(jobId, job);
}
@@ -457,7 +475,10 @@
}
SimulatorLaunchTaskAction newlaunchTask =
new SimulatorLaunchTaskAction(task, taskAttemptInfo);
-
+ if (loggingEnabled) {
+ LOG.debug("Job " + jobID + " launched taskattempt " +
+ taskAttemptID + " at " + getClock().getTime());
+ }
actions.add(newlaunchTask);
}
}
diff --git a/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorThreadWakeUpEvent.java b/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorThreadWakeUpEvent.java
new file mode 100644
index 0000000..44fa499
--- /dev/null
+++ b/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorThreadWakeUpEvent.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+public class SimulatorThreadWakeUpEvent extends SimulatorEvent {
+
+ public SimulatorThreadWakeUpEvent(SimulatorEventListener listener,
+ long timestamp) {
+ super(listener, timestamp);
+ }
+
+}
diff --git a/src/contrib/mumak/src/test/org/apache/hadoop/mapred/MockSimulatorEngine.java b/src/contrib/mumak/src/test/org/apache/hadoop/mapred/MockSimulatorEngine.java
index 60bc2a3..c9a9183 100644
--- a/src/contrib/mumak/src/test/org/apache/hadoop/mapred/MockSimulatorEngine.java
+++ b/src/contrib/mumak/src/test/org/apache/hadoop/mapred/MockSimulatorEngine.java
@@ -40,8 +40,7 @@
public static final Log LOG = LogFactory.getLog(MockSimulatorEngine.class);
- public MockSimulatorEngine(int nJobs,
- @SuppressWarnings("unused") int nTrackers) {
+ public MockSimulatorEngine(int nJobs, int nTrackers) {
super();
fixedJobs = nJobs;
jobs = new HashMap<JobID, JobStory>();
@@ -52,7 +51,11 @@
@Override
public void run() throws IOException, InterruptedException {
startTime = System.currentTimeMillis();
- init();
+ JobConf jobConf = createMumakConf();
+ // Adding the default queue since the example trace is from queue-less hadoop
+ jobConf.set("mapred.queue.names",JobConf.DEFAULT_QUEUE_NAME);
+
+ init(jobConf);
validateInitialization();
SimulatorEvent nextEvent;
while ((nextEvent = queue.get()) != null && nextEvent.getTimeStamp() < terminateTime
diff --git a/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorEndToEnd.java b/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorEndToEnd.java
index d6cfb51..88b377a 100644
--- a/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorEndToEnd.java
+++ b/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorEndToEnd.java
@@ -56,8 +56,10 @@
MockSimulatorEngine mockMumak = new MockSimulatorEngine(numJobs, nTrackers);
+ Configuration mumakConf = new Configuration();
+ mumakConf.set("mapred.jobtracker.taskScheduler", JobQueueTaskScheduler.class.getName());
String[] args = { traceFile.toString(), topologyFile.toString() };
- int res = ToolRunner.run(conf, mockMumak, args);
+ int res = ToolRunner.run(mumakConf, mockMumak, args);
Assert.assertEquals(res, 0);
}
diff --git a/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorJobTracker.java b/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorJobTracker.java
index 8db3829..2f48607 100644
--- a/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorJobTracker.java
+++ b/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorJobTracker.java
@@ -57,6 +57,7 @@
jtConf.set("mapred.system.dir", jtConf.get("hadoop.tmp.dir", "/tmp/hadoop-"
+ jtConf.getUser())
+ "/mapred/system");
+ jtConf.set("mapred.queue.names",JobConf.DEFAULT_QUEUE_NAME);
System.out.println("Created JobConf");
return jtConf;
}
diff --git a/src/java/org/apache/hadoop/mapred/JobTracker.java b/src/java/org/apache/hadoop/mapred/JobTracker.java
index e5d3bc3..915985d 100644
--- a/src/java/org/apache/hadoop/mapred/JobTracker.java
+++ b/src/java/org/apache/hadoop/mapred/JobTracker.java
@@ -3049,14 +3049,11 @@
new JobInProgress(this, this.conf, restartCount, jobInfo, ts);
synchronized (this) {
- String queue = job.getProfile().getQueueName();
- if (!(queueManager.getLeafQueueNames().contains(queue))) {
- throw new IOException("Queue \"" + queue + "\" does not exist");
- }
-
- // check if queue is RUNNING
- if (!queueManager.isRunning(queue)) {
- throw new IOException("Queue \"" + queue + "\" is not running");
+ try {
+ checkQueueValidity(job);
+ } catch(IOException ioe) {
+ LOG.error("Queue given for job " + job.getJobID() + " is not valid: " + ioe);
+ throw ioe;
}
try {
checkAccess(job, ugi, Queue.QueueOperation.SUBMIT_JOB, null);
@@ -3128,6 +3125,24 @@
}
/**
+ * For a JobInProgress that is being submitted, check whether
+ * queue that the job has been submitted to exists and is RUNNING.
+ * @param job The JobInProgress object being submitted.
+ * @throws IOException
+ */
+ public void checkQueueValidity(JobInProgress job) throws IOException {
+ String queue = job.getProfile().getQueueName();
+ if (!(queueManager.getLeafQueueNames().contains(queue))) {
+ throw new IOException("Queue \"" + queue + "\" does not exist");
+ }
+
+ // check if queue is RUNNING
+ if (!queueManager.isRunning(queue)) {
+ throw new IOException("Queue \"" + queue + "\" is not running");
+ }
+ }
+
+ /**
* Check the ACLs for a user doing the passed queue-operation and the passed
* job operation.
* <ul>
@@ -4480,7 +4495,7 @@
* @param job
* @throws IOException
*/
- private void checkMemoryRequirements(JobInProgress job)
+ void checkMemoryRequirements(JobInProgress job)
throws IOException {
if (!perTaskMemoryConfigurationSetOnJT()) {
if (LOG.isDebugEnabled()) {
diff --git a/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java b/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java
index 2c6e546..25e5bc0 100644
--- a/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java
+++ b/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java
@@ -745,6 +745,10 @@
// any locality, so this group should count as "distance=2".
// However, setup/cleanup tasks are also counted in the 4th group.
// These tasks do not make sense.
+ if(cdfList==null) {
+ runtime = -1;
+ return runtime;
+ }
try {
runtime = makeUpRuntime(cdfList.get(locality));
} catch (NoValueToMakeUpRuntime e) {
@@ -767,6 +771,9 @@
*/
private long makeUpRuntime(List<LoggedDiscreteCDF> mapAttemptCDFs) {
int total = 0;
+ if(mapAttemptCDFs == null) {
+ return -1;
+ }
for (LoggedDiscreteCDF cdf : mapAttemptCDFs) {
total += cdf.getNumberValues();
}
@@ -862,6 +869,11 @@
}
private State makeUpState(int taskAttemptNumber, double[] numAttempts) {
+
+ // if numAttempts == null we are returning FAILED.
+ if(numAttempts == null) {
+ return State.FAILED;
+ }
if (taskAttemptNumber >= numAttempts.length - 1) {
// always succeed
return State.SUCCEEDED;