/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyInt;
import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.TezExecutors;
import org.apache.tez.common.TezSharedExecutor;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.common.security.JobTokenIdentifier;
import org.apache.tez.common.security.JobTokenSecretManager;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.runtime.api.ExecutionContext;
import org.apache.tez.runtime.api.InputContext;
import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.common.CompositeInputAttemptIdentifier;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.apache.tez.runtime.library.common.shuffle.InputAttemptFetchFailure;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
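/**
 * Tests for {@link ShuffleScheduler}: fetcher scheduling and parallelism, reducer health
 * handling on fetch failures, host penalties, progress notification and shutdown behaviour.
 */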
public class TestShuffleScheduler {
private TezExecutors sharedExecutor;
@Before
public void setup() {
sharedExecutor = new TezSharedExecutor(new Configuration());
}
@After
public void cleanup() {
sharedExecutor.shutdownNow();
}
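/**
 * With 10 parallel copies allowed and 50 known inputs, the scheduler should construct fetchers
 * for the first batch of copies; the test latch (initialised to 10 in ShuffleSchedulerForTest)
 * reaching zero confirms that the expected number of fetchers was scheduled.
 */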
@Test (timeout = 10000)
public void testNumParallelScheduledFetchers() throws IOException, InterruptedException {
InputContext inputContext = createTezInputContext();
Configuration conf = new TezConfiguration();
// Allow 10 parallel copies at once.
conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES, 10);
int numInputs = 50;
Shuffle shuffle = mock(Shuffle.class);
MergeManager mergeManager = mock(MergeManager.class);
final ShuffleSchedulerForTest scheduler =
new ShuffleSchedulerForTest(inputContext, conf, numInputs, shuffle, mergeManager,
mergeManager,
System.currentTimeMillis(), null, false, 0, "srcName", true);
Future<Void> executorFuture = null;
ExecutorService executor = Executors.newFixedThreadPool(1);
try {
executorFuture = executor.submit(new Callable<Void>() {
@Override
public Void call() throws Exception {
scheduler.start();
return null;
}
});
InputAttemptIdentifier[] identifiers = new InputAttemptIdentifier[numInputs];
// Schedule all copies.
for (int i = 0; i < numInputs; i++) {
CompositeInputAttemptIdentifier inputAttemptIdentifier =
new CompositeInputAttemptIdentifier(i, 0, "attempt_", 1);
scheduler.addKnownMapOutput("host" + i, 10000, 1, inputAttemptIdentifier);
identifiers[i] = inputAttemptIdentifier;
}
// wait for all the copies to be scheduled with timeout
scheduler.latch.await(2000, TimeUnit.MILLISECONDS);
assertEquals(0, scheduler.latch.getCount());
} finally {
scheduler.close();
if (executorFuture != null) {
executorFuture.cancel(true);
}
executor.shutdownNow();
}
}
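/**
 * The framework executor service should only be requested from the InputContext when
 * TEZ_RUNTIME_SHUFFLE_FETCHER_USE_SHARED_POOL is enabled.
 */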
@Test(timeout=5000)
public void testUseSharedExecutor() throws Exception {
InputContext inputContext = createTezInputContext();
Configuration conf = new TezConfiguration();
int numInputs = 10;
Shuffle shuffle = mock(Shuffle.class);
MergeManager mergeManager = mock(MergeManager.class);
ShuffleSchedulerForTest scheduler = new ShuffleSchedulerForTest(inputContext, conf, numInputs,
shuffle, mergeManager, mergeManager, System.currentTimeMillis(), null, false, 0, "srcName");
verify(inputContext, times(0)).createTezFrameworkExecutorService(anyInt(), anyString());
scheduler.close();
inputContext = createTezInputContext();
conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCHER_USE_SHARED_POOL, true);
scheduler = new ShuffleSchedulerForTest(inputContext, conf, numInputs, shuffle, mergeManager,
mergeManager, System.currentTimeMillis(), null, false, 0, "srcName");
verify(inputContext).createTezFrameworkExecutorService(anyInt(), anyString());
scheduler.close();
}
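/**
 * Happy path: every known input is copied successfully, progress is notified at least once per
 * input, and the scheduler loop exits on its own without an error.
 */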
@Test(timeout = 5000)
public void testSimpleFlow() throws Exception {
InputContext inputContext = createTezInputContext();
Configuration conf = new TezConfiguration();
int numInputs = 10;
Shuffle shuffle = mock(Shuffle.class);
MergeManager mergeManager = mock(MergeManager.class);
final ShuffleSchedulerForTest scheduler =
new ShuffleSchedulerForTest(inputContext, conf, numInputs, shuffle, mergeManager,
mergeManager,
System.currentTimeMillis(), null, false, 0, "srcName");
ExecutorService executor = Executors.newFixedThreadPool(1);
try {
Future<Void> executorFuture = executor.submit(new Callable<Void>() {
@Override
public Void call() throws Exception {
scheduler.start();
return null;
}
});
InputAttemptIdentifier[] identifiers = new InputAttemptIdentifier[numInputs];
for (int i = 0; i < numInputs; i++) {
CompositeInputAttemptIdentifier inputAttemptIdentifier =
new CompositeInputAttemptIdentifier(i, 0, "attempt_", 1);
scheduler.addKnownMapOutput("host" + i, 10000, 1, inputAttemptIdentifier);
identifiers[i] = inputAttemptIdentifier;
}
MapHost[] mapHosts = new MapHost[numInputs];
int count = 0;
for (MapHost mh : scheduler.mapLocations.values()) {
mapHosts[count++] = mh;
}
for (int i = 0; i < numInputs; i++) {
MapOutput mapOutput = MapOutput
.createMemoryMapOutput(identifiers[i], mock(FetchedInputAllocatorOrderedGrouped.class),
100, false);
scheduler.copySucceeded(identifiers[i], mapHosts[i], 20, 25, 100, mapOutput, false);
scheduler.freeHost(mapHosts[i]);
}
verify(inputContext, atLeast(numInputs)).notifyProgress();
// Ensure the executor exits, and without an error.
executorFuture.get();
} finally {
scheduler.close();
executor.shutdownNow();
}
}
@Test(timeout = 60000)
/**
* Scenario
* - reducer has not progressed enough
* - reducer becomes unhealthy after some failures
* - no of attempts failing exceeds maxFailedUniqueFetches (5)
* Expected result
* - fail the reducer
*/
public void testReducerHealth_1() throws IOException {
Configuration conf = new TezConfiguration();
_testReducerHealth_1(conf);
conf.setInt(
TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MIN_FAILURES_PER_HOST, 4000);
_testReducerHealth_1(conf);
}
public void _testReducerHealth_1(Configuration conf) throws IOException {
long startTime = System.currentTimeMillis() - 500000;
Shuffle shuffle = mock(Shuffle.class);
final ShuffleSchedulerForTest scheduler = createScheduler(startTime, 320,
shuffle, conf);
int totalProducerNodes = 20;
//Generate 320 events
for (int i = 0; i < 320; i++) {
CompositeInputAttemptIdentifier inputAttemptIdentifier =
new CompositeInputAttemptIdentifier(i, 0, "attempt_", 1);
scheduler.addKnownMapOutput("host" + (i % totalProducerNodes),
10000, i, inputAttemptIdentifier);
}
//100 succeeds
for (int i = 0; i < 100; i++) {
InputAttemptIdentifier inputAttemptIdentifier =
new InputAttemptIdentifier(i, 0, "attempt_");
MapOutput mapOutput = MapOutput
.createMemoryMapOutput(inputAttemptIdentifier, mock(FetchedInputAllocatorOrderedGrouped.class),
100, false);
scheduler.copySucceeded(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes),
10000, i, 1), 100, 200, startTime + (i * 100), mapOutput, false);
}
//99 fails
for (int i = 100; i < 199; i++) {
InputAttemptIdentifier inputAttemptIdentifier =
new InputAttemptIdentifier(i, 0, "attempt_");
scheduler.copyFailed(InputAttemptFetchFailure.fromAttempt(inputAttemptIdentifier),
new MapHost("host" + (i % totalProducerNodes), 10000, i, 1), false, true);
}
InputAttemptIdentifier inputAttemptIdentifier =
new InputAttemptIdentifier(200, 0, "attempt_");
//Should fail here and report exception as reducer is not healthy
scheduler.copyFailed(InputAttemptFetchFailure.fromAttempt(inputAttemptIdentifier),
new MapHost("host" + (200 % totalProducerNodes), 10000, 200, 1), false, true);
int minFailurePerHost = conf.getInt(
TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MIN_FAILURES_PER_HOST,
TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MIN_FAILURES_PER_HOST_DEFAULT);
if (minFailurePerHost <= 4) {
//As per test threshold. Should fail & retrigger shuffle
verify(shuffle, atLeast(1)).reportException(any());
} else if (minFailurePerHost > 100) {
//host failure threshold is so high that this would not retrigger shuffle re-execution
verify(shuffle, atLeast(0)).reportException(any());
}
}
@Test(timeout = 60000)
/**
* Scenario
* - reducer has progressed enough
* - failures start happening after that
* - no of attempts failing exceeds maxFailedUniqueFetches (5)
* - Has not stalled
* Expected result
* - Since reducer is not stalled, it should continue without error
*
* When reducer stalls, wait until enough retries are done and throw exception
*
*/
public void testReducerHealth_2() throws IOException, InterruptedException {
long startTime = System.currentTimeMillis() - 500000;
Shuffle shuffle = mock(Shuffle.class);
final ShuffleSchedulerForTest scheduler = createScheduler(startTime, 320, shuffle);
int totalProducerNodes = 20;
//Generate 0-200 events
for (int i = 0; i < 200; i++) {
CompositeInputAttemptIdentifier inputAttemptIdentifier =
new CompositeInputAttemptIdentifier(i, 0, "attempt_", 1);
scheduler.addKnownMapOutput("host" + (i % totalProducerNodes),
10000, i, inputAttemptIdentifier);
}
assertEquals(320, scheduler.remainingMaps.get());
//Generate 200-320 events with empty partitions
for (int i = 200; i < 320; i++) {
InputAttemptIdentifier inputAttemptIdentifier =
new InputAttemptIdentifier(i, 0, "attempt_");
scheduler.copySucceeded(inputAttemptIdentifier, null, 0, 0, 0, null, true);
}
//120 are successful. so remaining is 200
assertEquals(200, scheduler.remainingMaps.get());
//200 pending to be downloaded. Download 190.
for (int i = 0; i < 190; i++) {
InputAttemptIdentifier inputAttemptIdentifier =
new InputAttemptIdentifier(i, 0, "attempt_");
MapOutput mapOutput = MapOutput
.createMemoryMapOutput(inputAttemptIdentifier, mock(FetchedInputAllocatorOrderedGrouped.class),
100, false);
scheduler.copySucceeded(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes),
10000, i, 1), 100, 200, startTime + (i * 100), mapOutput, false);
}
assertEquals(10, scheduler.remainingMaps.get());
//10 fails
for (int i = 190; i < 200; i++) {
InputAttemptIdentifier inputAttemptIdentifier =
new InputAttemptIdentifier(i, 0, "attempt_");
scheduler.copyFailed(InputAttemptFetchFailure.fromAttempt(inputAttemptIdentifier),
new MapHost("host" + (i % totalProducerNodes), 10000, i, 1), false, true);
}
//Shuffle has not stalled. so no issues.
verify(scheduler.reporter, times(0)).reportException(any());
//stall shuffle
scheduler.lastProgressTime = System.currentTimeMillis() - 250000;
InputAttemptIdentifier inputAttemptIdentifier =
new InputAttemptIdentifier(190, 0, "attempt_");
scheduler.copyFailed(InputAttemptFetchFailure.fromAttempt(inputAttemptIdentifier),
new MapHost("host" + (190 % totalProducerNodes), 10000, 190, 1), false, true);
//Even when it is stalled, need (320 - 300 = 20) * 3 = 60 failures
verify(scheduler.reporter, times(0)).reportException(any());
assertEquals(11, scheduler.failedShufflesSinceLastCompletion);
//fail to download 50 more times across attempts
for (int i = 190; i < 200; i++) {
inputAttemptIdentifier =
new InputAttemptIdentifier(i, 0, "attempt_");
InputAttemptFetchFailure failure = InputAttemptFetchFailure.fromAttempt(inputAttemptIdentifier);
scheduler.copyFailed(failure, new MapHost("host" + (i % totalProducerNodes),
10000, i, 1), false, true);
scheduler.copyFailed(failure, new MapHost("host" + (i % totalProducerNodes),
10000, i, 1), false, true);
scheduler.copyFailed(failure, new MapHost("host" + (i % totalProducerNodes),
10000, i, 1), false, true);
scheduler.copyFailed(failure, new MapHost("host" + (i % totalProducerNodes),
10000, i, 1), false, true);
scheduler.copyFailed(failure, new MapHost("host" + (i % totalProducerNodes),
10000, i, 1), false, true);
}
assertEquals(61, scheduler.failedShufflesSinceLastCompletion);
assertEquals(10, scheduler.remainingMaps.get());
verify(shuffle, atLeast(0)).reportException(any());
//fail another 30
for (int i = 110; i < 120; i++) {
inputAttemptIdentifier =
new InputAttemptIdentifier(i, 0, "attempt_");
InputAttemptFetchFailure failure =
InputAttemptFetchFailure.fromAttempt(inputAttemptIdentifier);
scheduler.copyFailed(failure, new MapHost("host" + (i % totalProducerNodes), 10000, i, 1),
false, true);
scheduler.copyFailed(failure, new MapHost("host" + (i % totalProducerNodes), 10000, i, 1),
false, true);
scheduler.copyFailed(failure, new MapHost("host" + (i % totalProducerNodes), 10000, i, 1),
false, true);
}
// Should fail now due to fetcherHealthy. (stall has already happened and
// these are the only pending tasks)
verify(shuffle, atLeast(1)).reportException(any());
}
@Test(timeout = 60000)
/**
* Scenario
* - reducer has progressed enough
* - failures start happening after that in last fetch
* - no of attempts failing does not exceed maxFailedUniqueFetches (5)
* - Stalled
* Expected result
* - Since reducer is stalled and if failures haven't happened across nodes,
* it should be fine to proceed. AM would restart source task eventually.
*
*/
public void testReducerHealth_3() throws IOException {
long startTime = System.currentTimeMillis() - 500000;
Shuffle shuffle = mock(Shuffle.class);
final ShuffleSchedulerForTest scheduler = createScheduler(startTime, 320, shuffle);
int totalProducerNodes = 20;
//Generate 320 events
for (int i = 0; i < 320; i++) {
CompositeInputAttemptIdentifier inputAttemptIdentifier =
new CompositeInputAttemptIdentifier(i, 0, "attempt_", 1);
scheduler.addKnownMapOutput("host" + (i % totalProducerNodes),
10000, i, inputAttemptIdentifier);
}
//319 succeeds
for (int i = 0; i < 319; i++) {
InputAttemptIdentifier inputAttemptIdentifier =
new InputAttemptIdentifier(i, 0, "attempt_");
MapOutput mapOutput = MapOutput
.createMemoryMapOutput(inputAttemptIdentifier, mock(FetchedInputAllocatorOrderedGrouped.class),
100, false);
scheduler.copySucceeded(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes),
10000, i, 1), 100, 200, startTime + (i * 100), mapOutput, false);
}
//1 fails (last fetch)
InputAttemptIdentifier inputAttemptIdentifier =
new InputAttemptIdentifier(319, 0, "attempt_");
scheduler.copyFailed(InputAttemptFetchFailure.fromAttempt(inputAttemptIdentifier),
new MapHost("host" + (319 % totalProducerNodes), 10000, 319, 1), false, true);
//stall the shuffle
scheduler.lastProgressTime = System.currentTimeMillis() - 1000000;
assertEquals(scheduler.remainingMaps.get(), 1);
//Retry for 3 more times
InputAttemptFetchFailure failure = InputAttemptFetchFailure.fromAttempt(inputAttemptIdentifier);
scheduler.copyFailed(failure, new MapHost("host" + (319 % totalProducerNodes), 10000, 319, 1),
false, true);
scheduler.copyFailed(failure, new MapHost("host" + (319 % totalProducerNodes), 10000, 319, 1),
false, true);
scheduler.copyFailed(failure, new MapHost("host" + (319 % totalProducerNodes), 10000, 319, 1),
false, true);
// failedShufflesSinceLastCompletion has crossed the limit, but the failures are confined to a
// single host, so no error should be reported yet.
verify(shuffle, times(0)).reportException(any());
}
@Test(timeout = 60000)
/**
* Scenario
* - reducer has progressed enough
* - failures have happened randomly in nodes, but tasks are completed
* - failures start happening after that in last fetch
* - no of attempts failing does not exceed maxFailedUniqueFetches (5)
* - Stalled
* Expected result
* - reducer is stalled. But since errors are not seen across multiple
* nodes, it is left to the AM to restart the producer. Do not kill the consumer.
*
*/
public void testReducerHealth_4() throws IOException {
long startTime = System.currentTimeMillis() - 500000;
Shuffle shuffle = mock(Shuffle.class);
final ShuffleSchedulerForTest scheduler = createScheduler(startTime, 320, shuffle);
int totalProducerNodes = 20;
//Generate 320 events
for (int i = 0; i < 320; i++) {
CompositeInputAttemptIdentifier inputAttemptIdentifier =
new CompositeInputAttemptIdentifier(i, 0, "attempt_", 1);
scheduler.addKnownMapOutput("host" + (i % totalProducerNodes),
10000, i, inputAttemptIdentifier);
}
//Tasks fail in 20% of nodes 3 times, but are able to proceed further
for (int i = 0; i < 64; i++) {
InputAttemptIdentifier inputAttemptIdentifier =
new InputAttemptIdentifier(i, 0, "attempt_");
InputAttemptFetchFailure failure =
InputAttemptFetchFailure.fromAttempt(inputAttemptIdentifier);
scheduler.copyFailed(failure, new MapHost("host" + (i % totalProducerNodes), 10000, i, 1),
false, true);
scheduler.copyFailed(failure, new MapHost("host" + (i % totalProducerNodes), 10000, i, 1),
false, true);
scheduler.copyFailed(failure, new MapHost("host" + (i % totalProducerNodes), 10000, i, 1),
false, true);
MapOutput mapOutput = MapOutput
.createMemoryMapOutput(inputAttemptIdentifier, mock(FetchedInputAllocatorOrderedGrouped.class),
100, false);
scheduler.copySucceeded(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes),
10000, i, 1), 100, 200, startTime + (i * 100), mapOutput, false);
}
//319 succeeds
for (int i = 64; i < 319; i++) {
InputAttemptIdentifier inputAttemptIdentifier =
new InputAttemptIdentifier(i, 0, "attempt_");
MapOutput mapOutput = MapOutput
.createMemoryMapOutput(inputAttemptIdentifier, mock(FetchedInputAllocatorOrderedGrouped.class),
100, false);
scheduler.copySucceeded(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes),
10000, i, 1), 100, 200, startTime + (i * 100), mapOutput, false);
}
//1 fails (last fetch)
InputAttemptIdentifier inputAttemptIdentifier =
new InputAttemptIdentifier(319, 0, "attempt_");
scheduler.copyFailed(new InputAttemptFetchFailure(inputAttemptIdentifier),
new MapHost("host" + (319 % totalProducerNodes), 10000, 319, 1), false, true);
//stall the shuffle (but within limits)
scheduler.lastProgressTime = System.currentTimeMillis() - 100000;
assertEquals(scheduler.remainingMaps.get(), 1);
//Retry for 3 more times
InputAttemptFetchFailure failure = InputAttemptFetchFailure.fromAttempt(inputAttemptIdentifier);
scheduler.copyFailed(failure, new MapHost("host" + (319 % totalProducerNodes), 10000, 319, 1),
false, true);
scheduler.copyFailed(failure, new MapHost("host" + (319 % totalProducerNodes), 10000, 319, 1),
false, true);
scheduler.copyFailed(failure, new MapHost("host" + (319 % totalProducerNodes), 10000, 319, 1),
false, true);
// failedShufflesSinceLastCompletion has crossed the limits. 20% of other nodes had failures as
// well. However, it has failed only in one host. So this should proceed
// until AM decides to restart the producer.
verify(shuffle, times(0)).reportException(any());
//stall the shuffle further, beyond the allowed limit this time
scheduler.lastProgressTime = System.currentTimeMillis() - 300000;
scheduler.copyFailed(InputAttemptFetchFailure.fromAttempt(inputAttemptIdentifier),
new MapHost("host" + (319 % totalProducerNodes), 10000, 319, 1), false, true);
verify(shuffle, times(1)).reportException(any());
}
@Test(timeout = 60000)
/**
* Scenario
* - Shuffle has progressed enough
* - Last event is yet to arrive
* - Failures start happening after Shuffle has progressed enough
* - no of attempts failing does not exceed maxFailedUniqueFetches (5)
* - Stalled
* Expected result
* - Do not throw errors, as Shuffle is yet to receive inputs
*
*/
public void testReducerHealth_5() throws IOException {
long startTime = System.currentTimeMillis() - 500000;
Shuffle shuffle = mock(Shuffle.class);
final ShuffleSchedulerForTest scheduler = createScheduler(startTime, 320, shuffle);
int totalProducerNodes = 20;
//Generate 319 events (last event has not arrived)
for (int i = 0; i < 319; i++) {
CompositeInputAttemptIdentifier inputAttemptIdentifier =
new CompositeInputAttemptIdentifier(i, 0, "attempt_", 1);
scheduler.addKnownMapOutput("host" + (i % totalProducerNodes),
10000, i, inputAttemptIdentifier);
}
//All 319 known inputs succeed
for (int i = 0; i < 319; i++) {
InputAttemptIdentifier inputAttemptIdentifier =
new InputAttemptIdentifier(i, 0, "attempt_");
MapOutput mapOutput = MapOutput
.createMemoryMapOutput(inputAttemptIdentifier, mock(FetchedInputAllocatorOrderedGrouped.class),
100, false);
scheduler.copySucceeded(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes),
10000, i, 1), 100, 200, startTime + (i * 100), mapOutput, false);
}
//1 fails (last fetch)
InputAttemptIdentifier inputAttemptIdentifier =
new InputAttemptIdentifier(318, 0, "attempt_");
InputAttemptFetchFailure failure = new InputAttemptFetchFailure(inputAttemptIdentifier);
scheduler.copyFailed(failure, new MapHost("host" + (318 % totalProducerNodes),
10000, 318, 1), false, true);
//stall the shuffle
scheduler.lastProgressTime = System.currentTimeMillis() - 1000000;
assertEquals(scheduler.remainingMaps.get(), 1);
//Retry for 3 more times
scheduler.copyFailed(failure, new MapHost("host" + (318 % totalProducerNodes), 10000, 318, 1),
false, true);
scheduler.copyFailed(failure, new MapHost("host" + (318 % totalProducerNodes), 10000, 318, 1),
false, true);
scheduler.copyFailed(failure, new MapHost("host" + (318 % totalProducerNodes), 10000, 318, 1),
false, true);
//Shuffle has not received the events completely. So do not bail out yet.
verify(shuffle, times(0)).reportException(any());
}
@Test(timeout = 60000)
/**
* Scenario
* - Shuffle has NOT progressed enough
* - Failures start happening
* - no of attempts failing exceed maxFailedUniqueFetches (5)
* - Not stalled
* Expected result
* - Bail out
*
*/
public void testReducerHealth_6() throws IOException {
Configuration conf = new TezConfiguration();
conf.setBoolean(
TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FAILED_CHECK_SINCE_LAST_COMPLETION, true);
_testReducerHealth_6(conf);
conf.setBoolean(
TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FAILED_CHECK_SINCE_LAST_COMPLETION, false);
_testReducerHealth_6(conf);
}
public void _testReducerHealth_6(Configuration conf) throws IOException {
long startTime = System.currentTimeMillis() - 500000;
Shuffle shuffle = mock(Shuffle.class);
final ShuffleSchedulerForTest scheduler = createScheduler(startTime, 320,
shuffle, conf);
int totalProducerNodes = 20;
//Generate 320 events
for (int i = 0; i < 320; i++) {
CompositeInputAttemptIdentifier inputAttemptIdentifier =
new CompositeInputAttemptIdentifier(i, 0, "attempt_", 1);
scheduler.addKnownMapOutput("host" + (i % totalProducerNodes),
10000, i, inputAttemptIdentifier);
}
//10 succeeds
for (int i = 0; i < 10; i++) {
InputAttemptIdentifier inputAttemptIdentifier =
new InputAttemptIdentifier(i, 0, "attempt_");
MapOutput mapOutput = MapOutput
.createMemoryMapOutput(inputAttemptIdentifier, mock(FetchedInputAllocatorOrderedGrouped.class),
100, false);
scheduler.copySucceeded(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes),
10000, i, 1), 100, 200, startTime + (i * 100), mapOutput, false);
}
//5 fetches fail once
for (int i = 10; i < 15; i++) {
InputAttemptIdentifier inputAttemptIdentifier =
new InputAttemptIdentifier(i, 0, "attempt_");
scheduler.copyFailed(InputAttemptFetchFailure.fromAttempt(inputAttemptIdentifier),
new MapHost("host" + (i % totalProducerNodes), 10000, i, 1), false, true);
}
assertTrue(scheduler.failureCounts.size() >= 5);
assertEquals(scheduler.remainingMaps.get(), 310);
//Do not bail out (number of failures is just 5)
verify(scheduler.reporter, times(0)).reportException(any());
//5 fetches fail repeatedly
for (int i = 10; i < 15; i++) {
InputAttemptIdentifier inputAttemptIdentifier =
new InputAttemptIdentifier(i, 0, "attempt_");
scheduler.copyFailed(InputAttemptFetchFailure.fromAttempt(inputAttemptIdentifier),
new MapHost("host" + (i % totalProducerNodes), 10000, i, 1), false, true);
scheduler.copyFailed(InputAttemptFetchFailure.fromAttempt(inputAttemptIdentifier),
new MapHost("host" + (i % totalProducerNodes), 10000, i, 1), false, true);
}
boolean checkFailedFetchSinceLastCompletion = conf.getBoolean(
TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FAILED_CHECK_SINCE_LAST_COMPLETION,
TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FAILED_CHECK_SINCE_LAST_COMPLETION_DEFAULT);
if (checkFailedFetchSinceLastCompletion) {
// Now bail out, as Shuffle has crossed the failedShufflesSinceLastCompletion
// limit (even though the reducer is otherwise healthy).
verify(shuffle, atLeast(1)).reportException(any());
} else {
//Do not bail out yet.
verify(shuffle, atLeast(0)).reportException(any());
}
}
@Test(timeout = 60000)
/**
* Scenario
* - reducer has not progressed enough
* - fetch fails >
* TEZ_RUNTIME_SHUFFLE_ACCEPTABLE_HOST_FETCH_FAILURE_FRACTION
* Expected result
* - fail the reducer
*/
public void testReducerHealth_7() throws IOException {
long startTime = System.currentTimeMillis() - 500000;
Shuffle shuffle = mock(Shuffle.class);
final ShuffleSchedulerForTest scheduler = createScheduler(startTime, 320, shuffle);
int totalProducerNodes = 20;
//Generate 320 events
for (int i = 0; i < 320; i++) {
CompositeInputAttemptIdentifier inputAttemptIdentifier =
new CompositeInputAttemptIdentifier(i, 0, "attempt_", 1);
scheduler.addKnownMapOutput("host" + (i % totalProducerNodes), 10000, i,
inputAttemptIdentifier);
}
//100 succeeds
for (int i = 0; i < 100; i++) {
InputAttemptIdentifier inputAttemptIdentifier =
new InputAttemptIdentifier(i, 0, "attempt_");
MapOutput mapOutput = MapOutput
.createMemoryMapOutput(inputAttemptIdentifier, mock(FetchedInputAllocatorOrderedGrouped.class),
100, false);
scheduler.copySucceeded(inputAttemptIdentifier,
new MapHost("host" + (i % totalProducerNodes), 10000, i, 1),
100, 200, startTime + (i * 100), mapOutput, false);
}
//99 fails
for (int i = 100; i < 199; i++) {
InputAttemptIdentifier inputAttemptIdentifier =
new InputAttemptIdentifier(i, 0, "attempt_");
InputAttemptFetchFailure failure = InputAttemptFetchFailure.fromAttempt(inputAttemptIdentifier);
scheduler.copyFailed(failure, new MapHost("host" + (i % totalProducerNodes), 10000, i, 1),
false, true);
scheduler.copyFailed(failure, new MapHost("host" + (i % totalProducerNodes), 10000, i, 1),
false, true);
scheduler.copyFailed(failure, new MapHost("host" + (i % totalProducerNodes), 10000, i, 1),
false, true);
scheduler.copyFailed(failure, new MapHost("host" + (i % totalProducerNodes), 10000, i, 1),
false, true);
}
verify(shuffle, atLeast(1)).reportException(any());
}
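// Builds a ShuffleSchedulerForTest backed by a mocked InputContext and MergeManager.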
private ShuffleSchedulerForTest createScheduler(long startTime, int numInputs, Shuffle shuffle,
Configuration conf) throws IOException {
InputContext inputContext = createTezInputContext();
MergeManager mergeManager = mock(MergeManager.class);
final ShuffleSchedulerForTest scheduler =
new ShuffleSchedulerForTest(inputContext, conf, numInputs, shuffle, mergeManager,
mergeManager, startTime, null, false, 0, "srcName");
return scheduler;
}
private ShuffleSchedulerForTest createScheduler(long startTime, int numInputs, Shuffle shuffle)
throws IOException {
return createScheduler(startTime, numInputs, shuffle, new TezConfiguration());
}
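/**
 * A fetch failure should move the host into the penalty loop, so getHost() is not expected to
 * hand host0 back immediately; the referee thread releases it only after INITIAL_PENALTY.
 */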
@Test(timeout = 60000)
public void testPenalty() throws IOException, InterruptedException {
long startTime = System.currentTimeMillis();
Shuffle shuffle = mock(Shuffle.class);
final ShuffleSchedulerForTest scheduler = createScheduler(startTime, 1, shuffle);
CompositeInputAttemptIdentifier inputAttemptIdentifier =
new CompositeInputAttemptIdentifier(0, 0, "attempt_", 1);
scheduler.addKnownMapOutput("host0", 10000, 0, inputAttemptIdentifier);
assertTrue(scheduler.pendingHosts.size() == 1);
assertTrue(scheduler.pendingHosts.iterator().next().getState() == MapHost.State.PENDING);
MapHost mapHost = scheduler.pendingHosts.iterator().next();
//Fails to pull from host0. host0 should be added to penalties
scheduler.copyFailed(InputAttemptFetchFailure.fromAttempt(inputAttemptIdentifier), mapHost,
false, true);
//Should not get host, as it is added to penalty loop
MapHost host = scheduler.getHost();
assertFalse("Host identifier mismatch", (host.getHost() + ":" + host.getPort() + ":" + host.getPartitionId()).equalsIgnoreCase("host0:10000"));
//Referee thread would release it after the INITIAL_PENALTY timeout
Thread.sleep(ShuffleScheduler.INITIAL_PENALTY + 1000);
host = scheduler.getHost();
assertFalse("Host identifier mismatch", (host.getHost() + ":" + host.getPort() + ":" + host.getPartitionId()).equalsIgnoreCase("host0:10000"));
}
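/**
 * While getHost() waits for a pending host to become available, it should keep notifying
 * progress on the InputContext (verified after roughly four seconds of waiting).
 */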
@Test (timeout = 20000)
public void testProgressDuringGetHostWait() throws IOException, InterruptedException {
long startTime = System.currentTimeMillis();
Configuration conf = new TezConfiguration();
Shuffle shuffle = mock(Shuffle.class);
final ShuffleSchedulerForTest scheduler = createScheduler(startTime, 1, shuffle, conf);
Thread schedulerGetHostThread = new Thread(new Runnable() {
@Override
public void run() {
try {
scheduler.getHost();
} catch (Exception e) {
e.printStackTrace();
}
}
});
schedulerGetHostThread.start();
Thread.sleep(1000 * 3 + 1000);
schedulerGetHostThread.interrupt();
verify(scheduler.inputContext, atLeast(3)).notifyProgress();
}
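/**
 * Closes the scheduler while one input is still outstanding and verifies that the scheduler
 * thread exits cleanly without propagating an error.
 */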
@Test(timeout = 5000)
public void testShutdown() throws Exception {
InputContext inputContext = createTezInputContext();
Configuration conf = new TezConfiguration();
int numInputs = 10;
Shuffle shuffle = mock(Shuffle.class);
MergeManager mergeManager = mock(MergeManager.class);
final ShuffleSchedulerForTest scheduler =
new ShuffleSchedulerForTest(inputContext, conf, numInputs, shuffle, mergeManager,
mergeManager,
System.currentTimeMillis(), null, false, 0, "srcName");
ExecutorService executor = Executors.newFixedThreadPool(1);
try {
Future<Void> executorFuture = executor.submit(new Callable<Void>() {
@Override
public Void call() throws Exception {
scheduler.start();
return null;
}
});
InputAttemptIdentifier[] identifiers = new InputAttemptIdentifier[numInputs];
for (int i = 0; i < numInputs; i++) {
CompositeInputAttemptIdentifier inputAttemptIdentifier =
new CompositeInputAttemptIdentifier(i, 0, "attempt_", 1);
scheduler.addKnownMapOutput("host" + i, 10000, 1, inputAttemptIdentifier);
identifiers[i] = inputAttemptIdentifier;
}
MapHost[] mapHosts = new MapHost[numInputs];
int count = 0;
for (MapHost mh : scheduler.mapLocations.values()) {
mapHosts[count++] = mh;
}
// Copies succeed for all hosts except the last one
for (int i = 0; i < numInputs - 1; i++) {
MapOutput mapOutput = MapOutput
.createMemoryMapOutput(identifiers[i], mock(FetchedInputAllocatorOrderedGrouped.class),
100, false);
scheduler.copySucceeded(identifiers[i], mapHosts[i], 20, 25, 100, mapOutput, false);
scheduler.freeHost(mapHosts[i]);
}
scheduler.close();
// Ensure the executor exits, and without an error.
executorFuture.get();
} finally {
scheduler.close();
executor.shutdownNow();
}
}
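/**
 * Same as testShutdown, but close() is issued from a separate thread so the interrupt path is
 * exercised; the fetcher executor must have stopped afterwards.
 */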
@Test(timeout = 30000)
public void testShutdownWithInterrupt() throws Exception {
InputContext inputContext = createTezInputContext();
Configuration conf = new TezConfiguration();
int numInputs = 10;
Shuffle shuffle = mock(Shuffle.class);
MergeManager mergeManager = mock(MergeManager.class);
final ShuffleSchedulerForTest scheduler =
new ShuffleSchedulerForTest(inputContext, conf, numInputs, shuffle, mergeManager,
mergeManager,
System.currentTimeMillis(), null, false, 0, "srcName");
ExecutorService executor = Executors.newFixedThreadPool(1);
Future<Void> executorFuture = executor.submit(new Callable<Void>() {
@Override
public Void call() throws Exception {
scheduler.start();
return null;
}
});
InputAttemptIdentifier[] identifiers = new InputAttemptIdentifier[numInputs];
for (int i = 0; i < numInputs; i++) {
CompositeInputAttemptIdentifier inputAttemptIdentifier =
new CompositeInputAttemptIdentifier(i, 0, "attempt_", 1);
scheduler.addKnownMapOutput("host" + i, 10000, 1, inputAttemptIdentifier);
identifiers[i] = inputAttemptIdentifier;
}
MapHost[] mapHosts = new MapHost[numInputs];
int count = 0;
for (MapHost mh : scheduler.mapLocations.values()) {
mapHosts[count++] = mh;
}
// Copies succeed for all hosts except the last one
for (int i = 0; i < numInputs - 1; i++) {
MapOutput mapOutput = MapOutput
.createMemoryMapOutput(identifiers[i], mock(FetchedInputAllocatorOrderedGrouped.class),
100, false);
scheduler.copySucceeded(identifiers[i], mapHosts[i], 20, 25, 100, mapOutput, false);
scheduler.freeHost(mapHosts[i]);
}
try {
// Close the scheduler on different thread to trigger interrupt
Thread thread = new Thread(new Runnable() {
@Override public void run() {
scheduler.close();
}
});
thread.start();
thread.join();
} finally {
assertTrue("Fetcher executor should be shutdown, but still running",
scheduler.hasFetcherExecutorStopped());
executor.shutdownNow();
}
}
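/**
 * Repeatedly fails fetches from the same host and verifies that every scheduled penalty stays
 * within the configured TEZ_RUNTIME_SHUFFLE_HOST_PENALTY_TIME_LIMIT_MS cap (20s here).
 */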
@Test (timeout = 120000)
public void testPenalties() throws Exception {
InputContext inputContext = createTezInputContext();
Configuration conf = new TezConfiguration();
conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_HOST_PENALTY_TIME_LIMIT_MS, 20000);
int numInputs = 10;
Shuffle shuffle = mock(Shuffle.class);
MergeManager mergeManager = mock(MergeManager.class);
final ShuffleSchedulerForTest scheduler =
new ShuffleSchedulerForTest(inputContext, conf, numInputs, shuffle, mergeManager,
mergeManager,
System.currentTimeMillis(), null, false, 0, "srcName");
ExecutorService executor = Executors.newFixedThreadPool(1);
Future<Void> executorFuture = executor.submit(new Callable<Void>() {
@Override
public Void call() throws Exception {
scheduler.start();
return null;
}
});
InputAttemptIdentifier[] identifiers = new InputAttemptIdentifier[numInputs];
for (int i = 0; i < numInputs; i++) {
CompositeInputAttemptIdentifier inputAttemptIdentifier =
new CompositeInputAttemptIdentifier(i, 0, "attempt_", 1);
scheduler.addKnownMapOutput("host" + i, 10000, 1, inputAttemptIdentifier);
identifiers[i] = inputAttemptIdentifier;
}
MapHost[] mapHosts = new MapHost[numInputs];
int count = 0;
for (MapHost mh : scheduler.mapLocations.values()) {
mapHosts[count++] = mh;
}
for (int i = 0; i < 10; i++) {
scheduler.copyFailed(InputAttemptFetchFailure.fromAttempt(identifiers[0]), mapHosts[0], false,
false);
}
ShuffleScheduler.Penalty[] penaltyArray = new ShuffleScheduler.Penalty[scheduler.getPenalties().size()];
scheduler.getPenalties().toArray(penaltyArray);
for (int i = 0; i < penaltyArray.length; i++) {
Assert.assertTrue(penaltyArray[i].getDelay(TimeUnit.MILLISECONDS) <= 20000);
}
}
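// Creates a mocked InputContext with application id, counters, execution context, shuffle port
// metadata, a serialized job token and an executor service backed by the shared executor.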
private InputContext createTezInputContext() throws IOException {
ApplicationId applicationId = ApplicationId.newInstance(1, 1);
InputContext inputContext = mock(InputContext.class);
doReturn(applicationId).when(inputContext).getApplicationId();
doReturn("sourceVertex").when(inputContext).getSourceVertexName();
when(inputContext.getCounters()).thenReturn(new TezCounters());
ExecutionContext executionContext = new ExecutionContextImpl("localhost");
doReturn(executionContext).when(inputContext).getExecutionContext();
ByteBuffer shuffleBuffer = ByteBuffer.allocate(4).putInt(0, 4);
doReturn(shuffleBuffer).when(inputContext).getServiceProviderMetaData(anyString());
Token<JobTokenIdentifier>
sessionToken = new Token<JobTokenIdentifier>(new JobTokenIdentifier(new Text("text")),
new JobTokenSecretManager());
ByteBuffer tokenBuffer = TezCommonUtils.serializeServiceData(sessionToken);
doReturn(tokenBuffer).when(inputContext).getServiceConsumerMetaData(anyString());
when(inputContext.createTezFrameworkExecutorService(anyInt(), anyString())).thenAnswer(
new Answer<ExecutorService>() {
@Override
public ExecutorService answer(InvocationOnMock invocation) throws Throwable {
return sharedExecutor.createExecutorService(
invocation.getArgument(0, Integer.class),
invocation.getArgument(1, String.class));
}
});
return inputContext;
}
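/**
 * Test subclass that replaces real fetchers with mocks: each constructed fetcher counts down
 * the latch, and when fetcherShouldWait is set its callInternal() blocks so copies stay in flight.
 */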
private static class ShuffleSchedulerForTest extends ShuffleScheduler {
private CountDownLatch latch = new CountDownLatch(10);
private final boolean fetcherShouldWait;
private final ExceptionReporter reporter;
private final InputContext inputContext;
public ShuffleSchedulerForTest(InputContext inputContext, Configuration conf,
int numberOfInputs,
Shuffle shuffle,
MergeManager mergeManager,
FetchedInputAllocatorOrderedGrouped allocator, long startTime,
CompressionCodec codec,
boolean ifileReadAhead, int ifileReadAheadLength,
String srcNameTrimmed) throws IOException {
this(inputContext, conf, numberOfInputs, shuffle, mergeManager, allocator, startTime, codec,
ifileReadAhead, ifileReadAheadLength, srcNameTrimmed, false);
}
public ShuffleSchedulerForTest(InputContext inputContext, Configuration conf,
int numberOfInputs,
Shuffle shuffle,
MergeManager mergeManager,
FetchedInputAllocatorOrderedGrouped allocator, long startTime,
CompressionCodec codec,
boolean ifileReadAhead, int ifileReadAheadLength,
String srcNameTrimmed, boolean fetcherShouldWait) throws IOException {
super(inputContext, conf, numberOfInputs, shuffle, mergeManager, allocator, startTime, codec,
ifileReadAhead, ifileReadAheadLength, srcNameTrimmed);
this.fetcherShouldWait = fetcherShouldWait;
this.reporter = shuffle;
this.inputContext = inputContext;
}
@Override
FetcherOrderedGrouped constructFetcherForHost(MapHost mapHost) {
latch.countDown();
FetcherOrderedGrouped mockFetcher = mock(FetcherOrderedGrouped.class);
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
if (fetcherShouldWait) {
Thread.sleep(100000L);
}
return null;
}
}).when(mockFetcher).callInternal();
return mockFetcher;
}
}
}