blob: 05d4eb41457286fc5960f1164913803da17089b6 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.runtime.library.common.shuffle;
import org.apache.tez.runtime.library.common.CompositeInputAttemptIdentifier;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map.Entry;
import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.runtime.api.InputContext;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.apache.tez.runtime.library.common.shuffle.api.ShuffleHandlerError;
import org.apache.tez.runtime.library.common.shuffle.impl.ShuffleManager;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ShuffleHeader;
import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
import org.apache.tez.runtime.library.testutils.RuntimeTestUtils;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
public class TestFetcher {
private static final String SHUFFLE_INPUT_FILE_PREFIX = "shuffle_input_file_";
private static String HOST = "localhost";
private static int PORT = 41;
@Test(timeout = 3000)
public void testLocalFetchModeSetting() throws Exception {
TezConfiguration conf = new TezConfiguration();
conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true);
InputAttemptIdentifier[] srcAttempts = {
new InputAttemptIdentifier(0, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1")
};
FetcherCallback fetcherCallback = mock(FetcherCallback.class);
final boolean ENABLE_LOCAL_FETCH = true;
final boolean DISABLE_LOCAL_FETCH = false;
Fetcher.FetcherBuilder builder = new Fetcher.FetcherBuilder(fetcherCallback, null, null,
ApplicationId.newInstance(0, 1), 1, null, "fetcherTest", conf, ENABLE_LOCAL_FETCH, HOST,
PORT, false, true, false);
builder.assignWork(HOST, PORT, 0, 1, Arrays.asList(srcAttempts));
Fetcher fetcher = spy(builder.build());
FetchResult fr = new FetchResult(HOST, PORT, 0, 1, Arrays.asList(srcAttempts));
Fetcher.HostFetchResult hfr =
new Fetcher.HostFetchResult(fr, InputAttemptFetchFailure.fromAttempts(srcAttempts), false);
doReturn(hfr).when(fetcher).setupLocalDiskFetch();
doReturn(null).when(fetcher).doHttpFetch();
doNothing().when(fetcher).shutdown();
fetcher.call();
verify(fetcher).setupLocalDiskFetch();
verify(fetcher, never()).doHttpFetch();
// when enabled and hostname does not match use http fetch.
builder = new Fetcher.FetcherBuilder(fetcherCallback, null, null,
ApplicationId.newInstance(0, 1), -1, null, "fetcherTest", conf, ENABLE_LOCAL_FETCH, HOST,
PORT, false, true, false);
builder.assignWork(HOST + "_OTHER", PORT, 0, 1, Arrays.asList(srcAttempts));
fetcher = spy(builder.build());
doReturn(null).when(fetcher).setupLocalDiskFetch();
doReturn(hfr).when(fetcher).doHttpFetch();
doNothing().when(fetcher).shutdown();
fetcher.call();
verify(fetcher, never()).setupLocalDiskFetch();
verify(fetcher).doHttpFetch();
// when enabled and port does not match use http fetch.
builder = new Fetcher.FetcherBuilder(fetcherCallback, null, null,
ApplicationId.newInstance(0, 1), -1, null, "fetcherTest", conf, ENABLE_LOCAL_FETCH, HOST,
PORT, false, true, false);
builder.assignWork(HOST, PORT + 1, 0, 1, Arrays.asList(srcAttempts));
fetcher = spy(builder.build());
doReturn(null).when(fetcher).setupLocalDiskFetch();
doReturn(hfr).when(fetcher).doHttpFetch();
doNothing().when(fetcher).shutdown();
fetcher.call();
verify(fetcher, never()).setupLocalDiskFetch();
verify(fetcher).doHttpFetch();
// When disabled use http fetch
conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, false);
builder = new Fetcher.FetcherBuilder(fetcherCallback, null, null,
ApplicationId.newInstance(0, 1), 1, null, "fetcherTest", conf, DISABLE_LOCAL_FETCH, HOST,
PORT, false, true, false);
builder.assignWork(HOST, PORT, 0, 1, Arrays.asList(srcAttempts));
fetcher = spy(builder.build());
doReturn(null).when(fetcher).setupLocalDiskFetch();
doReturn(hfr).when(fetcher).doHttpFetch();
doNothing().when(fetcher).shutdown();
fetcher.call();
verify(fetcher, never()).setupLocalDiskFetch();
verify(fetcher).doHttpFetch();
}
@Test(timeout = 3000)
public void testSetupLocalDiskFetch() throws Exception {
CompositeInputAttemptIdentifier[] srcAttempts = {
new CompositeInputAttemptIdentifier(0, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0", 1),
new CompositeInputAttemptIdentifier(1, 2, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1", 1),
new CompositeInputAttemptIdentifier(2, 3, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_2", 1),
new CompositeInputAttemptIdentifier(3, 4, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3", 1),
new CompositeInputAttemptIdentifier(4, 5, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_4", 1)
};
final int FIRST_FAILED_ATTEMPT_IDX = 2;
final int SECOND_FAILED_ATTEMPT_IDX = 4;
final int[] successfulAttempts = {0, 1, 3};
TezConfiguration conf = new TezConfiguration();
conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, "true");
int partition = 42;
FetcherCallback callback = mock(FetcherCallback.class);
Fetcher.FetcherBuilder builder = new Fetcher.FetcherBuilder(callback, null, null,
ApplicationId.newInstance(0, 1), 1, null, "fetcherTest", conf, true, HOST, PORT,
false, true, true);
ArrayList<InputAttemptIdentifier> inputAttemptIdentifiers = new ArrayList<>();
for(CompositeInputAttemptIdentifier compositeInputAttemptIdentifier : srcAttempts) {
for(int i=0;i<compositeInputAttemptIdentifier.getInputIdentifierCount();i++) {
inputAttemptIdentifiers.add(compositeInputAttemptIdentifier.expand(i));
}
}
ArrayList<InputAttemptIdentifier> list = new ArrayList<InputAttemptIdentifier>();
list.addAll(Arrays.asList(srcAttempts));
builder.assignWork(HOST, PORT, partition, 1, list);
Fetcher fetcher = spy(builder.build());
for(CompositeInputAttemptIdentifier compositeInputAttemptIdentifier : srcAttempts) {
for(int i=0;i<compositeInputAttemptIdentifier.getInputIdentifierCount();i++) {
inputAttemptIdentifiers.add(compositeInputAttemptIdentifier.expand(i));
Fetcher.PathPartition pathPartition =
new Fetcher.PathPartition(compositeInputAttemptIdentifier.getPathComponent(),partition + i);
fetcher.getPathToAttemptMap().put(pathPartition, compositeInputAttemptIdentifier.expand(i));
}
}
doAnswer(new Answer<Path>() {
@Override
public Path answer(InvocationOnMock invocation) throws Throwable {
Object[] args = invocation.getArguments();
return new Path(SHUFFLE_INPUT_FILE_PREFIX + args[0]);
}
}).when(fetcher).getShuffleInputFileName(anyString(), anyString());
doAnswer(new Answer<TezIndexRecord>() {
@Override
public TezIndexRecord answer(InvocationOnMock invocation) throws Throwable {
Object[] args = invocation.getArguments();
InputAttemptIdentifier srcAttemptId = (InputAttemptIdentifier) args[0];
String pathComponent = srcAttemptId.getPathComponent();
int len = pathComponent.length();
long p = Long.valueOf(pathComponent.substring(len - 1, len));
// Fail the 3rd one and 5th one.
if (p == FIRST_FAILED_ATTEMPT_IDX || p == SECOND_FAILED_ATTEMPT_IDX) {
throw new IOException("failing on 3/5th input to simulate failure case");
}
// match with params for copySucceeded below.
return new TezIndexRecord(p * 10, p * 1000, p * 100);
}
}).when(fetcher).getTezIndexRecord(any(InputAttemptIdentifier.class), anyInt());
doNothing().when(fetcher).shutdown();
doNothing().when(callback).fetchSucceeded(anyString(), any(InputAttemptIdentifier.class),
any(FetchedInput.class), anyLong(), anyLong(), anyLong());
doNothing().when(callback).fetchFailed(anyString(), any(InputAttemptFetchFailure.class), eq(false));
FetchResult fetchResult = fetcher.call();
verify(fetcher).setupLocalDiskFetch();
// expect 3 successes and 2 failures
for (int i : successfulAttempts) {
verifyFetchSucceeded(callback, srcAttempts[i], conf);
}
verify(callback).fetchFailed(eq(HOST),
eq(InputAttemptFetchFailure
.fromCompositeAttemptLocalFetchFailure(srcAttempts[FIRST_FAILED_ATTEMPT_IDX])),
eq(false));
verify(callback).fetchFailed(eq(HOST),
eq(InputAttemptFetchFailure
.fromCompositeAttemptLocalFetchFailure(srcAttempts[SECOND_FAILED_ATTEMPT_IDX])),
eq(false));
Assert.assertEquals("fetchResult host", fetchResult.getHost(), HOST);
Assert.assertEquals("fetchResult partition", fetchResult.getPartition(), partition);
Assert.assertEquals("fetchResult port", fetchResult.getPort(), PORT);
// 3nd and 5th attempt failed
List<InputAttemptIdentifier> pendingInputs = Lists.newArrayList(fetchResult.getPendingInputs());
Assert.assertEquals("fetchResult pendingInput size", pendingInputs.size(), 2);
Assert.assertEquals("fetchResult failed attempt", pendingInputs.get(0),
srcAttempts[FIRST_FAILED_ATTEMPT_IDX]);
Assert.assertEquals("fetchResult failed attempt", pendingInputs.get(1),
srcAttempts[SECOND_FAILED_ATTEMPT_IDX]);
}
protected void verifyFetchSucceeded(FetcherCallback callback, CompositeInputAttemptIdentifier srcAttempId, Configuration conf) throws IOException {
String pathComponent = srcAttempId.getPathComponent();
int len = pathComponent.length();
long p = Long.valueOf(pathComponent.substring(len - 1, len));
ArgumentCaptor<LocalDiskFetchedInput> capturedFetchedInput =
ArgumentCaptor.forClass(LocalDiskFetchedInput.class);
verify(callback)
.fetchSucceeded(eq(HOST), eq(srcAttempId.expand(0)), capturedFetchedInput.capture(), eq(p * 100),
eq(p * 1000), anyLong());
LocalDiskFetchedInput f = capturedFetchedInput.getValue();
Assert.assertEquals("success callback filename", f.getInputFile().toString(),
SHUFFLE_INPUT_FILE_PREFIX + pathComponent);
Assert.assertTrue("success callback fs", f.getLocalFS() instanceof LocalFileSystem);
Assert.assertEquals("success callback filesystem", f.getStartOffset(), p * 10);
Assert.assertEquals("success callback compressed size", f.getSize(), p * 100);
Assert.assertEquals("success callback input id", f.getInputAttemptIdentifier(), srcAttempId.expand(0));
Assert.assertEquals("success callback type", f.getType(), FetchedInput.Type.DISK_DIRECT);
}
@Test(timeout=5000)
public void testInputAttemptIdentifierMap() {
InputAttemptIdentifier[] srcAttempts = {
new InputAttemptIdentifier(0, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0",
false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0),
//duplicate entry
new InputAttemptIdentifier(0, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0",
false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0),
// pipeline shuffle based identifiers, with multiple attempts
new InputAttemptIdentifier(1, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1",
false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0),
new InputAttemptIdentifier(1, 2, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1",
false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0),
new InputAttemptIdentifier(1, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_2",
false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 1),
new InputAttemptIdentifier(1, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3",
false, InputAttemptIdentifier.SPILL_INFO.FINAL_UPDATE, 2),
new InputAttemptIdentifier(2, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3",
false, InputAttemptIdentifier.SPILL_INFO.FINAL_MERGE_ENABLED, 0)
};
InputAttemptIdentifier[] expectedSrcAttempts = {
new InputAttemptIdentifier(0, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0",
false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0),
// pipeline shuffle based identifiers
new InputAttemptIdentifier(1, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1",
false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0),
new InputAttemptIdentifier(1, 2, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1",
false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0),
new InputAttemptIdentifier(1, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_2",
false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 1),
new InputAttemptIdentifier(1, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3",
false, InputAttemptIdentifier.SPILL_INFO.FINAL_UPDATE, 2),
new InputAttemptIdentifier(2, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3",
false, InputAttemptIdentifier.SPILL_INFO.FINAL_MERGE_ENABLED, 0)
};
TezConfiguration conf = new TezConfiguration();
conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, "true");
int partition = 42;
FetcherCallback callback = mock(FetcherCallback.class);
Fetcher.FetcherBuilder builder = new Fetcher.FetcherBuilder(callback, null, null,
ApplicationId.newInstance(0, 1), 1, null, "fetcherTest", conf, true, HOST, PORT,
false, true, false);
builder.assignWork(HOST, PORT, partition, 1, Arrays.asList(srcAttempts));
Fetcher fetcher = spy(builder.build());
fetcher.populateRemainingMap(new LinkedList<InputAttemptIdentifier>(Arrays.asList(srcAttempts)));
Assert.assertTrue(expectedSrcAttempts.length == fetcher.srcAttemptsRemaining.size());
Iterator<Entry<String, InputAttemptIdentifier>> iterator = fetcher.srcAttemptsRemaining.entrySet().iterator();
int count = 0;
while(iterator.hasNext()) {
String key = iterator.next().getKey();
Assert.assertTrue(expectedSrcAttempts[count++].toString().compareTo(key) == 0);
}
}
@Test
public void testShuffleHandlerDiskErrorUnordered()
throws Exception {
Configuration conf = new Configuration();
InputContext inputContext = mock(InputContext.class);
doReturn(new TezCounters()).when(inputContext).getCounters();
doReturn("vertex").when(inputContext).getSourceVertexName();
Fetcher.FetcherBuilder builder = new Fetcher.FetcherBuilder(mock(ShuffleManager.class), null,
null, ApplicationId.newInstance(0, 1), 1, null, "fetcherTest", conf, true, HOST, PORT,
false, true, false);
builder.assignWork(HOST, PORT, 0, 1, Arrays.asList(new InputAttemptIdentifier(0, 0)));
Fetcher fetcher = builder.build();
ShuffleHeader header =
new ShuffleHeader(ShuffleHandlerError.DISK_ERROR_EXCEPTION.toString(), -1, -1, -1);
DataInputStream input = RuntimeTestUtils.shuffleHeaderToDataInput(header);
InputAttemptFetchFailure[] failures =
fetcher.fetchInputs(input, null, new InputAttemptIdentifier(0, 0));
Assert.assertEquals(1, failures.length);
Assert.assertTrue(failures[0].isDiskErrorAtSource());
Assert.assertFalse(failures[0].isLocalFetch());
}
}