TEZ-4233: Map task should be blamed earlier for local fetch failures (Laszlo Bdoor reviewed by Rajesh Balamohan)
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputReadErrorEvent.java b/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputReadErrorEvent.java
index cabc39f..8ef50eb 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputReadErrorEvent.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputReadErrorEvent.java
@@ -51,26 +51,44 @@
*/
private final int numFailures;
- private InputReadErrorEvent(final String diagnostics, final int index,
- final int version, final int numFailures) {
+ /**
+ * Whether this input read error is caused while fetching local file.
+ */
+ private final boolean isLocalFetch;
+
+ /**
+ * Whether this input read error is caused because the fetcher detected a fatal, unrecoverable,
+ * local file read issue from the shuffle handler.
+ */
+ private final boolean isDiskErrorAtSource;
+
+ private InputReadErrorEvent(final String diagnostics, final int index, final int version,
+ final int numFailures, boolean isLocalFetch, boolean isDiskErrorAtSource) {
super();
this.diagnostics = diagnostics;
this.index = index;
this.version = version;
this.numFailures = numFailures;
+ this.isLocalFetch = isLocalFetch;
+ this.isDiskErrorAtSource = isDiskErrorAtSource;
}
- public static InputReadErrorEvent create(String diagnostics, int index,
- int version) {
- return create(diagnostics, index, version, 1);
+ public static InputReadErrorEvent create(String diagnostics, int index, int version,
+ boolean isLocalFetch, boolean isDiskErrorAtSource) {
+ return create(diagnostics, index, version, 1, isLocalFetch, isDiskErrorAtSource);
+ }
+
+ public static InputReadErrorEvent create(String diagnostics, int index, int version) {
+ return create(diagnostics, index, version, 1, false, false);
}
/**
* Create an InputReadErrorEvent.
*/
public static InputReadErrorEvent create(final String diagnostics, final int index,
- final int version, final int numFailures) {
- return new InputReadErrorEvent(diagnostics, index, version, numFailures);
+ final int version, final int numFailures, boolean isLocalFetch, boolean isDiskErrorAtSource) {
+ return new InputReadErrorEvent(diagnostics, index, version, numFailures, isLocalFetch,
+ isDiskErrorAtSource);
}
public String getDiagnostics() {
@@ -92,6 +110,14 @@
return numFailures;
}
+ public boolean isLocalFetch() {
+ return isLocalFetch;
+ }
+
+ public boolean isDiskErrorAtSource() {
+ return isDiskErrorAtSource;
+ }
+
@Override
public int hashCode() {
return Objects.hash(index, version);
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/library/common/shuffle/api/ShuffleHandlerError.java b/tez-api/src/main/java/org/apache/tez/runtime/library/common/shuffle/api/ShuffleHandlerError.java
new file mode 100644
index 0000000..09137de
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/runtime/library/common/shuffle/api/ShuffleHandlerError.java
@@ -0,0 +1,27 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.tez.runtime.library.common.shuffle.api;
+
+/**
+ * ShuffleHandlerError enum encapsulates possible error messages that can be propagated from
+ * ShuffleHandler to fetchers. Depending on the message, fetchers can make better decisions, or give
+ * AM a hint in order to let it make better decisions in case of shuffle issues.
+ */
+public enum ShuffleHandlerError {
+ DISK_ERROR_EXCEPTION
+}
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/library/common/shuffle/api/package-info.java b/tez-api/src/main/java/org/apache/tez/runtime/library/common/shuffle/api/package-info.java
new file mode 100644
index 0000000..9ad8e61
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/runtime/library/common/shuffle/api/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+@Private
+package org.apache.tez.runtime.library.common.shuffle.api;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
\ No newline at end of file
diff --git a/tez-api/src/main/proto/Events.proto b/tez-api/src/main/proto/Events.proto
index 7123500..e041c33 100644
--- a/tez-api/src/main/proto/Events.proto
+++ b/tez-api/src/main/proto/Events.proto
@@ -39,6 +39,8 @@
optional int32 index = 1;
optional string diagnostics = 2;
optional int32 version = 3;
+ optional bool is_local_fetch = 4;
+ optional bool is_disk_error_at_source = 5;
}
message InputFailedEventProto {
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 37e818e..9a5e73d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -1825,6 +1825,7 @@
+ " at inputIndex " + failedInputIndexOnDestTa);
long time = attempt.clock.getTime();
Long firstErrReportTime = attempt.uniquefailedOutputReports.get(failedDestTaId);
+
if (firstErrReportTime == null) {
attempt.uniquefailedOutputReports.put(failedDestTaId, time);
firstErrReportTime = time;
@@ -1851,7 +1852,8 @@
// If needed we can launch a background task without failing this task
// to generate a copy of the output just in case.
// If needed we can consider only running consumer tasks
- if (!crossTimeDeadline && withinFailureFractionLimits && withinOutputFailureLimits) {
+ if (!crossTimeDeadline && withinFailureFractionLimits && withinOutputFailureLimits
+ && !(readErrorEvent.isLocalFetch() || readErrorEvent.isDiskErrorAtSource())) {
return attempt.getInternalState();
}
String message = attempt.getID() + " being failed for too many output errors. "
@@ -1862,7 +1864,10 @@
+ ", MAX_ALLOWED_OUTPUT_FAILURES=" + maxAllowedOutputFailures
+ ", MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC="
+ maxAllowedTimeForTaskReadErrorSec
- + ", readErrorTimespan=" + readErrorTimespanSec;
+ + ", readErrorTimespan=" + readErrorTimespanSec
+ + ", isLocalFetch=" + readErrorEvent.isLocalFetch()
+ + ", isDiskErrorAtSource=" + readErrorEvent.isDiskErrorAtSource();
+
LOG.info(message);
attempt.addDiagnosticInfo(message);
// send input failed event
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index 41cce3b..6862bec 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -19,7 +19,6 @@
package org.apache.tez.dag.app.dag.impl;
import org.apache.tez.dag.app.MockClock;
-import org.apache.tez.dag.app.rm.AMSchedulerEventTAStateUpdated;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.fail;
@@ -83,6 +82,7 @@
import org.apache.tez.dag.app.TaskCommunicatorWrapper;
import org.apache.tez.dag.app.TaskHeartbeatHandler;
import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
+import org.apache.tez.dag.app.dag.DAG;
import org.apache.tez.dag.app.dag.Task;
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.event.DAGEvent;
@@ -127,6 +127,7 @@
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -2154,6 +2155,52 @@
Assert.assertEquals(1, taImpl.taskAttemptFinishedEventLogged);
}
+ @Test
+ public void testMapTaskIsBlamedImmediatelyOnLocalFetchFailure() throws ServicePluginException {
+ // local fetch failure or disk read error at source -> turn source attempt to FAIL_IN_PROGRESS
+ testMapTaskFailingForFetchFailureType(true, true, TaskAttemptStateInternal.FAIL_IN_PROGRESS);
+ testMapTaskFailingForFetchFailureType(true, false, TaskAttemptStateInternal.FAIL_IN_PROGRESS);
+ testMapTaskFailingForFetchFailureType(false, true, TaskAttemptStateInternal.FAIL_IN_PROGRESS);
+
+ // remote fetch failure -> won't change current state
+ testMapTaskFailingForFetchFailureType(false, false, TaskAttemptStateInternal.NEW);
+ }
+
+ private void testMapTaskFailingForFetchFailureType(boolean isLocalFetch,
+ boolean isDiskErrorAtSource, TaskAttemptStateInternal expectedState) {
+ EventHandler eventHandler = mock(EventHandler.class);
+ TezTaskID taskID =
+ TezTaskID.getInstance(TezVertexID.getInstance(TezDAGID.getInstance("1", 1, 1), 1), 1);
+ TaskAttemptImpl sourceAttempt = new MockTaskAttemptImpl(taskID, 1, eventHandler, null,
+ new Configuration(), SystemClock.getInstance(), mock(TaskHeartbeatHandler.class), appCtx,
+ false, null, null, false);
+
+ // the original read error event, sent by reducer task
+ InputReadErrorEvent inputReadErrorEvent =
+ InputReadErrorEvent.create("", 0, 1, 1, isLocalFetch, isDiskErrorAtSource);
+ TezTaskAttemptID destTaskAttemptId = mock(TezTaskAttemptID.class);
+ when(destTaskAttemptId.getTaskID()).thenReturn(mock(TezTaskID.class));
+ when(destTaskAttemptId.getTaskID().getVertexID()).thenReturn(mock(TezVertexID.class));
+ when(appCtx.getCurrentDAG()).thenReturn(mock(DAG.class));
+ when(appCtx.getCurrentDAG().getVertex(Mockito.any(TezVertexID.class)))
+ .thenReturn(mock(Vertex.class));
+ when(appCtx.getCurrentDAG().getVertex(Mockito.any(TezVertexID.class)).getRunningTasks())
+ .thenReturn(100);
+
+ EventMetaData mockMeta = mock(EventMetaData.class);
+ when(mockMeta.getTaskAttemptID()).thenReturn(destTaskAttemptId);
+ TezEvent tezEvent = new TezEvent(inputReadErrorEvent, mockMeta);
+
+ // the event is propagated to map task's event handler
+ TaskAttemptEventOutputFailed outputFailedEvent =
+ new TaskAttemptEventOutputFailed(sourceAttempt.getID(), tezEvent, 11);
+
+ Assert.assertEquals(TaskAttemptStateInternal.NEW, sourceAttempt.getInternalState());
+ TaskAttemptStateInternal resultState = new TaskAttemptImpl.OutputReportedFailedTransition()
+ .transition(sourceAttempt, outputFailedEvent);
+ Assert.assertEquals(expectedState, resultState);
+ }
+
private Event verifyEventType(List<Event> events,
Class<? extends Event> eventClass, int expectedOccurences) {
int count = 0;
diff --git a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java
index f294edc..b67883d 100644
--- a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java
+++ b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java
@@ -71,6 +71,7 @@
import org.apache.tez.common.security.JobTokenSecretManager;
import org.apache.tez.runtime.library.common.Constants;
import org.apache.tez.runtime.library.common.security.SecureShuffleUtils;
+import org.apache.tez.runtime.library.common.shuffle.api.ShuffleHandlerError;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ShuffleHeader;
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.annotation.Metric;
@@ -84,6 +85,7 @@
import org.apache.hadoop.security.token.Token;
import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
@@ -101,6 +103,7 @@
import org.iq80.leveldb.Logger;
import org.iq80.leveldb.Options;
import org.jboss.netty.bootstrap.ServerBootstrap;
+import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
@@ -608,6 +611,10 @@
return new Shuffle(conf);
}
+ protected JobTokenSecretManager getSecretManager() {
+ return secretManager;
+ }
+
private void recoverState(Configuration conf) throws IOException {
Path recoveryRoot = getRecoveryPath();
if (recoveryRoot != null) {
@@ -727,7 +734,7 @@
private void addJobToken(JobID jobId, String user,
Token<JobTokenIdentifier> jobToken) {
userRsrc.put(jobId.toString(), user);
- secretManager.addTokenForJob(jobId.toString(), jobToken);
+ getSecretManager().addTokenForJob(jobId.toString(), jobToken);
LOG.info("Added token for " + jobId.toString());
}
@@ -772,7 +779,7 @@
private void removeJobShuffleInfo(JobID jobId) throws IOException {
String jobIdStr = jobId.toString();
- secretManager.removeTokenForJob(jobIdStr);
+ getSecretManager().removeTokenForJob(jobIdStr);
userRsrc.remove(jobIdStr);
if (stateDb != null) {
try {
@@ -1080,11 +1087,19 @@
try {
populateHeaders(mapIds, jobId, dagId, user, reduceRange,
response, keepAliveParam, mapOutputInfoMap);
- } catch(IOException e) {
+ } catch (DiskErrorException e) { // fatal error: fetcher should be aware of that
+ LOG.error("Shuffle error in populating headers (fatal: DiskErrorException):", e);
+ String errorMessage = getErrorMessage(e);
+ // custom message, might be noticed by fetchers
+ // it should reuse the current response object, as headers have been already set for it
+ sendFakeShuffleHeaderWithError(ctx,
+ ShuffleHandlerError.DISK_ERROR_EXCEPTION + ": " + errorMessage, response);
+ return;
+ } catch (IOException e) {
ch.write(response);
LOG.error("Shuffle error in populating headers :", e);
String errorMessage = getErrorMessage(e);
- sendError(ctx,errorMessage , INTERNAL_SERVER_ERROR);
+ sendError(ctx, errorMessage, INTERNAL_SERVER_ERROR);
return;
}
ch.write(response);
@@ -1337,7 +1352,7 @@
protected void verifyRequest(String appid, ChannelHandlerContext ctx,
HttpRequest request, HttpResponse response, URL requestUri)
throws IOException {
- SecretKey tokenSecret = secretManager.retrieveTokenSecret(appid);
+ SecretKey tokenSecret = getSecretManager().retrieveTokenSecret(appid);
if (null == tokenSecret) {
LOG.info("Request for unknown token " + appid);
throw new IOException("could not find jobid");
@@ -1444,22 +1459,37 @@
return writeFuture;
}
- protected void sendError(ChannelHandlerContext ctx,
- HttpResponseStatus status) {
+ protected void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) {
sendError(ctx, "", status);
}
- protected void sendError(ChannelHandlerContext ctx, String message,
- HttpResponseStatus status) {
+ protected void sendError(ChannelHandlerContext ctx, String message, HttpResponseStatus status) {
HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status);
+ sendError(ctx, message, response);
+ }
+
+ protected void sendError(ChannelHandlerContext ctx, String message, HttpResponse response) {
+ sendError(ctx, ChannelBuffers.copiedBuffer(message, CharsetUtil.UTF_8), response);
+ }
+
+ private void sendFakeShuffleHeaderWithError(ChannelHandlerContext ctx, String message,
+ HttpResponse response) throws IOException {
+ ShuffleHeader header = new ShuffleHeader(message, -1, -1, -1);
+ DataOutputBuffer out = new DataOutputBuffer();
+ header.write(out);
+
+ sendError(ctx, wrappedBuffer(out.getData(), 0, out.getLength()), response);
+ }
+
+ protected void sendError(ChannelHandlerContext ctx, ChannelBuffer content,
+ HttpResponse response) {
response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8");
// Put shuffle version into http header
response.headers().set(ShuffleHeader.HTTP_HEADER_NAME,
ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
response.headers().set(ShuffleHeader.HTTP_HEADER_VERSION,
ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
- response.setContent(
- ChannelBuffers.copiedBuffer(message, CharsetUtil.UTF_8));
+ response.setContent(content);
// Close the connection as soon as the error message is sent.
ctx.getChannel().write(response).addListener(ChannelFutureListener.CLOSE);
diff --git a/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java b/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java
index a7f4446..5ca4ed8 100644
--- a/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java
+++ b/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java
@@ -20,6 +20,7 @@
//import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
//import static org.apache.hadoop.test.MetricsAsserts.assertGauge;
//import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import static org.junit.Assert.assertTrue;
import static org.jboss.netty.buffer.ChannelBuffers.wrappedBuffer;
import static org.jboss.netty.handler.codec.http.HttpResponseStatus.OK;
@@ -41,15 +42,12 @@
import java.net.URL;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
import java.util.Map;
-import java.util.zip.CheckedOutputStream;
import java.util.zip.Checksum;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
-import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
@@ -58,11 +56,14 @@
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.mapred.JobID;
-import org.apache.hadoop.mapred.MapTask;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.tez.runtime.library.common.security.SecureShuffleUtils;
import org.apache.tez.common.security.JobTokenIdentifier;
import org.apache.tez.common.security.JobTokenSecretManager;
+import org.apache.tez.http.BaseHttpConnection;
+import org.apache.tez.http.HttpConnectionParams;
+import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
+import org.apache.tez.runtime.library.common.shuffle.api.ShuffleHandlerError;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ShuffleHeader;
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.impl.MetricsSystemImpl;
@@ -172,6 +173,52 @@
}
}
+ class MockShuffleHandlerWithFatalDiskError extends org.apache.tez.auxservices.ShuffleHandler {
+ public static final String MESSAGE =
+ "Could not find application_1234/240/output/attempt_1234_0/file.out.index";
+
+ private JobTokenSecretManager secretManager =
+ new JobTokenSecretManager(JobTokenSecretManager.createSecretKey(getSecret().getBytes()));
+
+ protected JobTokenSecretManager getSecretManager(){
+ return secretManager;
+ }
+
+ @Override
+ protected Shuffle getShuffle(final Configuration conf) {
+ return new Shuffle(conf) {
+ @Override
+ protected void verifyRequest(String appid, ChannelHandlerContext ctx, HttpRequest request,
+ HttpResponse response, URL requestUri) throws IOException {
+ super.verifyRequest(appid, ctx, request, response, requestUri);
+ }
+
+ @Override
+ protected MapOutputInfo getMapOutputInfo(String dagId, String mapId, Range reduceRange,
+ String jobId, String user) {
+ return null;
+ }
+
+ @Override
+ protected void populateHeaders(List<String> mapIds, String jobId, String dagId, String user,
+ Range reduceRange, HttpResponse response, boolean keepAliveParam,
+ Map<String, MapOutputInfo> infoMap) throws IOException {
+ throw new DiskErrorException(MESSAGE);
+ }
+
+ @Override
+ protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx, Channel ch, String user,
+ String mapId, Range reduceRange, MapOutputInfo info) throws IOException {
+ return null;
+ }
+ };
+ }
+
+ public String getSecret() {
+ return "secret";
+ }
+ }
+
/**
* Test the validation of ShuffleHandler's meta-data's serialization and
* de-serialization.
@@ -1322,6 +1369,53 @@
sh.close();
}
+ @Test
+ public void testShuffleHandlerSendsDiskError() throws Exception {
+ Configuration conf = new Configuration();
+ conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
+
+ DataInputStream input = null;
+ MockShuffleHandlerWithFatalDiskError shuffleHandler =
+ new MockShuffleHandlerWithFatalDiskError();
+ try {
+ shuffleHandler.init(conf);
+ shuffleHandler.start();
+
+ String shuffleBaseURL = "http://127.0.0.1:"
+ + shuffleHandler.getConfig().get(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY);
+ URL url = new URL(
+ shuffleBaseURL + "/mapOutput?job=job_12345_1&dag=1&reduce=1&map=attempt_12345_1_m_1_0");
+ shuffleHandler.secretManager.addTokenForJob("job_12345_1",
+ new Token<>("id".getBytes(), shuffleHandler.getSecret().getBytes(), null, null));
+
+ HttpConnectionParams httpConnectionParams = ShuffleUtils.getHttpConnectionParams(conf);
+ BaseHttpConnection httpConnection = ShuffleUtils.getHttpConnection(true, url,
+ httpConnectionParams, "testFetcher", shuffleHandler.secretManager);
+
+ boolean connectSucceeded = httpConnection.connect();
+ Assert.assertTrue(connectSucceeded);
+
+ input = httpConnection.getInputStream();
+ httpConnection.validate();
+
+ ShuffleHeader header = new ShuffleHeader();
+ header.readFields(input);
+
+ // message is encoded in the shuffle header, and can be checked by fetchers
+ Assert.assertEquals(
+ ShuffleHandlerError.DISK_ERROR_EXCEPTION + ": " + MockShuffleHandlerWithFatalDiskError.MESSAGE,
+ header.getMapId());
+ Assert.assertEquals(-1, header.getCompressedLength());
+ Assert.assertEquals(-1, header.getUncompressedLength());
+ Assert.assertEquals(-1, header.getPartition());
+ } finally {
+ if (input != null) {
+ input.close();
+ }
+ shuffleHandler.close();
+ }
+ }
+
public ChannelFuture createMockChannelFuture(Channel mockCh,
final List<ShuffleHandler.ReduceMapFileCount> listenerList) {
final ChannelFuture mockFuture = mock(ChannelFuture.class);
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
index e7af4a1..ebea9a4 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
@@ -191,6 +191,8 @@
.setIndex(ideEvt.getIndex())
.setDiagnostics(ideEvt.getDiagnostics())
.setVersion(ideEvt.getVersion())
+ .setIsLocalFetch(ideEvt.isLocalFetch())
+ .setIsDiskErrorAtSource(ideEvt.isDiskErrorAtSource())
.build();
break;
case TASK_ATTEMPT_FAILED_EVENT:
@@ -294,10 +296,9 @@
event = ProtoConverters.convertVertexManagerEventFromProto(vmProto);
break;
case INPUT_READ_ERROR_EVENT:
- InputReadErrorEventProto ideProto =
- InputReadErrorEventProto.parseFrom(input);
- event = InputReadErrorEvent.create(ideProto.getDiagnostics(),
- ideProto.getIndex(), ideProto.getVersion());
+ InputReadErrorEventProto ideProto = InputReadErrorEventProto.parseFrom(input);
+ event = InputReadErrorEvent.create(ideProto.getDiagnostics(), ideProto.getIndex(),
+ ideProto.getVersion(), ideProto.getIsLocalFetch(), ideProto.getIsDiskErrorAtSource());
break;
case TASK_ATTEMPT_FAILED_EVENT:
TaskAttemptFailedEventProto tfProto =
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
index 18f66cc..f295c06 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
@@ -66,7 +66,7 @@
import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
import org.apache.tez.runtime.library.exceptions.FetcherReadTimeoutException;
import org.apache.tez.runtime.library.common.shuffle.FetchedInput.Type;
-
+import org.apache.tez.runtime.library.common.shuffle.api.ShuffleHandlerError;
import org.apache.tez.common.Preconditions;
/**
@@ -277,7 +277,8 @@
HostFetchResult hostFetchResult;
- if (localDiskFetchEnabled && host.equals(localHostname) && port == shufflePort) {
+ boolean isLocalFetch = localDiskFetchEnabled && host.equals(localHostname) && port == shufflePort;
+ if (isLocalFetch) {
hostFetchResult = setupLocalDiskFetch();
} else if (multiplex) {
hostFetchResult = doSharedFetch();
@@ -288,7 +289,7 @@
if (hostFetchResult.failedInputs != null && hostFetchResult.failedInputs.length > 0) {
if (!isShutDown.get()) {
LOG.warn("copyInputs failed for tasks " + Arrays.toString(hostFetchResult.failedInputs));
- for (InputAttemptIdentifier left : hostFetchResult.failedInputs) {
+ for (InputAttemptFetchFailure left : hostFetchResult.failedInputs) {
fetcherCallback.fetchFailed(host, left, hostFetchResult.connectFailed);
}
} else {
@@ -504,7 +505,7 @@
// ioErrs.increment(1);
// If connect did not succeed, just mark all the maps as failed,
// indirectly penalizing the host
- InputAttemptIdentifier[] failedFetches = null;
+ InputAttemptFetchFailure[] failedFetches = null;
if (isShutDown.get()) {
if (isDebugEnabled) {
LOG.debug(
@@ -512,8 +513,7 @@
e.getClass().getName() + ", Message: " + e.getMessage());
}
} else {
- failedFetches = srcAttemptsRemaining.values().
- toArray(new InputAttemptIdentifier[srcAttemptsRemaining.values().size()]);
+ failedFetches = InputAttemptFetchFailure.fromAttempts(srcAttemptsRemaining.values());
}
return new HostFetchResult(new FetchResult(host, port, partition, partitionCount, srcAttemptsRemaining.values()), failedFetches, true);
}
@@ -547,7 +547,7 @@
"Fetch Failure while connecting from %s to: %s:%d, attempt: %s Informing ShuffleManager: ",
localHostname, host, port, firstAttempt), e);
return new HostFetchResult(new FetchResult(host, port, partition, partitionCount, srcAttemptsRemaining.values()),
- new InputAttemptIdentifier[] { firstAttempt }, true);
+ new InputAttemptFetchFailure[] { new InputAttemptFetchFailure(firstAttempt) }, true);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); //reset status
@@ -584,7 +584,7 @@
// On any error, faildTasks is not null and we exit
// after putting back the remaining maps to the
// yet_to_be_fetched list and marking the failed tasks.
- InputAttemptIdentifier[] failedInputs = null;
+ InputAttemptFetchFailure[] failedInputs = null;
while (!srcAttemptsRemaining.isEmpty() && failedInputs == null) {
InputAttemptIdentifier inputAttemptIdentifier =
srcAttemptsRemaining.entrySet().iterator().next().getValue();
@@ -711,7 +711,7 @@
}
}
- InputAttemptIdentifier[] failedFetches = null;
+ InputAttemptFetchFailure[] failedFetches = null;
if (failMissing && srcAttemptsRemaining.size() > 0) {
if (isShutDown.get()) {
if (isDebugEnabled) {
@@ -720,8 +720,8 @@
" remaining inputs");
}
} else {
- failedFetches = srcAttemptsRemaining.values().
- toArray(new InputAttemptIdentifier[srcAttemptsRemaining.values().size()]);
+ failedFetches =
+ InputAttemptFetchFailure.fromAttemptsLocalFetchFailure(srcAttemptsRemaining.values());
}
} else {
// nothing needs to be done to requeue remaining entries
@@ -770,10 +770,10 @@
static class HostFetchResult {
private final FetchResult fetchResult;
- private final InputAttemptIdentifier[] failedInputs;
+ private final InputAttemptFetchFailure[] failedInputs;
private final boolean connectFailed;
- public HostFetchResult(FetchResult fetchResult, InputAttemptIdentifier[] failedInputs,
+ public HostFetchResult(FetchResult fetchResult, InputAttemptFetchFailure[] failedInputs,
boolean connectFailed) {
this.fetchResult = fetchResult;
this.failedInputs = failedInputs;
@@ -831,8 +831,11 @@
return "id: " + srcAttemptId + ", decompressed length: " + decompressedLength + ", compressed length: " + compressedLength + ", reduce: " + forReduce;
}
}
- private InputAttemptIdentifier[] fetchInputs(DataInputStream input,
- CachingCallBack callback, InputAttemptIdentifier inputAttemptIdentifier) throws FetcherReadTimeoutException {
+
+ @VisibleForTesting
+ InputAttemptFetchFailure[] fetchInputs(DataInputStream input, CachingCallBack callback,
+ InputAttemptIdentifier inputAttemptIdentifier)
+ throws FetcherReadTimeoutException {
FetchedInput fetchedInput = null;
InputAttemptIdentifier srcAttemptId = null;
long decompressedLength = 0;
@@ -856,9 +859,19 @@
header.readFields(input);
pathComponent = header.getMapId();
if (!pathComponent.startsWith(InputAttemptIdentifier.PATH_PREFIX)) {
- throw new IllegalArgumentException("Invalid map id: " + header.getMapId() + ", expected to start with " +
- InputAttemptIdentifier.PATH_PREFIX + ", partition: " + header.getPartition()
- + " while fetching " + inputAttemptIdentifier);
+ if (pathComponent.startsWith(ShuffleHandlerError.DISK_ERROR_EXCEPTION.toString())) {
+ LOG.warn("Invalid map id: " + header.getMapId() + ", expected to start with "
+ + InputAttemptIdentifier.PATH_PREFIX + ", partition: " + header.getPartition()
+ + " while fetching " + inputAttemptIdentifier);
+ // this should be treated as local fetch failure while reporting later
+ return new InputAttemptFetchFailure[] {
+ InputAttemptFetchFailure.fromDiskErrorAtSource(inputAttemptIdentifier) };
+ } else {
+ throw new IllegalArgumentException(
+ "Invalid map id: " + header.getMapId() + ", expected to start with "
+ + InputAttemptIdentifier.PATH_PREFIX + ", partition: " + header.getPartition()
+ + " while fetching " + inputAttemptIdentifier);
+ }
}
srcAttemptId = pathToAttemptMap.get(new PathPartition(pathComponent, header.getPartition()));
@@ -883,7 +896,7 @@
if (!isShutDown.get()) {
LOG.warn("Invalid src id ", e);
// Don't know which one was bad, so consider all of them as bad
- return srcAttemptsRemaining.values().toArray(new InputAttemptIdentifier[srcAttemptsRemaining.size()]);
+ return InputAttemptFetchFailure.fromAttempts(srcAttemptsRemaining.values());
} else {
if (isDebugEnabled) {
LOG.debug("Already shutdown. Ignoring badId error with message: " + e.getMessage());
@@ -902,7 +915,8 @@
srcAttemptId = getNextRemainingAttempt();
}
assert (srcAttemptId != null);
- return new InputAttemptIdentifier[]{srcAttemptId};
+ return new InputAttemptFetchFailure[] {
+ InputAttemptFetchFailure.fromAttempt(srcAttemptId) };
} else {
if (isDebugEnabled) {
LOG.debug("Already shutdown. Ignoring verification failure.");
@@ -1004,10 +1018,10 @@
// Cleanup the fetchedInput before returning.
cleanupFetchedInput(fetchedInput);
if (srcAttemptId == null) {
- return srcAttemptsRemaining.values()
- .toArray(new InputAttemptIdentifier[srcAttemptsRemaining.size()]);
+ return InputAttemptFetchFailure.fromAttempts(srcAttemptsRemaining.values());
} else {
- return new InputAttemptIdentifier[] { srcAttemptId };
+ return new InputAttemptFetchFailure[] {
+ new InputAttemptFetchFailure(srcAttemptId) };
}
}
LOG.warn("Failed to shuffle output of " + srcAttemptId + " from " + host + " (to "
@@ -1016,7 +1030,8 @@
// Cleanup the fetchedInput
cleanupFetchedInput(fetchedInput);
// metrics.failedFetch();
- return new InputAttemptIdentifier[] { srcAttemptId };
+ return new InputAttemptFetchFailure[] {
+ new InputAttemptFetchFailure(srcAttemptId) };
}
return null;
}
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/FetcherCallback.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/FetcherCallback.java
index 34bd272..b751fb9 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/FetcherCallback.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/FetcherCallback.java
@@ -28,6 +28,7 @@
FetchedInput fetchedInput, long fetchedBytes, long decompressedLength, long copyDuration)
throws IOException;
- public void fetchFailed(String host, InputAttemptIdentifier srcAttemptIdentifier, boolean connectFailed);
+ public void fetchFailed(String host, InputAttemptFetchFailure srcAttemptFetchFailure,
+ boolean connectFailed);
}
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/InputAttemptFetchFailure.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/InputAttemptFetchFailure.java
new file mode 100644
index 0000000..d94db35
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/InputAttemptFetchFailure.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.common.shuffle;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.tez.runtime.library.common.CompositeInputAttemptIdentifier;
+import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+
+/**
+ * InputAttemptFetchFailure is supposed to wrap an InputAttemptIdentifier with any kind of failure
+ * information during fetch. It can be useful for propagating as a single object instead of multiple
+ * parameters (local fetch error, remote fetch error, connect failed, read failed, etc.).
+ */
+public class InputAttemptFetchFailure {
+
+ private final InputAttemptIdentifier inputAttemptIdentifier;
+ private final boolean isLocalFetch;
+ private final boolean isDiskErrorAtSource;
+
+ public InputAttemptFetchFailure(InputAttemptIdentifier inputAttemptIdentifier) {
+ this(inputAttemptIdentifier, false, false);
+ }
+
+ public InputAttemptFetchFailure(InputAttemptIdentifier inputAttemptIdentifier,
+ boolean isLocalFetch, boolean isDiskErrorAtSource) {
+ this.inputAttemptIdentifier = inputAttemptIdentifier;
+ this.isLocalFetch = isLocalFetch;
+ this.isDiskErrorAtSource = isDiskErrorAtSource;
+ }
+
+ public InputAttemptIdentifier getInputAttemptIdentifier() {
+ return inputAttemptIdentifier;
+ }
+
+ public boolean isLocalFetch() {
+ return isLocalFetch;
+ }
+
+ public boolean isDiskErrorAtSource() {
+ return isDiskErrorAtSource;
+ }
+
+ public static InputAttemptFetchFailure fromAttempt(InputAttemptIdentifier attempt) {
+ return new InputAttemptFetchFailure(attempt, false, false);
+ }
+
+ public static InputAttemptFetchFailure fromLocalFetchFailure(InputAttemptIdentifier attempt) {
+ return new InputAttemptFetchFailure(attempt, true, false);
+ }
+
+ public static InputAttemptFetchFailure fromDiskErrorAtSource(InputAttemptIdentifier attempt) {
+ return new InputAttemptFetchFailure(attempt, false, true);
+ }
+
+ public static InputAttemptFetchFailure[] fromAttempts(Collection<InputAttemptIdentifier> values) {
+ return values.stream().map(identifier -> new InputAttemptFetchFailure(identifier, false, false))
+ .toArray(InputAttemptFetchFailure[]::new);
+ }
+
+ public static InputAttemptFetchFailure[] fromAttempts(InputAttemptIdentifier[] values) {
+ return Arrays.asList(values).stream()
+ .map(identifier -> new InputAttemptFetchFailure(identifier, false, false))
+ .toArray(InputAttemptFetchFailure[]::new);
+ }
+
+ public static InputAttemptFetchFailure[] fromAttemptsLocalFetchFailure(
+ Collection<InputAttemptIdentifier> values) {
+ return values.stream().map(identifier -> new InputAttemptFetchFailure(identifier, true, false))
+ .toArray(InputAttemptFetchFailure[]::new);
+ }
+
+ public static InputAttemptFetchFailure fromCompositeAttemptLocalFetchFailure(
+ CompositeInputAttemptIdentifier compositeInputAttemptIdentifier) {
+ return new InputAttemptFetchFailure(compositeInputAttemptIdentifier, true, false);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == null || (obj.getClass() != this.getClass())) {
+ return false;
+ }
+ return inputAttemptIdentifier.equals(((InputAttemptFetchFailure) obj).inputAttemptIdentifier)
+ && isLocalFetch == ((InputAttemptFetchFailure) obj).isLocalFetch
+ && isDiskErrorAtSource == ((InputAttemptFetchFailure) obj).isDiskErrorAtSource;
+ }
+
+ @Override
+ public int hashCode() {
+ return 31 * inputAttemptIdentifier.hashCode() + 31 * (isLocalFetch ? 0 : 1)
+ + 31 * (isDiskErrorAtSource ? 0 : 1);
+ }
+
+ @Override
+ public String toString() {
+ return String.format("%s, isLocalFetch: %s, isDiskErrorAtSource: %s",
+ inputAttemptIdentifier.toString(), isLocalFetch, isDiskErrorAtSource);
+ }
+}
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
index 742fc18..2b83ad8 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
@@ -84,6 +84,7 @@
import org.apache.tez.runtime.library.common.shuffle.Fetcher.FetcherBuilder;
import org.apache.tez.runtime.library.common.shuffle.FetcherCallback;
import org.apache.tez.runtime.library.common.shuffle.HostPort;
+import org.apache.tez.runtime.library.common.shuffle.InputAttemptFetchFailure;
import org.apache.tez.runtime.library.common.shuffle.InputHost;
import org.apache.tez.runtime.library.common.shuffle.InputHost.PartitionToInputs;
import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
@@ -391,9 +392,9 @@
List<Event> failedEventsToSend = Lists.newArrayListWithCapacity(
failedEvents.size());
for (InputReadErrorEvent key : failedEvents.keySet()) {
- failedEventsToSend.add(InputReadErrorEvent
- .create(key.getDiagnostics(), key.getIndex(),
- key.getVersion(), failedEvents.get(key)));
+ failedEventsToSend.add(InputReadErrorEvent.create(key.getDiagnostics(),
+ key.getIndex(), key.getVersion(), failedEvents.get(key), key.isLocalFetch(),
+ key.isDiskErrorAtSource()));
}
inputContext.sendEvents(failedEventsToSend);
failedEvents.clear();
@@ -939,12 +940,15 @@
@Override
public void fetchFailed(String host,
- InputAttemptIdentifier srcAttemptIdentifier, boolean connectFailed) {
+ InputAttemptFetchFailure inputAttemptFetchFailure, boolean connectFailed) {
// TODO NEWTEZ. Implement logic to report fetch failures after a threshold.
// For now, reporting immediately.
- LOG.info(srcNameTrimmed + ": " + "Fetch failed for src: " + srcAttemptIdentifier
- + "InputIdentifier: " + srcAttemptIdentifier + ", connectFailed: "
- + connectFailed);
+ InputAttemptIdentifier srcAttemptIdentifier = inputAttemptFetchFailure.getInputAttemptIdentifier();
+ LOG.info(
+ "{}: Fetch failed for src: {} InputIdentifier: {}, connectFailed: {}, "
+ + "local fetch: {}, remote fetch failure reported as local failure: {})",
+ srcNameTrimmed, srcAttemptIdentifier, srcAttemptIdentifier, connectFailed,
+ inputAttemptFetchFailure.isLocalFetch(), inputAttemptFetchFailure.isDiskErrorAtSource());
failedShufflesCounter.increment(1);
inputContext.notifyProgress();
if (srcAttemptIdentifier == null) {
@@ -957,7 +961,9 @@
srcAttemptIdentifier.getInputIdentifier(),
srcAttemptIdentifier.getAttemptNumber()),
srcAttemptIdentifier.getInputIdentifier(),
- srcAttemptIdentifier.getAttemptNumber());
+ srcAttemptIdentifier.getAttemptNumber(),
+ inputAttemptFetchFailure.isLocalFetch(),
+ inputAttemptFetchFailure.isDiskErrorAtSource());
if (maxTimeToWaitForReportMillis > 0) {
try {
reportLock.lock();
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
index e732ab0..3272327 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
@@ -50,7 +50,9 @@
import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
import org.apache.tez.runtime.library.exceptions.FetcherReadTimeoutException;
+import org.apache.tez.runtime.library.common.shuffle.InputAttemptFetchFailure;
import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
+import org.apache.tez.runtime.library.common.shuffle.api.ShuffleHandlerError;
import com.google.common.annotations.VisibleForTesting;
@@ -272,7 +274,8 @@
// On any error, faildTasks is not null and we exit
// after putting back the remaining maps to the
// yet_to_be_fetched list and marking the failed tasks.
- InputAttemptIdentifier[] failedTasks = null;
+ InputAttemptFetchFailure[] failedTasks = null;
+
while (!remaining.isEmpty() && failedTasks == null) {
InputAttemptIdentifier inputAttemptIdentifier =
remaining.entrySet().iterator().next().getValue();
@@ -300,25 +303,14 @@
}
return;
}
- failedTasks = new InputAttemptIdentifier[] {getNextRemainingAttempt()};
+ failedTasks = new InputAttemptFetchFailure[] {
+ new InputAttemptFetchFailure(getNextRemainingAttempt()) };
break;
}
}
}
- if (failedTasks != null && failedTasks.length > 0) {
- if (stopped) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Ignoring copyMapOutput failures for tasks: " + Arrays.toString(failedTasks) +
- " since Fetcher has been stopped");
- }
- } else {
- LOG.warn("copyMapOutput failed for tasks " + Arrays.toString(failedTasks));
- for (InputAttemptIdentifier left : failedTasks) {
- scheduler.copyFailed(left, host, true, false, false);
- }
- }
- }
+ invokeCopyFailedForFailedTasks(host, failedTasks);
cleanupCurrentConnection(false);
@@ -332,6 +324,23 @@
}
}
+ private void invokeCopyFailedForFailedTasks(MapHost host,
+ InputAttemptFetchFailure[] failedTasks) {
+ if (failedTasks != null && failedTasks.length > 0) {
+ if (stopped) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Ignoring copyMapOutput failures for tasks: " + Arrays.toString(failedTasks)
+ + " since Fetcher has been stopped");
+ }
+ } else {
+ LOG.warn("copyMapOutput failed for tasks " + Arrays.toString(failedTasks));
+ for (InputAttemptFetchFailure left : failedTasks) {
+ scheduler.copyFailed(left, host, true, false);
+ }
+ }
+ }
+ }
+
@VisibleForTesting
boolean setupConnection(MapHost host, Collection<InputAttemptIdentifier> attempts)
throws IOException {
@@ -380,7 +389,8 @@
for (InputAttemptIdentifier left : remaining.values()) {
// Need to be handling temporary glitches ..
// Report read error to the AM to trigger source failure heuristics
- scheduler.copyFailed(left, host, connectSucceeded, !connectSucceeded, false);
+ scheduler.copyFailed(InputAttemptFetchFailure.fromAttempt(left), host, connectSucceeded,
+ !connectSucceeded);
}
return false;
}
@@ -404,7 +414,8 @@
}
}
- private static InputAttemptIdentifier[] EMPTY_ATTEMPT_ID_ARRAY = new InputAttemptIdentifier[0];
+ private static final InputAttemptFetchFailure[] EMPTY_ATTEMPT_ID_ARRAY =
+ new InputAttemptFetchFailure[0];
private static class MapOutputStat {
final InputAttemptIdentifier srcAttemptId;
@@ -425,8 +436,8 @@
}
}
- protected InputAttemptIdentifier[] copyMapOutput(MapHost host,
- DataInputStream input, InputAttemptIdentifier inputAttemptIdentifier) throws FetcherReadTimeoutException {
+ protected InputAttemptFetchFailure[] copyMapOutput(MapHost host, DataInputStream input,
+ InputAttemptIdentifier inputAttemptIdentifier) throws FetcherReadTimeoutException {
MapOutput mapOutput = null;
InputAttemptIdentifier srcAttemptId = null;
long decompressedLength = 0;
@@ -452,7 +463,13 @@
badIdErrs.increment(1);
LOG.warn("Invalid map id: " + header.mapId + ", expected to start with " +
InputAttemptIdentifier.PATH_PREFIX + ", partition: " + header.forReduce);
- return new InputAttemptIdentifier[]{getNextRemainingAttempt()};
+ if (header.mapId.startsWith(ShuffleHandlerError.DISK_ERROR_EXCEPTION.toString())) {
+ //this should be treated as local fetch failure while reporting later
+ return new InputAttemptFetchFailure[] {
+ InputAttemptFetchFailure.fromDiskErrorAtSource(getNextRemainingAttempt()) };
+ }
+ return new InputAttemptFetchFailure[] {
+ InputAttemptFetchFailure.fromAttempt(getNextRemainingAttempt()) };
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Already shutdown. Ignoring invalid map id error");
@@ -477,7 +494,8 @@
LOG.warn("Invalid map id ", e);
// Don't know which one was bad, so consider this one bad and dont read
// the remaining because we dont know where to start reading from. YARN-1773
- return new InputAttemptIdentifier[]{getNextRemainingAttempt()};
+ return new InputAttemptFetchFailure[] {
+ new InputAttemptFetchFailure(getNextRemainingAttempt()) };
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Already shutdown. Ignoring invalid map id error. Exception: " +
@@ -497,7 +515,8 @@
LOG.warn("Was expecting " + srcAttemptId + " but got null");
}
assert (srcAttemptId != null);
- return new InputAttemptIdentifier[]{srcAttemptId};
+ return new InputAttemptFetchFailure[] {
+ new InputAttemptFetchFailure(getNextRemainingAttempt()) };
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Already stopped. Ignoring verification failure.");
@@ -595,9 +614,10 @@
srcAttemptId + " decomp: " +
decompressedLength + ", " + compressedLength, ioe);
if (srcAttemptId == null) {
- return remaining.values().toArray(new InputAttemptIdentifier[remaining.values().size()]);
+ return InputAttemptFetchFailure.fromAttempts(remaining.values());
} else {
- return new InputAttemptIdentifier[]{srcAttemptId};
+ return new InputAttemptFetchFailure[] {
+ new InputAttemptFetchFailure(srcAttemptId) };
}
}
LOG.warn("Failed to shuffle output of " + srcAttemptId +
@@ -605,7 +625,8 @@
// Inform the shuffle-scheduler
mapOutput.abort();
- return new InputAttemptIdentifier[] {srcAttemptId};
+ return new InputAttemptFetchFailure[] {
+ new InputAttemptFetchFailure(srcAttemptId) };
}
return null;
}
@@ -734,7 +755,8 @@
if (!stopped) {
hasFailures = true;
ioErrs.increment(1);
- scheduler.copyFailed(srcAttemptId, host, true, false, true);
+ scheduler.copyFailed(InputAttemptFetchFailure.fromLocalFetchFailure(srcAttemptId),
+ host, true, false);
LOG.warn("Failed to read local disk output of " + srcAttemptId + " from " +
host.getHostIdentifier(), e);
} else {
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleHeader.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleHeader.java
index 9f883db..f074e89 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleHeader.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleHeader.java
@@ -103,4 +103,11 @@
WritableUtils.writeVLong(out, uncompressedLength);
WritableUtils.writeVInt(out, forReduce);
}
+
+ @Override
+ public String toString() {
+ return String.format(
+ "ShuffleHeader [mapId=%s, uncompressedLength=%d, compressedLength=%d, forReduce=%d]", mapId,
+ uncompressedLength, compressedLength, forReduce);
+ }
}
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
index 0954a76..e7f63ab 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
@@ -80,6 +80,7 @@
import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils.FetchStatsLogger;
import org.apache.tez.runtime.library.common.shuffle.HostPort;
+import org.apache.tez.runtime.library.common.shuffle.InputAttemptFetchFailure;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MapHost.HostPortPartition;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MapOutput.Type;
@@ -755,16 +756,13 @@
}
}
- public synchronized void copyFailed(InputAttemptIdentifier srcAttempt,
- MapHost host,
- boolean readError,
- boolean connectError,
- boolean isLocalFetch) {
+ public synchronized void copyFailed(InputAttemptFetchFailure fetchFailure, MapHost host,
+ boolean readError, boolean connectError) {
failedShuffleCounter.increment(1);
inputContext.notifyProgress();
- int failures = incrementAndGetFailureAttempt(srcAttempt);
+ int failures = incrementAndGetFailureAttempt(fetchFailure.getInputAttemptIdentifier());
- if (!isLocalFetch) {
+ if (!fetchFailure.isLocalFetch()) {
/**
* Track the number of failures that has happened since last completion.
* This gets reset on a successful copy.
@@ -789,11 +787,11 @@
if (shouldInformAM) {
//Inform AM. In case producer needs to be restarted, it is handled at AM.
- informAM(srcAttempt);
+ informAM(fetchFailure);
}
//Restart consumer in case shuffle is not healthy
- if (!isShuffleHealthy(srcAttempt)) {
+ if (!isShuffleHealthy(fetchFailure.getInputAttemptIdentifier())) {
return;
}
@@ -868,21 +866,24 @@
}
// Notify AM
- private void informAM(InputAttemptIdentifier srcAttempt) {
+ private void informAM(InputAttemptFetchFailure fetchFailure) {
+ InputAttemptIdentifier srcAttempt = fetchFailure.getInputAttemptIdentifier();
LOG.info(
- srcNameTrimmed + ": " + "Reporting fetch failure for InputIdentifier: "
- + srcAttempt + " taskAttemptIdentifier: " + TezRuntimeUtils
- .getTaskAttemptIdentifier(inputContext.getSourceVertexName(),
- srcAttempt.getInputIdentifier(),
- srcAttempt.getAttemptNumber()) + " to AM.");
+ "{}: Reporting fetch failure for InputIdentifier: {} taskAttemptIdentifier: {}, "
+ + "local fetch: {}, remote fetch failure reported as local failure: {}) to AM.",
+ srcNameTrimmed, srcAttempt,
+ TezRuntimeUtils.getTaskAttemptIdentifier(inputContext.getSourceVertexName(),
+ srcAttempt.getInputIdentifier(), srcAttempt.getAttemptNumber()),
+ fetchFailure.isLocalFetch(), fetchFailure.isDiskErrorAtSource());
List<Event> failedEvents = Lists.newArrayListWithCapacity(1);
- failedEvents.add(InputReadErrorEvent.create(
- "Fetch failure for " + TezRuntimeUtils
- .getTaskAttemptIdentifier(inputContext.getSourceVertexName(),
- srcAttempt.getInputIdentifier(),
- srcAttempt.getAttemptNumber()) + " to jobtracker.",
- srcAttempt.getInputIdentifier(),
- srcAttempt.getAttemptNumber()));
+ failedEvents.add(
+ InputReadErrorEvent.create(
+ "Fetch failure for "
+ + TezRuntimeUtils.getTaskAttemptIdentifier(inputContext.getSourceVertexName(),
+ srcAttempt.getInputIdentifier(), srcAttempt.getAttemptNumber())
+ + " to jobtracker.",
+ srcAttempt.getInputIdentifier(), srcAttempt.getAttemptNumber(),
+ fetchFailure.isLocalFetch(), fetchFailure.isDiskErrorAtSource()));
inputContext.sendEvents(failedEvents);
}
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java
index db9c7af..05d4eb4 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java
@@ -32,6 +32,7 @@
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
+import java.io.DataInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -46,10 +47,16 @@
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.runtime.api.InputContext;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+import org.apache.tez.runtime.library.common.shuffle.api.ShuffleHandlerError;
+import org.apache.tez.runtime.library.common.shuffle.impl.ShuffleManager;
+import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ShuffleHeader;
import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
+import org.apache.tez.runtime.library.testutils.RuntimeTestUtils;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
@@ -79,7 +86,8 @@
Fetcher fetcher = spy(builder.build());
FetchResult fr = new FetchResult(HOST, PORT, 0, 1, Arrays.asList(srcAttempts));
- Fetcher.HostFetchResult hfr = new Fetcher.HostFetchResult(fr, srcAttempts, false);
+ Fetcher.HostFetchResult hfr =
+ new Fetcher.HostFetchResult(fr, InputAttemptFetchFailure.fromAttempts(srcAttempts), false);
doReturn(hfr).when(fetcher).setupLocalDiskFetch();
doReturn(null).when(fetcher).doHttpFetch();
doNothing().when(fetcher).shutdown();
@@ -151,7 +159,7 @@
};
final int FIRST_FAILED_ATTEMPT_IDX = 2;
final int SECOND_FAILED_ATTEMPT_IDX = 4;
- final int[] sucessfulAttempts = {0, 1, 3};
+ final int[] successfulAttempts = {0, 1, 3};
TezConfiguration conf = new TezConfiguration();
conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, "true");
@@ -206,18 +214,24 @@
doNothing().when(fetcher).shutdown();
doNothing().when(callback).fetchSucceeded(anyString(), any(InputAttemptIdentifier.class),
any(FetchedInput.class), anyLong(), anyLong(), anyLong());
- doNothing().when(callback).fetchFailed(anyString(), any(InputAttemptIdentifier.class), eq(false));
+ doNothing().when(callback).fetchFailed(anyString(), any(InputAttemptFetchFailure.class), eq(false));
FetchResult fetchResult = fetcher.call();
verify(fetcher).setupLocalDiskFetch();
- // expect 3 sucesses and 2 failures
- for (int i : sucessfulAttempts) {
+ // expect 3 successes and 2 failures
+ for (int i : successfulAttempts) {
verifyFetchSucceeded(callback, srcAttempts[i], conf);
}
- verify(callback).fetchFailed(eq(HOST), eq(srcAttempts[FIRST_FAILED_ATTEMPT_IDX]), eq(false));
- verify(callback).fetchFailed(eq(HOST), eq(srcAttempts[SECOND_FAILED_ATTEMPT_IDX]), eq(false));
+ verify(callback).fetchFailed(eq(HOST),
+ eq(InputAttemptFetchFailure
+ .fromCompositeAttemptLocalFetchFailure(srcAttempts[FIRST_FAILED_ATTEMPT_IDX])),
+ eq(false));
+ verify(callback).fetchFailed(eq(HOST),
+ eq(InputAttemptFetchFailure
+ .fromCompositeAttemptLocalFetchFailure(srcAttempts[SECOND_FAILED_ATTEMPT_IDX])),
+ eq(false));
Assert.assertEquals("fetchResult host", fetchResult.getHost(), HOST);
Assert.assertEquals("fetchResult partition", fetchResult.getPartition(), partition);
@@ -304,4 +318,30 @@
Assert.assertTrue(expectedSrcAttempts[count++].toString().compareTo(key) == 0);
}
}
+
+ @Test
+ public void testShuffleHandlerDiskErrorUnordered()
+ throws Exception {
+ Configuration conf = new Configuration();
+
+ InputContext inputContext = mock(InputContext.class);
+ doReturn(new TezCounters()).when(inputContext).getCounters();
+ doReturn("vertex").when(inputContext).getSourceVertexName();
+
+ Fetcher.FetcherBuilder builder = new Fetcher.FetcherBuilder(mock(ShuffleManager.class), null,
+ null, ApplicationId.newInstance(0, 1), 1, null, "fetcherTest", conf, true, HOST, PORT,
+ false, true, false);
+ builder.assignWork(HOST, PORT, 0, 1, Arrays.asList(new InputAttemptIdentifier(0, 0)));
+
+ Fetcher fetcher = builder.build();
+ ShuffleHeader header =
+ new ShuffleHeader(ShuffleHandlerError.DISK_ERROR_EXCEPTION.toString(), -1, -1, -1);
+ DataInputStream input = RuntimeTestUtils.shuffleHeaderToDataInput(header);
+
+ InputAttemptFetchFailure[] failures =
+ fetcher.fetchInputs(input, null, new InputAttemptIdentifier(0, 0));
+ Assert.assertEquals(1, failures.length);
+ Assert.assertTrue(failures[0].isDiskErrorAtSource());
+ Assert.assertFalse(failures[0].isLocalFetch());
+ }
}
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java
index 94f7f5a..041fd03 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java
@@ -22,7 +22,6 @@
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
-import static org.mockito.Matchers.anyList;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.doAnswer;
@@ -37,7 +36,6 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutorService;
@@ -55,7 +53,6 @@
import org.apache.tez.common.security.JobTokenIdentifier;
import org.apache.tez.common.security.JobTokenSecretManager;
import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.ExecutionContext;
import org.apache.tez.runtime.api.InputContext;
@@ -66,9 +63,9 @@
import org.apache.tez.runtime.library.common.shuffle.FetchedInput;
import org.apache.tez.runtime.library.common.shuffle.FetchedInputAllocator;
import org.apache.tez.runtime.library.common.shuffle.Fetcher;
+import org.apache.tez.runtime.library.common.shuffle.InputAttemptFetchFailure;
import org.apache.tez.runtime.library.common.shuffle.FetchResult;
import org.apache.tez.runtime.library.common.shuffle.InputHost;
-import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto;
import org.junit.After;
import org.junit.Assert;
@@ -234,12 +231,12 @@
}
}
});
- InputAttemptIdentifier inputAttemptIdentifier
- = new InputAttemptIdentifier(1, 1);
+ InputAttemptFetchFailure inputAttemptFetchFailure =
+ new InputAttemptFetchFailure(new InputAttemptIdentifier(1, 1));
schedulerGetHostThread.start();
Thread.sleep(1000);
- shuffleManager.fetchFailed("host1", inputAttemptIdentifier, false);
+ shuffleManager.fetchFailed("host1", inputAttemptFetchFailure, false);
Thread.sleep(1000);
ArgumentCaptor<List> captor = ArgumentCaptor.forClass(List.class);
@@ -254,8 +251,8 @@
Assert.assertEquals("Number of failures was: " + inputEvent.getNumFailures(),
inputEvent.getNumFailures(), 1);
- shuffleManager.fetchFailed("host1", inputAttemptIdentifier, false);
- shuffleManager.fetchFailed("host1", inputAttemptIdentifier, false);
+ shuffleManager.fetchFailed("host1", inputAttemptFetchFailure, false);
+ shuffleManager.fetchFailed("host1", inputAttemptFetchFailure, false);
Thread.sleep(1000);
verify(inputContext, times(1)).sendEvents(any());
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
index 5f7fe4b..028fbce 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
@@ -42,6 +42,7 @@
import java.net.URL;
import java.util.Arrays;
import java.util.Collection;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
@@ -72,7 +73,10 @@
import org.apache.tez.runtime.library.common.security.SecureShuffleUtils;
import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
import org.apache.tez.runtime.library.exceptions.FetcherReadTimeoutException;
+import org.apache.tez.runtime.library.testutils.RuntimeTestUtils;
+import org.apache.tez.runtime.library.common.shuffle.InputAttemptFetchFailure;
import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
+import org.apache.tez.runtime.library.common.shuffle.api.ShuffleHandlerError;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
@@ -241,7 +245,7 @@
);
final int FIRST_FAILED_ATTEMPT_IDX = 2;
final int SECOND_FAILED_ATTEMPT_IDX = 4;
- final int[] sucessfulAttemptsIndexes = { 0, 1, 3 };
+ final int[] successfulAttemptsIndexes = { 0, 1, 3 };
doReturn(srcAttempts).when(scheduler).getMapsForHost(host);
@@ -311,13 +315,17 @@
spyFetcher.setupLocalDiskFetch(host);
// should have exactly 3 success and 1 failure.
- for (int i : sucessfulAttemptsIndexes) {
+ for (int i : successfulAttemptsIndexes) {
for (int j = 0; j < host.getPartitionCount(); j++) {
verifyCopySucceeded(scheduler, host, srcAttempts, i, j);
}
}
- verify(scheduler).copyFailed(srcAttempts.get(FIRST_FAILED_ATTEMPT_IDX).expand(0), host, true, false, true);
- verify(scheduler).copyFailed(srcAttempts.get(SECOND_FAILED_ATTEMPT_IDX).expand(0), host, true, false, true);
+ verify(scheduler).copyFailed(
+ eq(InputAttemptFetchFailure.fromLocalFetchFailure(srcAttempts.get(FIRST_FAILED_ATTEMPT_IDX).expand(0))),
+ eq(host), eq(true), eq(false));
+ verify(scheduler).copyFailed(
+ eq(InputAttemptFetchFailure.fromLocalFetchFailure(srcAttempts.get(SECOND_FAILED_ATTEMPT_IDX).expand(0))),
+ eq(host), eq(true), eq(false));
verify(spyFetcher).putBackRemainingMapOutputs(host);
verify(scheduler).putBackKnownMapOutput(host, srcAttempts.get(FIRST_FAILED_ATTEMPT_IDX));
@@ -426,7 +434,7 @@
);
final int FIRST_FAILED_ATTEMPT_IDX = 2;
final int SECOND_FAILED_ATTEMPT_IDX = 4;
- final int[] sucessfulAttemptsIndexes = { 0, 1, 3 };
+ final int[] successfulAttemptsIndexes = { 0, 1, 3 };
doReturn(srcAttempts).when(scheduler).getMapsForHost(host);
final ConcurrentMap<ShuffleScheduler.PathPartition, InputAttemptIdentifier> pathToIdentifierMap
@@ -503,15 +511,23 @@
spyFetcher.setupLocalDiskFetch(host);
// should have exactly 3 success and 1 failure.
- for (int i : sucessfulAttemptsIndexes) {
+ for (int i : successfulAttemptsIndexes) {
for (int j = 0; j < host.getPartitionCount(); j++) {
verifyCopySucceeded(scheduler, host, srcAttempts, i, j);
}
}
- verify(scheduler).copyFailed(srcAttempts.get(FIRST_FAILED_ATTEMPT_IDX).expand(0), host, true, false, true);
- verify(scheduler).copyFailed(srcAttempts.get(FIRST_FAILED_ATTEMPT_IDX).expand(1), host, true, false, true);
- verify(scheduler).copyFailed(srcAttempts.get(SECOND_FAILED_ATTEMPT_IDX).expand(0), host, true, false, true);
- verify(scheduler).copyFailed(srcAttempts.get(SECOND_FAILED_ATTEMPT_IDX).expand(1), host, true, false, true);
+ verify(scheduler).copyFailed(
+ eq(InputAttemptFetchFailure.fromLocalFetchFailure(srcAttempts.get(FIRST_FAILED_ATTEMPT_IDX).expand(0))),
+ eq(host), eq(true), eq(false));
+ verify(scheduler).copyFailed(
+ eq(InputAttemptFetchFailure.fromLocalFetchFailure(srcAttempts.get(FIRST_FAILED_ATTEMPT_IDX).expand(1))),
+ eq(host), eq(true), eq(false));
+ verify(scheduler).copyFailed(eq(
+ InputAttemptFetchFailure.fromLocalFetchFailure(srcAttempts.get(SECOND_FAILED_ATTEMPT_IDX).expand(0))),
+ eq(host), eq(true), eq(false));
+ verify(scheduler).copyFailed(eq(
+ InputAttemptFetchFailure.fromLocalFetchFailure(srcAttempts.get(SECOND_FAILED_ATTEMPT_IDX).expand(1))),
+ eq(host), eq(true), eq(false));
verify(spyFetcher).putBackRemainingMapOutputs(host);
verify(scheduler).putBackKnownMapOutput(host, srcAttempts.get(FIRST_FAILED_ATTEMPT_IDX));
@@ -630,8 +646,8 @@
//setup connection should be called twice (1 for connect and another for retry)
verify(fetcher, times(2)).setupConnection(any(MapHost.class), any(Collection.class));
//since copyMapOutput consistently fails, it should call copyFailed once
- verify(scheduler, times(1)).copyFailed(any(InputAttemptIdentifier.class), any(MapHost.class),
- anyBoolean(), anyBoolean(), anyBoolean());
+ verify(scheduler, times(1)).copyFailed(any(InputAttemptFetchFailure.class), any(MapHost.class),
+ anyBoolean(), anyBoolean());
verify(fetcher, times(1)).putBackRemainingMapOutputs(any(MapHost.class));
verify(scheduler, times(3)).putBackKnownMapOutput(any(MapHost.class),
@@ -750,6 +766,32 @@
}
}
+ @Test
+ public void testShuffleHandlerDiskErrorOrdered()
+ throws Exception {
+ MapHost mapHost = new MapHost(HOST, PORT, 0, 1);
+ InputAttemptIdentifier inputAttemptIdentifier = new InputAttemptIdentifier(0, 0, "attempt");
+
+ FetcherOrderedGrouped fetcher = new FetcherOrderedGrouped(null, null, null, null, null, false,
+ 0, null, new TezConfiguration(), null, false, HOST, PORT, "src vertex", mapHost,
+ ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter,
+ connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, false, false, true, false);
+ fetcher.remaining = new HashMap<String, InputAttemptIdentifier>();
+
+ ShuffleHeader header =
+ new ShuffleHeader(ShuffleHandlerError.DISK_ERROR_EXCEPTION.toString(), -1, -1, -1);
+ DataInputStream input = RuntimeTestUtils.shuffleHeaderToDataInput(header);
+
+ // copyMapOutput is used for remote fetch, this time it returns a fetch failure, which is fatal
+ // and should be treated as a local fetch failure
+ InputAttemptFetchFailure[] failures =
+ fetcher.copyMapOutput(mapHost, input, inputAttemptIdentifier);
+
+ Assert.assertEquals(1, failures.length);
+ Assert.assertTrue(failures[0].isDiskErrorAtSource());
+ Assert.assertFalse(failures[0].isLocalFetch());
+ }
+
private RawLocalFileSystem getRawFs(Configuration conf) {
try {
return (RawLocalFileSystem) FileSystem.getLocal(conf).getRaw();
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
index fabfa27..b89ffb0 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
@@ -55,6 +55,7 @@
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.common.CompositeInputAttemptIdentifier;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+import org.apache.tez.runtime.library.common.shuffle.InputAttemptFetchFailure;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -248,8 +249,8 @@
for (int i = 100; i < 199; i++) {
InputAttemptIdentifier inputAttemptIdentifier =
new InputAttemptIdentifier(i, 0, "attempt_");
- scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes),
- 10000, i, 1), false, true, false);
+ scheduler.copyFailed(InputAttemptFetchFailure.fromAttempt(inputAttemptIdentifier),
+ new MapHost("host" + (i % totalProducerNodes), 10000, i, 1), false, true);
}
@@ -257,9 +258,8 @@
new InputAttemptIdentifier(200, 0, "attempt_");
//Should fail here and report exception as reducer is not healthy
- scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (200 %
- totalProducerNodes),
- 10000, 200, 1), false, true, false);
+ scheduler.copyFailed(InputAttemptFetchFailure.fromAttempt(inputAttemptIdentifier),
+ new MapHost("host" + (200 % totalProducerNodes), 10000, 200, 1), false, true);
int minFailurePerHost = conf.getInt(
TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MIN_FAILURES_PER_HOST,
@@ -330,8 +330,8 @@
for (int i = 190; i < 200; i++) {
InputAttemptIdentifier inputAttemptIdentifier =
new InputAttemptIdentifier(i, 0, "attempt_");
- scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes),
- 10000, i, 1), false, true, false);
+ scheduler.copyFailed(InputAttemptFetchFailure.fromAttempt(inputAttemptIdentifier),
+ new MapHost("host" + (i % totalProducerNodes), 10000, i, 1), false, true);
}
//Shuffle has not stalled. so no issues.
@@ -342,9 +342,8 @@
InputAttemptIdentifier inputAttemptIdentifier =
new InputAttemptIdentifier(190, 0, "attempt_");
- scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" +
- (190 % totalProducerNodes),
- 10000, 190, 1), false, true, false);
+ scheduler.copyFailed(InputAttemptFetchFailure.fromAttempt(inputAttemptIdentifier),
+ new MapHost("host" + (190 % totalProducerNodes), 10000, 190, 1), false, true);
//Even when it is stalled, need (320 - 300 = 20) * 3 = 60 failures
verify(scheduler.reporter, times(0)).reportException(any(Throwable.class));
@@ -355,16 +354,17 @@
for (int i = 190; i < 200; i++) {
inputAttemptIdentifier =
new InputAttemptIdentifier(i, 0, "attempt_");
- scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes),
- 10000, i, 1), false, true, false);
- scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes),
- 10000, i, 1), false, true, false);
- scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes),
- 10000, i, 1), false, true, false);
- scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes),
- 10000, i, 1), false, true, false);
- scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes),
- 10000, i, 1), false, true, false);
+ InputAttemptFetchFailure failure = InputAttemptFetchFailure.fromAttempt(inputAttemptIdentifier);
+ scheduler.copyFailed(failure, new MapHost("host" + (i % totalProducerNodes),
+ 10000, i, 1), false, true);
+ scheduler.copyFailed(failure, new MapHost("host" + (i % totalProducerNodes),
+ 10000, i, 1), false, true);
+ scheduler.copyFailed(failure, new MapHost("host" + (i % totalProducerNodes),
+ 10000, i, 1), false, true);
+ scheduler.copyFailed(failure, new MapHost("host" + (i % totalProducerNodes),
+ 10000, i, 1), false, true);
+ scheduler.copyFailed(failure, new MapHost("host" + (i % totalProducerNodes),
+ 10000, i, 1), false, true);
}
assertEquals(61, scheduler.failedShufflesSinceLastCompletion);
@@ -376,12 +376,14 @@
for (int i = 110; i < 120; i++) {
inputAttemptIdentifier =
new InputAttemptIdentifier(i, 0, "attempt_");
- scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes),
- 10000, i, 1), false, true, false);
- scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes),
- 10000, i, 1), false, true, false);
- scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes),
- 10000, i, 1), false, true, false);
+ InputAttemptFetchFailure failure =
+ InputAttemptFetchFailure.fromAttempt(inputAttemptIdentifier);
+ scheduler.copyFailed(failure, new MapHost("host" + (i % totalProducerNodes), 10000, i, 1),
+ false, true);
+ scheduler.copyFailed(failure, new MapHost("host" + (i % totalProducerNodes), 10000, i, 1),
+ false, true);
+ scheduler.copyFailed(failure, new MapHost("host" + (i % totalProducerNodes), 10000, i, 1),
+ false, true);
}
// Should fail now due to fetcherHealthy. (stall has already happened and
@@ -432,8 +434,8 @@
//1 fails (last fetch)
InputAttemptIdentifier inputAttemptIdentifier =
new InputAttemptIdentifier(319, 0, "attempt_");
- scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (319 % totalProducerNodes),
- 10000, 319, 1), false, true, false);
+ scheduler.copyFailed(InputAttemptFetchFailure.fromAttempt(inputAttemptIdentifier),
+ new MapHost("host" + (319 % totalProducerNodes), 10000, 319, 1), false, true);
//stall the shuffle
scheduler.lastProgressTime = System.currentTimeMillis() - 1000000;
@@ -441,15 +443,13 @@
assertEquals(scheduler.remainingMaps.get(), 1);
//Retry for 3 more times
- scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (319 %
- totalProducerNodes),
- 10000, 319, 1), false, true, false);
- scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (319 %
- totalProducerNodes),
- 10000, 310, 1), false, true, false);
- scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (319 %
- totalProducerNodes),
- 10000, 310, 1), false, true, false);
+ InputAttemptFetchFailure failure = InputAttemptFetchFailure.fromAttempt(inputAttemptIdentifier);
+ scheduler.copyFailed(failure, new MapHost("host" + (319 % totalProducerNodes), 10000, 319, 1),
+ false, true);
+ scheduler.copyFailed(failure, new MapHost("host" + (319 % totalProducerNodes), 10000, 310, 1),
+ false, true);
+ scheduler.copyFailed(failure, new MapHost("host" + (319 % totalProducerNodes), 10000, 310, 1),
+ false, true);
// failedShufflesSinceLastCompletion has crossed the limits. Throw error
verify(shuffle, times(0)).reportException(any(Throwable.class));
@@ -487,15 +487,15 @@
for (int i = 0; i < 64; i++) {
InputAttemptIdentifier inputAttemptIdentifier =
new InputAttemptIdentifier(i, 0, "attempt_");
+ InputAttemptFetchFailure failure =
+ InputAttemptFetchFailure.fromAttempt(inputAttemptIdentifier);
- scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i %
- totalProducerNodes), 10000, i, 1), false, true, false);
-
- scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i %
- totalProducerNodes), 10000, i, 1), false, true, false);
-
- scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i %
- totalProducerNodes), 10000, i, 1), false, true, false);
+ scheduler.copyFailed(failure, new MapHost("host" + (i % totalProducerNodes), 10000, i, 1),
+ false, true);
+ scheduler.copyFailed(failure, new MapHost("host" + (i % totalProducerNodes), 10000, i, 1),
+ false, true);
+ scheduler.copyFailed(failure, new MapHost("host" + (i % totalProducerNodes), 10000, i, 1),
+ false, true);
MapOutput mapOutput = MapOutput
.createMemoryMapOutput(inputAttemptIdentifier, mock(FetchedInputAllocatorOrderedGrouped.class),
@@ -518,8 +518,8 @@
//1 fails (last fetch)
InputAttemptIdentifier inputAttemptIdentifier =
new InputAttemptIdentifier(319, 0, "attempt_");
- scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (319 % totalProducerNodes),
- 10000, 319, 1), false, true, false);
+ scheduler.copyFailed(new InputAttemptFetchFailure(inputAttemptIdentifier),
+ new MapHost("host" + (319 % totalProducerNodes), 10000, 319, 1), false, true);
//stall the shuffle (but within limits)
scheduler.lastProgressTime = System.currentTimeMillis() - 100000;
@@ -527,15 +527,13 @@
assertEquals(scheduler.remainingMaps.get(), 1);
//Retry for 3 more times
- scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (319 %
- totalProducerNodes),
- 10000, 319, 1), false, true, false);
- scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (319 %
- totalProducerNodes),
- 10000, 319, 1), false, true, false);
- scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (319 %
- totalProducerNodes),
- 10000, 319, 1), false, true, false);
+ InputAttemptFetchFailure failure = InputAttemptFetchFailure.fromAttempt(inputAttemptIdentifier);
+ scheduler.copyFailed(failure, new MapHost("host" + (319 % totalProducerNodes), 10000, 319, 1),
+ false, true);
+ scheduler.copyFailed(failure, new MapHost("host" + (319 % totalProducerNodes), 10000, 319, 1),
+ false, true);
+ scheduler.copyFailed(failure, new MapHost("host" + (319 % totalProducerNodes), 10000, 319, 1),
+ false, true);
// failedShufflesSinceLastCompletion has crossed the limits. 20% of other nodes had failures as
// well. However, it has failed only in one host. So this should proceed
@@ -544,9 +542,8 @@
//stall the shuffle (but within limits)
scheduler.lastProgressTime = System.currentTimeMillis() - 300000;
- scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (319 %
- totalProducerNodes),
- 10000, 319, 1), false, true, false);
+ scheduler.copyFailed(InputAttemptFetchFailure.fromAttempt(inputAttemptIdentifier),
+ new MapHost("host" + (319 % totalProducerNodes), 10000, 319, 1), false, true);
verify(shuffle, times(1)).reportException(any(Throwable.class));
}
@@ -592,8 +589,9 @@
//1 fails (last fetch)
InputAttemptIdentifier inputAttemptIdentifier =
new InputAttemptIdentifier(318, 0, "attempt_");
- scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (318 % totalProducerNodes),
- 10000, 318, 1), false, true, false);
+ InputAttemptFetchFailure failure = new InputAttemptFetchFailure(inputAttemptIdentifier);
+ scheduler.copyFailed(failure, new MapHost("host" + (318 % totalProducerNodes),
+ 10000, 318, 1), false, true);
//stall the shuffle
scheduler.lastProgressTime = System.currentTimeMillis() - 1000000;
@@ -601,15 +599,12 @@
assertEquals(scheduler.remainingMaps.get(), 1);
//Retry for 3 more times
- scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (318 %
- totalProducerNodes),
- 10000, 318, 1), false, true, false);
- scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (318 %
- totalProducerNodes),
- 10000, 318, 1), false, true, false);
- scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (318 %
- totalProducerNodes),
- 10000, 318, 1), false, true, false);
+ scheduler.copyFailed(failure, new MapHost("host" + (318 % totalProducerNodes), 10000, 318, 1),
+ false, true);
+ scheduler.copyFailed(failure, new MapHost("host" + (318 % totalProducerNodes), 10000, 318, 1),
+ false, true);
+ scheduler.copyFailed(failure, new MapHost("host" + (318 % totalProducerNodes), 10000, 318, 1),
+ false, true);
//Shuffle has not received the events completely. So do not bail out yet.
verify(shuffle, times(0)).reportException(any(Throwable.class));
@@ -672,8 +667,8 @@
for (int i = 10; i < 15; i++) {
InputAttemptIdentifier inputAttemptIdentifier =
new InputAttemptIdentifier(i, 0, "attempt_");
- scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes),
- 10000, i, 1), false, true, false);
+ scheduler.copyFailed(InputAttemptFetchFailure.fromAttempt(inputAttemptIdentifier),
+ new MapHost("host" + (i % totalProducerNodes), 10000, i, 1), false, true);
}
assertTrue(scheduler.failureCounts.size() >= 5);
@@ -686,10 +681,10 @@
for (int i = 10; i < 15; i++) {
InputAttemptIdentifier inputAttemptIdentifier =
new InputAttemptIdentifier(i, 0, "attempt_");
- scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes),
- 10000, i, 1), false, true, false);
- scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes),
- 10000, i, 1), false, true, false);
+ scheduler.copyFailed(InputAttemptFetchFailure.fromAttempt(inputAttemptIdentifier),
+ new MapHost("host" + (i % totalProducerNodes), 10000, i, 1), false, true);
+ scheduler.copyFailed(InputAttemptFetchFailure.fromAttempt(inputAttemptIdentifier),
+ new MapHost("host" + (i % totalProducerNodes), 10000, i, 1), false, true);
}
boolean checkFailedFetchSinceLastCompletion = conf.getBoolean
@@ -749,18 +744,15 @@
for (int i = 100; i < 199; i++) {
InputAttemptIdentifier inputAttemptIdentifier =
new InputAttemptIdentifier(i, 0, "attempt_");
- scheduler.copyFailed(inputAttemptIdentifier,
- new MapHost("host" + (i % totalProducerNodes), 10000, i, 1),
- false, true, false);
- scheduler.copyFailed(inputAttemptIdentifier,
- new MapHost("host" + (i % totalProducerNodes), 10000, i, 1),
- false, true, false);
- scheduler.copyFailed(inputAttemptIdentifier,
- new MapHost("host" + (i % totalProducerNodes), 10000, i, 1),
- false, true, false);
- scheduler.copyFailed(inputAttemptIdentifier,
- new MapHost("host" + (i % totalProducerNodes), 10000, i, 1),
- false, true, false);
+ InputAttemptFetchFailure failure = InputAttemptFetchFailure.fromAttempt(inputAttemptIdentifier);
+ scheduler.copyFailed(failure, new MapHost("host" + (i % totalProducerNodes), 10000, i, 1),
+ false, true);
+ scheduler.copyFailed(failure, new MapHost("host" + (i % totalProducerNodes), 10000, i, 1),
+ false, true);
+ scheduler.copyFailed(failure, new MapHost("host" + (i % totalProducerNodes), 10000, i, 1),
+ false, true);
+ scheduler.copyFailed(failure, new MapHost("host" + (i % totalProducerNodes), 10000, i, 1),
+ false, true);
}
verify(shuffle, atLeast(1)).reportException(any(Throwable.class));
@@ -799,7 +791,8 @@
MapHost mapHost = scheduler.pendingHosts.iterator().next();
//Fails to pull from host0. host0 should be added to penalties
- scheduler.copyFailed(inputAttemptIdentifier, mapHost, false, true, false);
+ scheduler.copyFailed(InputAttemptFetchFailure.fromAttempt(inputAttemptIdentifier), mapHost,
+ false, true);
//Should not get host, as it is added to penalty loop
MapHost host = scheduler.getHost();
@@ -993,7 +986,8 @@
}
for (int i = 0; i < 10; i++) {
- scheduler.copyFailed(identifiers[0], mapHosts[0], false, false, false);
+ scheduler.copyFailed(InputAttemptFetchFailure.fromAttempt(identifiers[0]), mapHosts[0], false,
+ false);
}
ShuffleScheduler.Penalty[] penaltyArray = new ShuffleScheduler.Penalty[scheduler.getPenalties().size()];
scheduler.getPenalties().toArray(penaltyArray);
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/testutils/RuntimeTestUtils.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/testutils/RuntimeTestUtils.java
new file mode 100644
index 0000000..0885178
--- /dev/null
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/testutils/RuntimeTestUtils.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.testutils;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ShuffleHeader;
+
+public final class RuntimeTestUtils {
+
+ private RuntimeTestUtils() {
+ }
+
+ public static DataInputStream shuffleHeaderToDataInput(ShuffleHeader header) throws IOException {
+ ByteArrayOutputStream byteOutput = new ByteArrayOutputStream(1000);
+ DataOutputStream output = new DataOutputStream(byteOutput);
+ header.write(output);
+
+ InputStream inputStream = new ByteArrayInputStream(byteOutput.toByteArray());
+ DataInputStream input = new DataInputStream(inputStream);
+
+ return input;
+ }
+}