blob: b9a7c5ae1768d8ca77c1ef8a5d88effc2a65fa4e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.dag.app;
import java.io.IOException;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.app.MockDAGAppMaster.MockContainerLauncher;
import org.apache.tez.dag.app.dag.Task;
import org.apache.tez.dag.app.dag.TaskAttempt;
import org.apache.tez.dag.app.dag.impl.DAGImpl;
import org.apache.tez.dag.app.dag.speculation.legacy.LegacySpeculator;
import org.apache.tez.dag.app.dag.speculation.legacy.LegacyTaskRuntimeEstimator;
import org.apache.tez.dag.app.dag.speculation.legacy.SimpleExponentialTaskRuntimeEstimator;
import org.apache.tez.dag.app.dag.speculation.legacy.TaskRuntimeEstimator;
import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import com.google.common.base.Joiner;
import org.junit.rules.TestRule;
import org.junit.runner.Description;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.model.Statement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* test speculation behavior given the list of estimator classes.
*/
@RunWith(Parameterized.class)
public class TestSpeculation {
private final static Logger LOG = LoggerFactory.getLogger(TezConfiguration.class);
private static final String ASSERT_SPECULATIONS_COUNT_MSG =
"Number of attempts after Speculation should be two";
private static final String UNIT_EXCEPTION_MESSAGE =
"test timed out after";
private static final int ASSERT_SPECULATIONS_COUNT_RETRIES = 3;
private Configuration defaultConf;
private FileSystem localFs;
/**
* The Mock app.
*/
MockDAGAppMaster mockApp;
/**
* The Mock launcher.
*/
MockContainerLauncher mockLauncher;
/**
* The interface Retry.
*/
@Retention(RetentionPolicy.RUNTIME)
public @interface Retry {}
/**
* The type Retry rule.
*/
class RetryRule implements TestRule {
private AtomicInteger retryCount;
/**
* Instantiates a new Retry rule.
*
* @param retries the retries
*/
RetryRule(int retries) {
super();
this.retryCount = new AtomicInteger(retries);
}
@Override
public Statement apply(final Statement base,
final Description description) {
return new Statement() {
@Override
public void evaluate() throws Throwable {
Throwable caughtThrowable = null;
while (retryCount.getAndDecrement() > 0) {
try {
base.evaluate();
return;
} catch (Throwable t) {
caughtThrowable = t;
if (retryCount.get() > 0 &&
description.getAnnotation(Retry.class) != null) {
if (!((t instanceof AssertionError && t.getMessage()
.contains(ASSERT_SPECULATIONS_COUNT_MSG))
|| (t instanceof Exception && t.getMessage()
.contains(UNIT_EXCEPTION_MESSAGE)))) {
throw caughtThrowable;
}
LOG.warn("{} : Failed. Retries remaining: ",
description.getDisplayName(),
retryCount.toString());
} else {
throw caughtThrowable;
}
}
}
}
};
}
}
/**
* The Rule.
*/
@Rule
public RetryRule rule = new RetryRule(ASSERT_SPECULATIONS_COUNT_RETRIES);
/**
* Sets default conf.
*/
@Before
public void setDefaultConf() {
try {
defaultConf = new Configuration(false);
defaultConf.set("fs.defaultFS", "file:///");
defaultConf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
defaultConf.setBoolean(TezConfiguration.TEZ_AM_SPECULATION_ENABLED, true);
defaultConf.setFloat(
ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION, 1);
defaultConf.setFloat(
ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION, 1);
localFs = FileSystem.getLocal(defaultConf);
String stagingDir =
"target" + Path.SEPARATOR + TestSpeculation.class.getName()
+ "-tmpDir";
defaultConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDir);
defaultConf.setClass(TezConfiguration.TEZ_AM_TASK_ESTIMATOR_CLASS,
estimatorClass,
TaskRuntimeEstimator.class);
defaultConf.setInt(TezConfiguration.TEZ_AM_MINIMUM_ALLOWED_SPECULATIVE_TASKS, 20);
defaultConf.setDouble(TezConfiguration.TEZ_AM_PROPORTION_TOTAL_TASKS_SPECULATABLE, 0.2);
defaultConf.setDouble(TezConfiguration.TEZ_AM_PROPORTION_RUNNING_TASKS_SPECULATABLE, 0.25);
defaultConf.setLong(TezConfiguration.TEZ_AM_SOONEST_RETRY_AFTER_NO_SPECULATE, 25);
defaultConf.setLong(TezConfiguration.TEZ_AM_SOONEST_RETRY_AFTER_SPECULATE, 50);
defaultConf.setInt(TezConfiguration.TEZ_AM_ESTIMATOR_EXPONENTIAL_SKIP_INITIALS, 2);
} catch (IOException e) {
throw new RuntimeException("init failure", e);
}
}
/**
* Tear down.
*/
@After
public void tearDown() {
defaultConf = null;
try {
localFs.close();
mockLauncher.shutdown();
mockApp.close();
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* Gets test parameters.
*
* @return the test parameters
*/
@Parameterized.Parameters(name = "{index}: TaskEstimator(EstimatorClass {0})")
public static Collection<Object[]> getTestParameters() {
return Arrays.asList(new Object[][]{
{SimpleExponentialTaskRuntimeEstimator.class},
{LegacyTaskRuntimeEstimator.class}
});
}
private Class<? extends TaskRuntimeEstimator> estimatorClass;
/**
* Instantiates a new Test speculation.
*
* @param estimatorKlass the estimator klass
*/
public TestSpeculation(Class<? extends TaskRuntimeEstimator> estimatorKlass) {
this.estimatorClass = estimatorKlass;
}
/**
* Create tez session mock tez client.
*
* @return the mock tez client
* @throws Exception the exception
*/
MockTezClient createTezSession() throws Exception {
TezConfiguration tezconf = new TezConfiguration(defaultConf);
AtomicBoolean mockAppLauncherGoFlag = new AtomicBoolean(false);
MockTezClient tezClient = new MockTezClient("testspeculation", tezconf, true, null, null,
new MockClock(), mockAppLauncherGoFlag, false, false, 1, 2);
tezClient.start();
syncWithMockAppLauncher(false, mockAppLauncherGoFlag, tezClient);
return tezClient;
}
/**
* Sync with mock app launcher.
*
* @param allowScheduling the allow scheduling
* @param mockAppLauncherGoFlag the mock app launcher go flag
* @param tezClient the tez client
* @throws Exception the exception
*/
void syncWithMockAppLauncher(boolean allowScheduling, AtomicBoolean mockAppLauncherGoFlag,
MockTezClient tezClient) throws Exception {
synchronized (mockAppLauncherGoFlag) {
while (!mockAppLauncherGoFlag.get()) {
mockAppLauncherGoFlag.wait();
}
mockApp = tezClient.getLocalClient().getMockApp();
mockLauncher = mockApp.getContainerLauncher();
mockLauncher.startScheduling(allowScheduling);
mockAppLauncherGoFlag.notify();
}
}
/**
* Test single task speculation.
*
* @throws Exception the exception
*/
@Retry
@Test (timeout = 10000)
public void testSingleTaskSpeculation() throws Exception {
// Map<Timeout conf value, expected number of tasks>
Map<Long, Integer> confToExpected = new HashMap<Long, Integer>();
confToExpected.put(Long.MAX_VALUE >> 1, 1); // Really long time to speculate
confToExpected.put(100L, 2);
confToExpected.put(-1L, 1); // Don't speculate
defaultConf.setLong(TezConfiguration.TEZ_AM_SOONEST_RETRY_AFTER_NO_SPECULATE, 50);
for(Map.Entry<Long, Integer> entry : confToExpected.entrySet()) {
defaultConf.setLong(
TezConfiguration.TEZ_AM_LEGACY_SPECULATIVE_SINGLE_TASK_VERTEX_TIMEOUT,
entry.getKey());
DAG dag = DAG.create("test");
Vertex vA = Vertex.create("A",
ProcessorDescriptor.create("Proc.class"),
1);
dag.addVertex(vA);
MockTezClient tezClient = createTezSession();
DAGClient dagClient = tezClient.submitDAG(dag);
DAGImpl dagImpl = (DAGImpl) mockApp.getContext().getCurrentDAG();
TezVertexID vertexId = TezVertexID.getInstance(dagImpl.getID(), 0);
// original attempt is killed and speculative one is successful
TezTaskAttemptID killedTaId =
TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexId, 0), 0);
TezTaskAttemptID successTaId =
TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexId, 0), 1);
Thread.sleep(200);
// cause speculation trigger
mockLauncher.setStatusUpdatesForTask(killedTaId, 100);
mockLauncher.startScheduling(true);
dagClient.waitForCompletion();
Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagClient.getDAGStatus(null).getState());
Task task = dagImpl.getTask(killedTaId.getTaskID());
Assert.assertEquals(entry.getValue().intValue(), task.getAttempts().size());
if (entry.getValue() > 1) {
Assert.assertEquals(successTaId, task.getSuccessfulAttempt().getID());
TaskAttempt killedAttempt = task.getAttempt(killedTaId);
Joiner.on(",").join(killedAttempt.getDiagnostics()).contains("Killed as speculative attempt");
Assert.assertEquals(TaskAttemptTerminationCause.TERMINATED_EFFECTIVE_SPECULATION,
killedAttempt.getTerminationCause());
}
tezClient.stop();
}
}
/**
* Test basic speculation.
*
* @param withProgress the with progress
* @throws Exception the exception
*/
public void testBasicSpeculation(boolean withProgress) throws Exception {
DAG dag = DAG.create("test");
Vertex vA = Vertex.create("A",
ProcessorDescriptor.create("Proc.class"), 5);
dag.addVertex(vA);
MockTezClient tezClient = createTezSession();
DAGClient dagClient = tezClient.submitDAG(dag);
DAGImpl dagImpl = (DAGImpl) mockApp.getContext().getCurrentDAG();
TezVertexID vertexId = TezVertexID.getInstance(dagImpl.getID(), 0);
// original attempt is killed and speculative one is successful
TezTaskAttemptID killedTaId =
TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexId, 0), 0);
TezTaskAttemptID successTaId =
TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexId, 0), 1);
mockLauncher.updateProgress(withProgress);
// cause speculation trigger
mockLauncher.setStatusUpdatesForTask(killedTaId, 100);
mockLauncher.startScheduling(true);
dagClient.waitForCompletion();
Assert.assertEquals(DAGStatus.State.SUCCEEDED,
dagClient.getDAGStatus(null).getState());
Task task = dagImpl.getTask(killedTaId.getTaskID());
Assert.assertEquals(ASSERT_SPECULATIONS_COUNT_MSG, 2,
task.getAttempts().size());
Assert.assertEquals(successTaId, task.getSuccessfulAttempt().getID());
TaskAttempt killedAttempt = task.getAttempt(killedTaId);
Joiner.on(",").join(killedAttempt.getDiagnostics()).contains("Killed as speculative attempt");
Assert.assertEquals(TaskAttemptTerminationCause.TERMINATED_EFFECTIVE_SPECULATION,
killedAttempt.getTerminationCause());
if (withProgress) {
// without progress updates occasionally more than 1 task speculates
Assert.assertEquals(1, task.getCounters().findCounter(TaskCounter.NUM_SPECULATIONS)
.getValue());
Assert.assertEquals(1, dagImpl.getAllCounters().findCounter(TaskCounter.NUM_SPECULATIONS)
.getValue());
org.apache.tez.dag.app.dag.Vertex v = dagImpl.getVertex(killedTaId.getTaskID().getVertexID());
Assert.assertEquals(1, v.getAllCounters().findCounter(TaskCounter.NUM_SPECULATIONS)
.getValue());
}
LegacySpeculator speculator =
(LegacySpeculator)(dagImpl.getVertex(vA.getName())).getSpeculator();
Assert.assertEquals(20, speculator.getMinimumAllowedSpeculativeTasks());
Assert.assertEquals(.2, speculator.getProportionTotalTasksSpeculatable(), 0);
Assert.assertEquals(.25, speculator.getProportionRunningTasksSpeculatable(), 0);
Assert.assertEquals(25, speculator.getSoonestRetryAfterNoSpeculate());
Assert.assertEquals(50, speculator.getSoonestRetryAfterSpeculate());
tezClient.stop();
}
/**
* Test basic speculation with progress.
*
* @throws Exception the exception
*/
@Retry
@Test (timeout=10000)
public void testBasicSpeculationWithProgress() throws Exception {
testBasicSpeculation(true);
}
/**
* Test basic speculation without progress.
*
* @throws Exception the exception
*/
@Retry
@Test (timeout=10000)
public void testBasicSpeculationWithoutProgress() throws Exception {
testBasicSpeculation(false);
}
/**
* Test basic speculation per vertex conf.
*
* @throws Exception the exception
*/
@Retry
@Test (timeout=10000)
public void testBasicSpeculationPerVertexConf() throws Exception {
DAG dag = DAG.create("test");
String vNameNoSpec = "A";
String vNameSpec = "B";
String speculatorSleepTime = "50";
Vertex vA = Vertex.create(vNameNoSpec, ProcessorDescriptor.create("Proc.class"), 5);
Vertex vB = Vertex.create(vNameSpec, ProcessorDescriptor.create("Proc.class"), 5);
vA.setConf(TezConfiguration.TEZ_AM_SPECULATION_ENABLED, "false");
dag.addVertex(vA);
dag.addVertex(vB);
// min/max src fraction is set to 1. So vertices will run sequentially
dag.addEdge(
Edge.create(vA, vB,
EdgeProperty.create(DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
SchedulingType.SEQUENTIAL, OutputDescriptor.create("O"),
InputDescriptor.create("I"))));
MockTezClient tezClient = createTezSession();
DAGClient dagClient = tezClient.submitDAG(dag);
DAGImpl dagImpl = (DAGImpl) mockApp.getContext().getCurrentDAG();
TezVertexID vertexId = dagImpl.getVertex(vNameSpec).getVertexId();
TezVertexID vertexIdNoSpec = dagImpl.getVertex(vNameNoSpec).getVertexId();
// original attempt is killed and speculative one is successful
TezTaskAttemptID killedTaId =
TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexId, 0), 0);
TezTaskAttemptID noSpecTaId = TezTaskAttemptID
.getInstance(TezTaskID.getInstance(vertexIdNoSpec, 0), 0);
// cause speculation trigger for both
mockLauncher.setStatusUpdatesForTask(killedTaId, 100);
mockLauncher.setStatusUpdatesForTask(noSpecTaId, 100);
mockLauncher.startScheduling(true);
org.apache.tez.dag.app.dag.Vertex vSpec = dagImpl.getVertex(vertexId);
org.apache.tez.dag.app.dag.Vertex vNoSpec = dagImpl.getVertex(vertexIdNoSpec);
// Wait enough time to give chance for the speculator to trigger
// speculation on VB.
// This would fail because of JUnit time out.
do {
Thread.sleep(100);
} while (vSpec.getAllCounters().findCounter(TaskCounter.NUM_SPECULATIONS)
.getValue() <= 0);
dagClient.waitForCompletion();
// speculation for vA but not for vB
Assert.assertTrue("Num Speculations is not higher than 0",
vSpec.getAllCounters().findCounter(TaskCounter.NUM_SPECULATIONS)
.getValue() > 0);
Assert.assertEquals(0,
vNoSpec.getAllCounters().findCounter(TaskCounter.NUM_SPECULATIONS)
.getValue());
tezClient.stop();
}
/**
* Test basic speculation not useful.
*
* @throws Exception the exception
*/
@Retry
@Test (timeout=10000)
public void testBasicSpeculationNotUseful() throws Exception {
DAG dag = DAG.create("test");
Vertex vA = Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 5);
dag.addVertex(vA);
MockTezClient tezClient = createTezSession();
DAGClient dagClient = tezClient.submitDAG(dag);
DAGImpl dagImpl = (DAGImpl) mockApp.getContext().getCurrentDAG();
TezVertexID vertexId = TezVertexID.getInstance(dagImpl.getID(), 0);
// original attempt is successful and speculative one is killed
TezTaskAttemptID successTaId = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexId, 0), 0);
TezTaskAttemptID killedTaId = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexId, 0), 1);
mockLauncher.setStatusUpdatesForTask(successTaId, 100);
mockLauncher.setStatusUpdatesForTask(killedTaId, 100);
mockLauncher.startScheduling(true);
dagClient.waitForCompletion();
Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagClient.getDAGStatus(null).getState());
Task task = dagImpl.getTask(killedTaId.getTaskID());
Assert.assertEquals(2, task.getAttempts().size());
Assert.assertEquals(successTaId, task.getSuccessfulAttempt().getID());
TaskAttempt killedAttempt = task.getAttempt(killedTaId);
Joiner.on(",").join(killedAttempt.getDiagnostics()).contains("Killed speculative attempt as");
Assert.assertEquals(TaskAttemptTerminationCause.TERMINATED_INEFFECTIVE_SPECULATION,
killedAttempt.getTerminationCause());
Assert.assertEquals(1, task.getCounters().findCounter(TaskCounter.NUM_SPECULATIONS)
.getValue());
Assert.assertEquals(1, dagImpl.getAllCounters().findCounter(TaskCounter.NUM_SPECULATIONS)
.getValue());
org.apache.tez.dag.app.dag.Vertex v = dagImpl.getVertex(killedTaId.getTaskID().getVertexID());
Assert.assertEquals(1, v.getAllCounters().findCounter(TaskCounter.NUM_SPECULATIONS)
.getValue());
tezClient.stop();
}
}