blob: 8cc2e8b1a28115ad37f1c689ae91d13ab467af0a [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.dag.app;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.app.MockDAGAppMaster.MockContainerLauncher;
import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
import org.apache.tez.dag.app.dag.impl.DAGImpl;
import org.apache.tez.dag.app.dag.impl.TaskAttemptImpl;
import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.junit.Assert;
import org.junit.Test;
/**
 * Tests container preemption against a {@link MockDAGAppMaster}.
 *
 * <p>Each scenario submits a two-vertex DAG while the mock container launcher has scheduling
 * turned off, asks the launcher to preempt the container of task 0 of one vertex up to a given
 * task version, turns scheduling back on and then checks that the DAG still succeeds and that
 * attempts {@code 0..upToTaskVersion} of that task ended up {@code KILLED}.
 */
@SuppressWarnings("deprecation")
public class TestPreemption {

  static Configuration defaultConf;
  static FileSystem localFs;
  static Path workDir;

  static {
    try {
      defaultConf = new Configuration(false);
      defaultConf.set("fs.defaultFS", "file:///");
      defaultConf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
      localFs = FileSystem.getLocal(defaultConf);
      // NOTE(review): the directory name looks copied from another test and workDir is not
      // used anywhere in this class; kept as-is in case other code refers to it.
      workDir = new Path(new Path(System.getProperty("test.build.data", "/tmp")),
          "TestDAGAppMaster").makeQualified(localFs);
    } catch (IOException e) {
      throw new RuntimeException("init failure", e);
    }
  }

  /** Mock application master obtained from the client in {@link #syncWithMockAppLauncher}. */
  MockDAGAppMaster mockApp;
  /** Container launcher of {@link #mockApp}; used to toggle scheduling and request preemption. */
  MockContainerLauncher mockLauncher;
  /** Number of DAGs created so far; appended to the DAG name to keep names distinct. */
  int dagCount = 0;

  /**
   * Builds a DAG named {@code test-<n>} with vertices "A" and "B" (parallelism 5 each)
   * connected by a single A-to-B edge.
   *
   * @param dmType data movement type of the A-to-B edge
   * @return the new DAG
   */
  DAG createDAG(DataMovementType dmType) {
    DAG dag = DAG.create("test-" + dagCount++);
    Vertex vA = Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 5);
    Vertex vB = Vertex.create("B", ProcessorDescriptor.create("Proc.class"), 5);
    Edge eAB = Edge.create(vA, vB,
        EdgeProperty.create(dmType, DataSourceType.PERSISTED,
            SchedulingType.SEQUENTIAL, OutputDescriptor.create("O.class"),
            InputDescriptor.create("I.class")));
    dag.addVertex(vA).addVertex(vB).addEdge(eAB);
    return dag;
  }

  /**
   * Runs one preemption scenario with a non-session client: the DAG is submitted first and the
   * test then synchronizes with the mock app launcher and polls until the DAG is visible.
   */
  @Test (timeout = 5000)
  public void testPreemptionWithoutSession() throws Exception {
    System.out.println("TestPreemptionWithoutSession");
    TezConfiguration tezconf = new TezConfiguration(defaultConf);
    tezconf.setInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS, 0);
    AtomicBoolean mockAppLauncherGoFlag = new AtomicBoolean(false);
    MockTezClient tezClient = new MockTezClient("testPreemption", tezconf, false, null, null,
        null, mockAppLauncherGoFlag);
    tezClient.start();
    try {
      DAGClient dagClient = tezClient.submitDAG(createDAG(DataMovementType.SCATTER_GATHER));
      // now the MockApp has been started. sync with it to get the launcher
      syncWithMockAppLauncher(false, mockAppLauncherGoFlag, tezClient);

      DAGImpl dagImpl;
      do {
        Thread.sleep(100); // usually needs to sleep 2-3 times
      } while ((dagImpl = (DAGImpl) mockApp.getContext().getCurrentDAG()) == null);

      // This scenario has never asserted the termination cause; keep it that way.
      preemptAndVerify(dagClient, dagImpl, 0, 3, false);
    } finally {
      // Stop the client even when an assertion fails so it does not leak into other tests.
      tezClient.stop();
    }
  }

  /**
   * Runs the single and multiple preemption scenarios in one session, for every combination of
   * vertex index (0, 1) and edge data movement type (scatter-gather, broadcast, one-to-one).
   */
  @Test (timeout = 30000)
  public void testPreemptionWithSession() throws Exception {
    System.out.println("TestPreemptionWithSession");
    DataMovementType[] movementTypes = {
        DataMovementType.SCATTER_GATHER, DataMovementType.BROADCAST, DataMovementType.ONE_TO_ONE
    };
    String[] labels = {"Scatter-Gather", "Broadcast", "1-1"};

    MockTezClient tezClient = createTezSession();
    try {
      for (int vertexIndex = 0; vertexIndex <= 1; ++vertexIndex) {
        for (int t = 0; t < movementTypes.length; ++t) {
          testPreemptionSingle(tezClient, createDAG(movementTypes[t]), vertexIndex, labels[t]);
          testPreemptionMultiple(tezClient, createDAG(movementTypes[t]), vertexIndex, labels[t]);
        }
      }
    } finally {
      // Stop the session even when a scenario fails.
      tezClient.stop();
    }
  }

  /**
   * Creates and starts a session-mode {@link MockTezClient} and synchronizes with its mock app
   * launcher, leaving scheduling turned off.
   *
   * @return the started client; the caller is responsible for stopping it
   */
  MockTezClient createTezSession() throws Exception {
    TezConfiguration tezconf = new TezConfiguration(defaultConf);
    tezconf.setInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS, 0);
    AtomicBoolean mockAppLauncherGoFlag = new AtomicBoolean(false);
    MockTezClient tezClient = new MockTezClient("testPreemption", tezconf, true, null, null,
        null, mockAppLauncherGoFlag);
    tezClient.start();
    syncWithMockAppLauncher(false, mockAppLauncherGoFlag, tezClient);
    return tezClient;
  }

  /**
   * Waits on {@code mockAppLauncherGoFlag} until it becomes true, then captures the mock app
   * and its container launcher into {@link #mockApp} / {@link #mockLauncher}, applies the
   * requested scheduling mode and notifies one waiter on the flag.
   *
   * @param allowScheduling value passed to {@code mockLauncher.startScheduling}
   * @param mockAppLauncherGoFlag flag (and monitor) shared with the client's mock app
   * @param tezClient client whose local mock app is looked up
   */
  void syncWithMockAppLauncher(boolean allowScheduling, AtomicBoolean mockAppLauncherGoFlag,
      MockTezClient tezClient) throws Exception {
    synchronized (mockAppLauncherGoFlag) {
      // Loop guards against spurious wakeups.
      while (!mockAppLauncherGoFlag.get()) {
        mockAppLauncherGoFlag.wait();
      }
      mockApp = tezClient.getLocalClient().getMockApp();
      mockLauncher = mockApp.getContainerLauncher();
      mockLauncher.startScheduling(allowScheduling);
      mockAppLauncherGoFlag.notify();
    }
  }

  /** Runs {@link #testPreemptionJob} with {@code upToTaskVersion = 0}. */
  void testPreemptionSingle(MockTezClient tezClient, DAG dag, int vertexIndex, String info)
      throws Exception {
    testPreemptionJob(tezClient, dag, vertexIndex, 0, info + "-Single");
  }

  /** Runs {@link #testPreemptionJob} with {@code upToTaskVersion = 3}. */
  void testPreemptionMultiple(MockTezClient tezClient, DAG dag, int vertexIndex, String info)
      throws Exception {
    testPreemptionJob(tezClient, dag, vertexIndex, 3, info + "-Multiple");
  }

  /**
   * Submits {@code dag} to an already running session with scheduling turned off, preempts
   * task 0 of the chosen vertex and verifies the outcome, including the termination cause.
   *
   * @param tezClient started session client
   * @param dag DAG to submit
   * @param vertexIndex index of the vertex whose task 0 is preempted
   * @param upToTaskVersion task version passed to the launcher's preemption request; attempts
   *     {@code 0..upToTaskVersion} are expected to be killed
   * @param info label printed in the progress messages
   */
  void testPreemptionJob(MockTezClient tezClient, DAG dag, int vertexIndex,
      int upToTaskVersion, String info) throws Exception {
    System.out.println("TestPreemption - Running - " + info);

    mockLauncher.startScheduling(false); // turn off scheduling to block DAG before submitting it
    DAGClient dagClient = tezClient.submitDAG(dag);

    DAGImpl dagImpl = (DAGImpl) mockApp.getContext().getCurrentDAG();
    Assert.assertNotNull("No current DAG after submitting " + info, dagImpl);
    preemptAndVerify(dagClient, dagImpl, vertexIndex, upToTaskVersion, true);

    System.out.println("TestPreemption - Done running - " + info);
  }

  /**
   * Shared tail of every scenario: requests preemption for task 0 of the given vertex, turns
   * scheduling on, waits for the DAG to finish and asserts that it succeeded and that attempts
   * {@code 0..upToTaskVersion} of that task are in the {@code KILLED} internal state.
   *
   * @param checkTerminationCause when true, additionally asserts that each of those attempts
   *     reports {@code EXTERNAL_PREEMPTION} as its termination cause
   */
  private void preemptAndVerify(DAGClient dagClient, DAGImpl dagImpl, int vertexIndex,
      int upToTaskVersion, boolean checkTerminationCause) throws Exception {
    TezVertexID vertexId = TezVertexID.getInstance(dagImpl.getID(), vertexIndex);
    TezTaskID taskId = TezTaskID.getInstance(vertexId, 0);

    mockLauncher.preemptContainerForTask(taskId, upToTaskVersion);
    mockLauncher.startScheduling(true);

    dagClient.waitForCompletion();
    Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagClient.getDAGStatus(null).getState());

    for (int attempt = 0; attempt <= upToTaskVersion; ++attempt) {
      TezTaskAttemptID attemptId = TezTaskAttemptID.getInstance(taskId, attempt);
      TaskAttemptImpl taImpl = dagImpl.getTaskAttempt(attemptId);
      Assert.assertNotNull("Missing task attempt " + attemptId, taImpl);
      Assert.assertEquals(TaskAttemptStateInternal.KILLED, taImpl.getInternalState());
      if (checkTerminationCause) {
        Assert.assertEquals(TaskAttemptTerminationCause.EXTERNAL_PREEMPTION,
            taImpl.getTerminationCause());
      }
    }
  }
}