TEZ-4233: Map task should be blamed earlier for local fetch failures (Laszlo Bdoor reviewed by Rajesh Balamohan)
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputReadErrorEvent.java b/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputReadErrorEvent.java
index cabc39f..8ef50eb 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputReadErrorEvent.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputReadErrorEvent.java
@@ -51,26 +51,44 @@
    */
   private final int numFailures;
 
-  private InputReadErrorEvent(final String diagnostics, final int index,
-                              final int version, final int numFailures) {
+  /**
+   * Whether this input read error is caused while fetching local file.
+   */
+  private final boolean isLocalFetch;
+
+  /**
+   * Whether this input read error is caused because the fetcher detected a fatal, unrecoverable,
+   * local file read issue from the shuffle handler.
+   */
+  private final boolean isDiskErrorAtSource;
+
+  private InputReadErrorEvent(final String diagnostics, final int index, final int version,
+      final int numFailures, boolean isLocalFetch, boolean isDiskErrorAtSource) {
     super();
     this.diagnostics = diagnostics;
     this.index = index;
     this.version = version;
     this.numFailures = numFailures;
+    this.isLocalFetch = isLocalFetch;
+    this.isDiskErrorAtSource = isDiskErrorAtSource;
   }
 
-  public static InputReadErrorEvent create(String diagnostics, int index,
-                                           int version) {
-    return create(diagnostics, index, version, 1);
+  public static InputReadErrorEvent create(String diagnostics, int index, int version,
+      boolean isLocalFetch, boolean isDiskErrorAtSource) {
+    return create(diagnostics, index, version, 1, isLocalFetch, isDiskErrorAtSource);
+  }
+
+  public static InputReadErrorEvent create(String diagnostics, int index, int version) {
+    return create(diagnostics, index, version, 1, false, false);
   }
 
   /**
    * Create an InputReadErrorEvent.
    */
   public static InputReadErrorEvent create(final String diagnostics, final int index,
-      final int version, final int numFailures) {
-    return new InputReadErrorEvent(diagnostics, index, version, numFailures);
+      final int version, final int numFailures, boolean isLocalFetch, boolean isDiskErrorAtSource) {
+    return new InputReadErrorEvent(diagnostics, index, version, numFailures, isLocalFetch,
+        isDiskErrorAtSource);
   }
 
   public String getDiagnostics() {
@@ -92,6 +110,14 @@
     return numFailures;
   }
 
+  public boolean isLocalFetch() {
+    return isLocalFetch;
+  }
+
+  public boolean isDiskErrorAtSource() {
+    return isDiskErrorAtSource;
+  }
+
   @Override
   public int hashCode() {
     return Objects.hash(index, version);
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/library/common/shuffle/api/ShuffleHandlerError.java b/tez-api/src/main/java/org/apache/tez/runtime/library/common/shuffle/api/ShuffleHandlerError.java
new file mode 100644
index 0000000..09137de
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/runtime/library/common/shuffle/api/ShuffleHandlerError.java
@@ -0,0 +1,27 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.tez.runtime.library.common.shuffle.api;
+
+/**
+ * ShuffleHandlerError enum encapsulates possible error messages that can be propagated from
+ * ShuffleHandler to fetchers. Depending on the message, fetchers can make better decisions, or give
+ * AM a hint in order to let it make better decisions in case of shuffle issues.
+ */
+public enum ShuffleHandlerError {
+  DISK_ERROR_EXCEPTION
+}
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/library/common/shuffle/api/package-info.java b/tez-api/src/main/java/org/apache/tez/runtime/library/common/shuffle/api/package-info.java
new file mode 100644
index 0000000..9ad8e61
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/runtime/library/common/shuffle/api/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+@Private
+package org.apache.tez.runtime.library.common.shuffle.api;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
\ No newline at end of file
diff --git a/tez-api/src/main/proto/Events.proto b/tez-api/src/main/proto/Events.proto
index 7123500..e041c33 100644
--- a/tez-api/src/main/proto/Events.proto
+++ b/tez-api/src/main/proto/Events.proto
@@ -39,6 +39,8 @@
   optional int32 index = 1;
   optional string diagnostics = 2;
   optional int32 version = 3;
+  optional bool is_local_fetch = 4;
+  optional bool is_disk_error_at_source = 5;
 }
 
 message InputFailedEventProto {
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 37e818e..9a5e73d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -1825,6 +1825,7 @@
             + " at inputIndex " + failedInputIndexOnDestTa);
       long time = attempt.clock.getTime();
       Long firstErrReportTime = attempt.uniquefailedOutputReports.get(failedDestTaId);
+
       if (firstErrReportTime == null) {
         attempt.uniquefailedOutputReports.put(failedDestTaId, time);
         firstErrReportTime = time;
@@ -1851,7 +1852,8 @@
       // If needed we can launch a background task without failing this task
       // to generate a copy of the output just in case.
       // If needed we can consider only running consumer tasks
-      if (!crossTimeDeadline && withinFailureFractionLimits && withinOutputFailureLimits) {
+      if (!crossTimeDeadline && withinFailureFractionLimits && withinOutputFailureLimits
+          && !(readErrorEvent.isLocalFetch() || readErrorEvent.isDiskErrorAtSource())) {
         return attempt.getInternalState();
       }
       String message = attempt.getID() + " being failed for too many output errors. "
@@ -1862,7 +1864,10 @@
           + ", MAX_ALLOWED_OUTPUT_FAILURES=" + maxAllowedOutputFailures
           + ", MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC="
           + maxAllowedTimeForTaskReadErrorSec
-          + ", readErrorTimespan=" + readErrorTimespanSec;
+          + ", readErrorTimespan=" + readErrorTimespanSec
+          + ", isLocalFetch=" + readErrorEvent.isLocalFetch()
+          + ", isDiskErrorAtSource=" + readErrorEvent.isDiskErrorAtSource();
+
       LOG.info(message);
       attempt.addDiagnosticInfo(message);
       // send input failed event
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index 41cce3b..6862bec 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -19,7 +19,6 @@
 package org.apache.tez.dag.app.dag.impl;
 
 import org.apache.tez.dag.app.MockClock;
-import org.apache.tez.dag.app.rm.AMSchedulerEventTAStateUpdated;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.fail;
@@ -83,6 +82,7 @@
 import org.apache.tez.dag.app.TaskCommunicatorWrapper;
 import org.apache.tez.dag.app.TaskHeartbeatHandler;
 import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
+import org.apache.tez.dag.app.dag.DAG;
 import org.apache.tez.dag.app.dag.Task;
 import org.apache.tez.dag.app.dag.Vertex;
 import org.apache.tez.dag.app.dag.event.DAGEvent;
@@ -127,6 +127,7 @@
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -2154,6 +2155,52 @@
     Assert.assertEquals(1, taImpl.taskAttemptFinishedEventLogged);
   }
 
+  @Test
+  public void testMapTaskIsBlamedImmediatelyOnLocalFetchFailure() throws ServicePluginException {
+    // local fetch failure or disk read error at source -> turn source attempt to FAIL_IN_PROGRESS
+    testMapTaskFailingForFetchFailureType(true, true, TaskAttemptStateInternal.FAIL_IN_PROGRESS);
+    testMapTaskFailingForFetchFailureType(true, false, TaskAttemptStateInternal.FAIL_IN_PROGRESS);
+    testMapTaskFailingForFetchFailureType(false, true, TaskAttemptStateInternal.FAIL_IN_PROGRESS);
+
+    // remote fetch failure -> won't change current state
+    testMapTaskFailingForFetchFailureType(false, false, TaskAttemptStateInternal.NEW);
+  }
+
+  private void testMapTaskFailingForFetchFailureType(boolean isLocalFetch,
+      boolean isDiskErrorAtSource, TaskAttemptStateInternal expectedState) {
+    EventHandler eventHandler = mock(EventHandler.class);
+    TezTaskID taskID =
+        TezTaskID.getInstance(TezVertexID.getInstance(TezDAGID.getInstance("1", 1, 1), 1), 1);
+    TaskAttemptImpl sourceAttempt = new MockTaskAttemptImpl(taskID, 1, eventHandler, null,
+        new Configuration(), SystemClock.getInstance(), mock(TaskHeartbeatHandler.class), appCtx,
+        false, null, null, false);
+
+    // the original read error event, sent by reducer task
+    InputReadErrorEvent inputReadErrorEvent =
+        InputReadErrorEvent.create("", 0, 1, 1, isLocalFetch, isDiskErrorAtSource);
+    TezTaskAttemptID destTaskAttemptId = mock(TezTaskAttemptID.class);
+    when(destTaskAttemptId.getTaskID()).thenReturn(mock(TezTaskID.class));
+    when(destTaskAttemptId.getTaskID().getVertexID()).thenReturn(mock(TezVertexID.class));
+    when(appCtx.getCurrentDAG()).thenReturn(mock(DAG.class));
+    when(appCtx.getCurrentDAG().getVertex(Mockito.any(TezVertexID.class)))
+        .thenReturn(mock(Vertex.class));
+    when(appCtx.getCurrentDAG().getVertex(Mockito.any(TezVertexID.class)).getRunningTasks())
+        .thenReturn(100);
+
+    EventMetaData mockMeta = mock(EventMetaData.class);
+    when(mockMeta.getTaskAttemptID()).thenReturn(destTaskAttemptId);
+    TezEvent tezEvent = new TezEvent(inputReadErrorEvent, mockMeta);
+
+    // the event is propagated to map task's event handler
+    TaskAttemptEventOutputFailed outputFailedEvent =
+        new TaskAttemptEventOutputFailed(sourceAttempt.getID(), tezEvent, 11);
+
+    Assert.assertEquals(TaskAttemptStateInternal.NEW, sourceAttempt.getInternalState());
+    TaskAttemptStateInternal resultState = new TaskAttemptImpl.OutputReportedFailedTransition()
+        .transition(sourceAttempt, outputFailedEvent);
+    Assert.assertEquals(expectedState, resultState);
+  }
+
   private Event verifyEventType(List<Event> events,
       Class<? extends Event> eventClass, int expectedOccurences) {
     int count = 0;
diff --git a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java
index f294edc..b67883d 100644
--- a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java
+++ b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java
@@ -71,6 +71,7 @@
 import org.apache.tez.common.security.JobTokenSecretManager;
 import org.apache.tez.runtime.library.common.Constants;
 import org.apache.tez.runtime.library.common.security.SecureShuffleUtils;
+import org.apache.tez.runtime.library.common.shuffle.api.ShuffleHandlerError;
 import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ShuffleHeader;
 import org.apache.hadoop.metrics2.MetricsSystem;
 import org.apache.hadoop.metrics2.annotation.Metric;
@@ -84,6 +85,7 @@
 import org.apache.hadoop.security.token.Token;
 import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
 import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
@@ -101,6 +103,7 @@
 import org.iq80.leveldb.Logger;
 import org.iq80.leveldb.Options;
 import org.jboss.netty.bootstrap.ServerBootstrap;
+import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.buffer.ChannelBuffers;
 import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelFactory;
@@ -608,6 +611,10 @@
     return new Shuffle(conf);
   }
 
+  protected JobTokenSecretManager getSecretManager() {
+    return secretManager;
+  }
+
   private void recoverState(Configuration conf) throws IOException {
     Path recoveryRoot = getRecoveryPath();
     if (recoveryRoot != null) {
@@ -727,7 +734,7 @@
   private void addJobToken(JobID jobId, String user,
       Token<JobTokenIdentifier> jobToken) {
     userRsrc.put(jobId.toString(), user);
-    secretManager.addTokenForJob(jobId.toString(), jobToken);
+    getSecretManager().addTokenForJob(jobId.toString(), jobToken);
     LOG.info("Added token for " + jobId.toString());
   }
 
@@ -772,7 +779,7 @@
 
   private void removeJobShuffleInfo(JobID jobId) throws IOException {
     String jobIdStr = jobId.toString();
-    secretManager.removeTokenForJob(jobIdStr);
+    getSecretManager().removeTokenForJob(jobIdStr);
     userRsrc.remove(jobIdStr);
     if (stateDb != null) {
       try {
@@ -1080,11 +1087,19 @@
       try {
         populateHeaders(mapIds, jobId, dagId, user, reduceRange,
           response, keepAliveParam, mapOutputInfoMap);
-      } catch(IOException e) {
+      } catch (DiskErrorException e) { // fatal error: fetcher should be aware of that
+        LOG.error("Shuffle error in populating headers (fatal: DiskErrorException):", e);
+        String errorMessage = getErrorMessage(e);
+        // custom message, might be noticed by fetchers
+        // it should reuse the current response object, as headers have been already set for it
+        sendFakeShuffleHeaderWithError(ctx,
+            ShuffleHandlerError.DISK_ERROR_EXCEPTION + ": " + errorMessage, response);
+        return;
+      } catch (IOException e) {
         ch.write(response);
         LOG.error("Shuffle error in populating headers :", e);
         String errorMessage = getErrorMessage(e);
-        sendError(ctx,errorMessage , INTERNAL_SERVER_ERROR);
+        sendError(ctx, errorMessage, INTERNAL_SERVER_ERROR);
         return;
       }
       ch.write(response);
@@ -1337,7 +1352,7 @@
     protected void verifyRequest(String appid, ChannelHandlerContext ctx,
         HttpRequest request, HttpResponse response, URL requestUri)
         throws IOException {
-      SecretKey tokenSecret = secretManager.retrieveTokenSecret(appid);
+      SecretKey tokenSecret = getSecretManager().retrieveTokenSecret(appid);
       if (null == tokenSecret) {
         LOG.info("Request for unknown token " + appid);
         throw new IOException("could not find jobid");
@@ -1444,22 +1459,37 @@
       return writeFuture;
     }
 
-    protected void sendError(ChannelHandlerContext ctx,
-        HttpResponseStatus status) {
+    protected void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) {
       sendError(ctx, "", status);
     }
 
-    protected void sendError(ChannelHandlerContext ctx, String message,
-        HttpResponseStatus status) {
+    protected void sendError(ChannelHandlerContext ctx, String message, HttpResponseStatus status) {
       HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status);
+      sendError(ctx, message, response);
+    }
+
+    protected void sendError(ChannelHandlerContext ctx, String message, HttpResponse response) {
+      sendError(ctx, ChannelBuffers.copiedBuffer(message, CharsetUtil.UTF_8), response);
+    }
+
+    private void sendFakeShuffleHeaderWithError(ChannelHandlerContext ctx, String message,
+        HttpResponse response) throws IOException {
+      ShuffleHeader header = new ShuffleHeader(message, -1, -1, -1);
+      DataOutputBuffer out = new DataOutputBuffer();
+      header.write(out);
+
+      sendError(ctx, wrappedBuffer(out.getData(), 0, out.getLength()), response);
+    }
+
+    protected void sendError(ChannelHandlerContext ctx, ChannelBuffer content,
+        HttpResponse response) {
       response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8");
       // Put shuffle version into http header
       response.headers().set(ShuffleHeader.HTTP_HEADER_NAME,
           ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
       response.headers().set(ShuffleHeader.HTTP_HEADER_VERSION,
           ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
-      response.setContent(
-        ChannelBuffers.copiedBuffer(message, CharsetUtil.UTF_8));
+      response.setContent(content);
 
       // Close the connection as soon as the error message is sent.
       ctx.getChannel().write(response).addListener(ChannelFutureListener.CLOSE);
diff --git a/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java b/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java
index a7f4446..5ca4ed8 100644
--- a/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java
+++ b/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java
@@ -20,6 +20,7 @@
 //import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
 //import static org.apache.hadoop.test.MetricsAsserts.assertGauge;
 //import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import static org.junit.Assert.assertTrue;
 import static org.jboss.netty.buffer.ChannelBuffers.wrappedBuffer;
 import static org.jboss.netty.handler.codec.http.HttpResponseStatus.OK;
@@ -41,15 +42,12 @@
 import java.net.URL;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
-import java.util.zip.CheckedOutputStream;
 import java.util.zip.Checksum;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
-import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
@@ -58,11 +56,14 @@
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.mapred.JobID;
-import org.apache.hadoop.mapred.MapTask;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.tez.runtime.library.common.security.SecureShuffleUtils;
 import org.apache.tez.common.security.JobTokenIdentifier;
 import org.apache.tez.common.security.JobTokenSecretManager;
+import org.apache.tez.http.BaseHttpConnection;
+import org.apache.tez.http.HttpConnectionParams;
+import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
+import org.apache.tez.runtime.library.common.shuffle.api.ShuffleHandlerError;
 import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ShuffleHeader;
 import org.apache.hadoop.metrics2.MetricsSystem;
 import org.apache.hadoop.metrics2.impl.MetricsSystemImpl;
@@ -172,6 +173,52 @@
     }
   }
 
+  class MockShuffleHandlerWithFatalDiskError extends org.apache.tez.auxservices.ShuffleHandler {
+    public static final String MESSAGE =
+        "Could not find application_1234/240/output/attempt_1234_0/file.out.index";
+
+    private JobTokenSecretManager secretManager =
+        new JobTokenSecretManager(JobTokenSecretManager.createSecretKey(getSecret().getBytes()));
+
+    protected JobTokenSecretManager getSecretManager(){
+      return secretManager;
+    }
+
+    @Override
+    protected Shuffle getShuffle(final Configuration conf) {
+      return new Shuffle(conf) {
+        @Override
+        protected void verifyRequest(String appid, ChannelHandlerContext ctx, HttpRequest request,
+            HttpResponse response, URL requestUri) throws IOException {
+          super.verifyRequest(appid, ctx, request, response, requestUri);
+        }
+
+        @Override
+        protected MapOutputInfo getMapOutputInfo(String dagId, String mapId, Range reduceRange,
+            String jobId, String user) {
+          return null;
+        }
+
+        @Override
+        protected void populateHeaders(List<String> mapIds, String jobId, String dagId, String user,
+            Range reduceRange, HttpResponse response, boolean keepAliveParam,
+            Map<String, MapOutputInfo> infoMap) throws IOException {
+          throw new DiskErrorException(MESSAGE);
+        }
+
+        @Override
+        protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx, Channel ch, String user,
+            String mapId, Range reduceRange, MapOutputInfo info) throws IOException {
+          return null;
+        }
+      };
+    }
+
+    public String getSecret() {
+      return "secret";
+    }
+  }
+
   /**
    * Test the validation of ShuffleHandler's meta-data's serialization and
    * de-serialization.
@@ -1322,6 +1369,53 @@
     sh.close();
   }
 
+  @Test
+  public void testShuffleHandlerSendsDiskError() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
+
+    DataInputStream input = null;
+    MockShuffleHandlerWithFatalDiskError shuffleHandler =
+        new MockShuffleHandlerWithFatalDiskError();
+    try {
+      shuffleHandler.init(conf);
+      shuffleHandler.start();
+
+      String shuffleBaseURL = "http://127.0.0.1:"
+          + shuffleHandler.getConfig().get(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY);
+      URL url = new URL(
+          shuffleBaseURL + "/mapOutput?job=job_12345_1&dag=1&reduce=1&map=attempt_12345_1_m_1_0");
+      shuffleHandler.secretManager.addTokenForJob("job_12345_1",
+          new Token<>("id".getBytes(), shuffleHandler.getSecret().getBytes(), null, null));
+
+      HttpConnectionParams httpConnectionParams = ShuffleUtils.getHttpConnectionParams(conf);
+      BaseHttpConnection httpConnection = ShuffleUtils.getHttpConnection(true, url,
+          httpConnectionParams, "testFetcher", shuffleHandler.secretManager);
+
+      boolean connectSucceeded = httpConnection.connect();
+      Assert.assertTrue(connectSucceeded);
+
+      input = httpConnection.getInputStream();
+      httpConnection.validate();
+
+      ShuffleHeader header = new ShuffleHeader();
+      header.readFields(input);
+
+      // message is encoded in the shuffle header, and can be checked by fetchers
+      Assert.assertEquals(
+          ShuffleHandlerError.DISK_ERROR_EXCEPTION + ": " + MockShuffleHandlerWithFatalDiskError.MESSAGE,
+          header.getMapId());
+      Assert.assertEquals(-1, header.getCompressedLength());
+      Assert.assertEquals(-1, header.getUncompressedLength());
+      Assert.assertEquals(-1, header.getPartition());
+    } finally {
+      if (input != null) {
+        input.close();
+      }
+      shuffleHandler.close();
+    }
+  }
+
   public ChannelFuture createMockChannelFuture(Channel mockCh,
       final List<ShuffleHandler.ReduceMapFileCount> listenerList) {
     final ChannelFuture mockFuture = mock(ChannelFuture.class);
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
index e7af4a1..ebea9a4 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
@@ -191,6 +191,8 @@
             .setIndex(ideEvt.getIndex())
             .setDiagnostics(ideEvt.getDiagnostics())
             .setVersion(ideEvt.getVersion())
+            .setIsLocalFetch(ideEvt.isLocalFetch())
+            .setIsDiskErrorAtSource(ideEvt.isDiskErrorAtSource())
             .build();
         break;
       case TASK_ATTEMPT_FAILED_EVENT:
@@ -294,10 +296,9 @@
         event = ProtoConverters.convertVertexManagerEventFromProto(vmProto);
         break;
       case INPUT_READ_ERROR_EVENT:
-        InputReadErrorEventProto ideProto =
-            InputReadErrorEventProto.parseFrom(input);
-        event = InputReadErrorEvent.create(ideProto.getDiagnostics(),
-            ideProto.getIndex(), ideProto.getVersion());
+        InputReadErrorEventProto ideProto = InputReadErrorEventProto.parseFrom(input);
+        event = InputReadErrorEvent.create(ideProto.getDiagnostics(), ideProto.getIndex(),
+            ideProto.getVersion(), ideProto.getIsLocalFetch(), ideProto.getIsDiskErrorAtSource());
         break;
       case TASK_ATTEMPT_FAILED_EVENT:
         TaskAttemptFailedEventProto tfProto =
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
index 18f66cc..f295c06 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
@@ -66,7 +66,7 @@
 import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
 import org.apache.tez.runtime.library.exceptions.FetcherReadTimeoutException;
 import org.apache.tez.runtime.library.common.shuffle.FetchedInput.Type;
-
+import org.apache.tez.runtime.library.common.shuffle.api.ShuffleHandlerError;
 import org.apache.tez.common.Preconditions;
 
 /**
@@ -277,7 +277,8 @@
 
     HostFetchResult hostFetchResult;
 
-    if (localDiskFetchEnabled && host.equals(localHostname) && port == shufflePort) {
+    boolean isLocalFetch = localDiskFetchEnabled && host.equals(localHostname) && port == shufflePort;
+    if (isLocalFetch) {
       hostFetchResult = setupLocalDiskFetch();
     } else if (multiplex) {
       hostFetchResult = doSharedFetch();
@@ -288,7 +289,7 @@
     if (hostFetchResult.failedInputs != null && hostFetchResult.failedInputs.length > 0) {
       if (!isShutDown.get()) {
         LOG.warn("copyInputs failed for tasks " + Arrays.toString(hostFetchResult.failedInputs));
-        for (InputAttemptIdentifier left : hostFetchResult.failedInputs) {
+        for (InputAttemptFetchFailure left : hostFetchResult.failedInputs) {
           fetcherCallback.fetchFailed(host, left, hostFetchResult.connectFailed);
         }
       } else {
@@ -504,7 +505,7 @@
       // ioErrs.increment(1);
       // If connect did not succeed, just mark all the maps as failed,
       // indirectly penalizing the host
-      InputAttemptIdentifier[] failedFetches = null;
+      InputAttemptFetchFailure[] failedFetches = null;
       if (isShutDown.get()) {
         if (isDebugEnabled) {
           LOG.debug(
@@ -512,8 +513,7 @@
                   e.getClass().getName() + ", Message: " + e.getMessage());
         }
       } else {
-        failedFetches = srcAttemptsRemaining.values().
-            toArray(new InputAttemptIdentifier[srcAttemptsRemaining.values().size()]);
+        failedFetches = InputAttemptFetchFailure.fromAttempts(srcAttemptsRemaining.values());
       }
       return new HostFetchResult(new FetchResult(host, port, partition, partitionCount, srcAttemptsRemaining.values()), failedFetches, true);
     }
@@ -547,7 +547,7 @@
             "Fetch Failure while connecting from %s to: %s:%d, attempt: %s Informing ShuffleManager: ",
             localHostname, host, port, firstAttempt), e);
         return new HostFetchResult(new FetchResult(host, port, partition, partitionCount, srcAttemptsRemaining.values()),
-            new InputAttemptIdentifier[] { firstAttempt }, true);
+            new InputAttemptFetchFailure[] { new InputAttemptFetchFailure(firstAttempt) }, true);
       }
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt(); //reset status
@@ -584,7 +584,7 @@
     // On any error, faildTasks is not null and we exit
     // after putting back the remaining maps to the
     // yet_to_be_fetched list and marking the failed tasks.
-    InputAttemptIdentifier[] failedInputs = null;
+    InputAttemptFetchFailure[] failedInputs = null;
     while (!srcAttemptsRemaining.isEmpty() && failedInputs == null) {
       InputAttemptIdentifier inputAttemptIdentifier =
           srcAttemptsRemaining.entrySet().iterator().next().getValue();
@@ -711,7 +711,7 @@
       }
     }
 
-    InputAttemptIdentifier[] failedFetches = null;
+    InputAttemptFetchFailure[] failedFetches = null;
     if (failMissing && srcAttemptsRemaining.size() > 0) {
       if (isShutDown.get()) {
         if (isDebugEnabled) {
@@ -720,8 +720,8 @@
                   " remaining inputs");
         }
       } else {
-        failedFetches = srcAttemptsRemaining.values().
-            toArray(new InputAttemptIdentifier[srcAttemptsRemaining.values().size()]);
+        failedFetches =
+            InputAttemptFetchFailure.fromAttemptsLocalFetchFailure(srcAttemptsRemaining.values());
       }
     } else {
       // nothing needs to be done to requeue remaining entries
@@ -770,10 +770,10 @@
 
   static class HostFetchResult {
     private final FetchResult fetchResult;
-    private final InputAttemptIdentifier[] failedInputs;
+    private final InputAttemptFetchFailure[] failedInputs;
     private final boolean connectFailed;
 
-    public HostFetchResult(FetchResult fetchResult, InputAttemptIdentifier[] failedInputs,
+    public HostFetchResult(FetchResult fetchResult, InputAttemptFetchFailure[] failedInputs,
                            boolean connectFailed) {
       this.fetchResult = fetchResult;
       this.failedInputs = failedInputs;
@@ -831,8 +831,11 @@
       return "id: " + srcAttemptId + ", decompressed length: " + decompressedLength + ", compressed length: " + compressedLength + ", reduce: " + forReduce;
     }
   }
-  private InputAttemptIdentifier[] fetchInputs(DataInputStream input,
-      CachingCallBack callback, InputAttemptIdentifier inputAttemptIdentifier) throws FetcherReadTimeoutException {
+
+  @VisibleForTesting
+  InputAttemptFetchFailure[] fetchInputs(DataInputStream input, CachingCallBack callback,
+      InputAttemptIdentifier inputAttemptIdentifier)
+      throws FetcherReadTimeoutException {
     FetchedInput fetchedInput = null;
     InputAttemptIdentifier srcAttemptId = null;
     long decompressedLength = 0;
@@ -856,9 +859,19 @@
           header.readFields(input);
           pathComponent = header.getMapId();
           if (!pathComponent.startsWith(InputAttemptIdentifier.PATH_PREFIX)) {
-            throw new IllegalArgumentException("Invalid map id: " + header.getMapId() + ", expected to start with " +
-                InputAttemptIdentifier.PATH_PREFIX + ", partition: " + header.getPartition()
-                + " while fetching " + inputAttemptIdentifier);
+            if (pathComponent.startsWith(ShuffleHandlerError.DISK_ERROR_EXCEPTION.toString())) {
+              LOG.warn("Invalid map id: " + header.getMapId() + ", expected to start with "
+                  + InputAttemptIdentifier.PATH_PREFIX + ", partition: " + header.getPartition()
+                  + " while fetching " + inputAttemptIdentifier);
+              // this should be treated as local fetch failure while reporting later
+              return new InputAttemptFetchFailure[] {
+                  InputAttemptFetchFailure.fromDiskErrorAtSource(inputAttemptIdentifier) };
+            } else {
+              throw new IllegalArgumentException(
+                  "Invalid map id: " + header.getMapId() + ", expected to start with "
+                      + InputAttemptIdentifier.PATH_PREFIX + ", partition: " + header.getPartition()
+                      + " while fetching " + inputAttemptIdentifier);
+            }
           }
 
           srcAttemptId = pathToAttemptMap.get(new PathPartition(pathComponent, header.getPartition()));
@@ -883,7 +896,7 @@
           if (!isShutDown.get()) {
             LOG.warn("Invalid src id ", e);
             // Don't know which one was bad, so consider all of them as bad
-            return srcAttemptsRemaining.values().toArray(new InputAttemptIdentifier[srcAttemptsRemaining.size()]);
+            return InputAttemptFetchFailure.fromAttempts(srcAttemptsRemaining.values());
           } else {
             if (isDebugEnabled) {
               LOG.debug("Already shutdown. Ignoring badId error with message: " + e.getMessage());
@@ -902,7 +915,8 @@
               srcAttemptId = getNextRemainingAttempt();
             }
             assert (srcAttemptId != null);
-            return new InputAttemptIdentifier[]{srcAttemptId};
+            return new InputAttemptFetchFailure[] {
+                InputAttemptFetchFailure.fromAttempt(srcAttemptId) };
           } else {
             if (isDebugEnabled) {
               LOG.debug("Already shutdown. Ignoring verification failure.");
@@ -1004,10 +1018,10 @@
         // Cleanup the fetchedInput before returning.
         cleanupFetchedInput(fetchedInput);
         if (srcAttemptId == null) {
-          return srcAttemptsRemaining.values()
-              .toArray(new InputAttemptIdentifier[srcAttemptsRemaining.size()]);
+          return InputAttemptFetchFailure.fromAttempts(srcAttemptsRemaining.values());
         } else {
-          return new InputAttemptIdentifier[] { srcAttemptId };
+          return new InputAttemptFetchFailure[] {
+              new InputAttemptFetchFailure(srcAttemptId) };
         }
       }
       LOG.warn("Failed to shuffle output of " + srcAttemptId + " from " + host + " (to "
@@ -1016,7 +1030,8 @@
       // Cleanup the fetchedInput
       cleanupFetchedInput(fetchedInput);
       // metrics.failedFetch();
-      return new InputAttemptIdentifier[] { srcAttemptId };
+      return new InputAttemptFetchFailure[] {
+          new InputAttemptFetchFailure(srcAttemptId) };
     }
     return null;
   }
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/FetcherCallback.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/FetcherCallback.java
index 34bd272..b751fb9 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/FetcherCallback.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/FetcherCallback.java
@@ -28,6 +28,7 @@
       FetchedInput fetchedInput, long fetchedBytes, long decompressedLength, long copyDuration)
       throws IOException;
   
-  public void fetchFailed(String host, InputAttemptIdentifier srcAttemptIdentifier, boolean connectFailed);
+  public void fetchFailed(String host, InputAttemptFetchFailure srcAttemptFetchFailure,
+      boolean connectFailed);
 
 }
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/InputAttemptFetchFailure.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/InputAttemptFetchFailure.java
new file mode 100644
index 0000000..d94db35
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/InputAttemptFetchFailure.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.common.shuffle;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.tez.runtime.library.common.CompositeInputAttemptIdentifier;
+import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+
+/**
+ * InputAttemptFetchFailure is supposed to wrap an InputAttemptIdentifier with any kind of failure
+ * information during fetch. It can be useful for propagating as a single object instead of multiple
+ * parameters (local fetch error, remote fetch error, connect failed, read failed, etc.).
+ */
+public class InputAttemptFetchFailure {
+
+  private final InputAttemptIdentifier inputAttemptIdentifier;
+  private final boolean isLocalFetch;
+  private final boolean isDiskErrorAtSource;
+
+  public InputAttemptFetchFailure(InputAttemptIdentifier inputAttemptIdentifier) {
+    this(inputAttemptIdentifier, false, false);
+  }
+
+  public InputAttemptFetchFailure(InputAttemptIdentifier inputAttemptIdentifier,
+      boolean isLocalFetch, boolean isDiskErrorAtSource) {
+    this.inputAttemptIdentifier = inputAttemptIdentifier;
+    this.isLocalFetch = isLocalFetch;
+    this.isDiskErrorAtSource = isDiskErrorAtSource;
+  }
+
+  public InputAttemptIdentifier getInputAttemptIdentifier() {
+    return inputAttemptIdentifier;
+  }
+
+  public boolean isLocalFetch() {
+    return isLocalFetch;
+  }
+
+  public boolean isDiskErrorAtSource() {
+    return isDiskErrorAtSource;
+  }
+
+  public static InputAttemptFetchFailure fromAttempt(InputAttemptIdentifier attempt) {
+    return new InputAttemptFetchFailure(attempt, false, false);
+  }
+
+  public static InputAttemptFetchFailure fromLocalFetchFailure(InputAttemptIdentifier attempt) {
+    return new InputAttemptFetchFailure(attempt, true, false);
+  }
+
+  public static InputAttemptFetchFailure fromDiskErrorAtSource(InputAttemptIdentifier attempt) {
+    return new InputAttemptFetchFailure(attempt, false, true);
+  }
+
+  public static InputAttemptFetchFailure[] fromAttempts(Collection<InputAttemptIdentifier> values) {
+    return values.stream().map(identifier -> new InputAttemptFetchFailure(identifier, false, false))
+        .toArray(InputAttemptFetchFailure[]::new);
+  }
+
+  public static InputAttemptFetchFailure[] fromAttempts(InputAttemptIdentifier[] values) {
+    return Arrays.asList(values).stream()
+        .map(identifier -> new InputAttemptFetchFailure(identifier, false, false))
+        .toArray(InputAttemptFetchFailure[]::new);
+  }
+
+  public static InputAttemptFetchFailure[] fromAttemptsLocalFetchFailure(
+      Collection<InputAttemptIdentifier> values) {
+    return values.stream().map(identifier -> new InputAttemptFetchFailure(identifier, true, false))
+        .toArray(InputAttemptFetchFailure[]::new);
+  }
+
+  public static InputAttemptFetchFailure fromCompositeAttemptLocalFetchFailure(
+      CompositeInputAttemptIdentifier compositeInputAttemptIdentifier) {
+    return new InputAttemptFetchFailure(compositeInputAttemptIdentifier, true, false);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj == null || (obj.getClass() != this.getClass())) {
+      return false;
+    }
+    return inputAttemptIdentifier.equals(((InputAttemptFetchFailure) obj).inputAttemptIdentifier)
+        && isLocalFetch == ((InputAttemptFetchFailure) obj).isLocalFetch
+        && isDiskErrorAtSource == ((InputAttemptFetchFailure) obj).isDiskErrorAtSource;
+  }
+
+  @Override
+  public int hashCode() {
+    return 31 * inputAttemptIdentifier.hashCode() + 31 * (isLocalFetch ? 0 : 1)
+        + 31 * (isDiskErrorAtSource ? 0 : 1);
+  }
+
+  @Override
+  public String toString() {
+    return String.format("%s, isLocalFetch: %s, isDiskErrorAtSource: %s",
+        inputAttemptIdentifier.toString(), isLocalFetch, isDiskErrorAtSource);
+  }
+}
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
index 742fc18..2b83ad8 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
@@ -84,6 +84,7 @@
 import org.apache.tez.runtime.library.common.shuffle.Fetcher.FetcherBuilder;
 import org.apache.tez.runtime.library.common.shuffle.FetcherCallback;
 import org.apache.tez.runtime.library.common.shuffle.HostPort;
+import org.apache.tez.runtime.library.common.shuffle.InputAttemptFetchFailure;
 import org.apache.tez.runtime.library.common.shuffle.InputHost;
 import org.apache.tez.runtime.library.common.shuffle.InputHost.PartitionToInputs;
 import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
@@ -391,9 +392,9 @@
               List<Event> failedEventsToSend = Lists.newArrayListWithCapacity(
                   failedEvents.size());
               for (InputReadErrorEvent key : failedEvents.keySet()) {
-                failedEventsToSend.add(InputReadErrorEvent
-                    .create(key.getDiagnostics(), key.getIndex(),
-                        key.getVersion(), failedEvents.get(key)));
+                failedEventsToSend.add(InputReadErrorEvent.create(key.getDiagnostics(),
+                    key.getIndex(), key.getVersion(), failedEvents.get(key), key.isLocalFetch(),
+                    key.isDiskErrorAtSource()));
               }
               inputContext.sendEvents(failedEventsToSend);
               failedEvents.clear();
@@ -939,12 +940,15 @@
 
   @Override
   public void fetchFailed(String host,
-      InputAttemptIdentifier srcAttemptIdentifier, boolean connectFailed) {
+      InputAttemptFetchFailure inputAttemptFetchFailure, boolean connectFailed) {
     // TODO NEWTEZ. Implement logic to report fetch failures after a threshold.
     // For now, reporting immediately.
-    LOG.info(srcNameTrimmed + ": " + "Fetch failed for src: " + srcAttemptIdentifier
-        + "InputIdentifier: " + srcAttemptIdentifier + ", connectFailed: "
-        + connectFailed);
+    InputAttemptIdentifier srcAttemptIdentifier = inputAttemptFetchFailure.getInputAttemptIdentifier();
+    LOG.info(
+        "{}: Fetch failed for src: {} InputIdentifier: {}, connectFailed: {}, "
+            + "local fetch: {}, remote fetch failure reported as local failure: {})",
+        srcNameTrimmed, srcAttemptIdentifier, srcAttemptIdentifier, connectFailed,
+        inputAttemptFetchFailure.isLocalFetch(), inputAttemptFetchFailure.isDiskErrorAtSource());
     failedShufflesCounter.increment(1);
     inputContext.notifyProgress();
     if (srcAttemptIdentifier == null) {
@@ -957,7 +961,9 @@
               srcAttemptIdentifier.getInputIdentifier(),
               srcAttemptIdentifier.getAttemptNumber()),
           srcAttemptIdentifier.getInputIdentifier(),
-          srcAttemptIdentifier.getAttemptNumber());
+          srcAttemptIdentifier.getAttemptNumber(),
+          inputAttemptFetchFailure.isLocalFetch(),
+          inputAttemptFetchFailure.isDiskErrorAtSource());
       if (maxTimeToWaitForReportMillis > 0) {
         try {
           reportLock.lock();
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
index e732ab0..3272327 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
@@ -50,7 +50,9 @@
 import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
 import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
 import org.apache.tez.runtime.library.exceptions.FetcherReadTimeoutException;
+import org.apache.tez.runtime.library.common.shuffle.InputAttemptFetchFailure;
 import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
+import org.apache.tez.runtime.library.common.shuffle.api.ShuffleHandlerError;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -272,7 +274,8 @@
       // On any error, faildTasks is not null and we exit
       // after putting back the remaining maps to the 
       // yet_to_be_fetched list and marking the failed tasks.
-      InputAttemptIdentifier[] failedTasks = null;
+      InputAttemptFetchFailure[] failedTasks = null;
+
       while (!remaining.isEmpty() && failedTasks == null) {
         InputAttemptIdentifier inputAttemptIdentifier =
             remaining.entrySet().iterator().next().getValue();
@@ -300,25 +303,14 @@
               }
               return;
             }
-            failedTasks = new InputAttemptIdentifier[] {getNextRemainingAttempt()};
+            failedTasks = new InputAttemptFetchFailure[] {
+                new InputAttemptFetchFailure(getNextRemainingAttempt()) };
             break;
           }
         }
       }
 
-      if (failedTasks != null && failedTasks.length > 0) {
-        if (stopped) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Ignoring copyMapOutput failures for tasks: " + Arrays.toString(failedTasks) +
-                " since Fetcher has been stopped");
-          }
-        } else {
-          LOG.warn("copyMapOutput failed for tasks " + Arrays.toString(failedTasks));
-          for (InputAttemptIdentifier left : failedTasks) {
-            scheduler.copyFailed(left, host, true, false, false);
-          }
-        }
-      }
+      invokeCopyFailedForFailedTasks(host, failedTasks);
 
       cleanupCurrentConnection(false);
 
@@ -332,6 +324,23 @@
     }
   }
 
+  private void invokeCopyFailedForFailedTasks(MapHost host,
+      InputAttemptFetchFailure[] failedTasks) {
+    if (failedTasks != null && failedTasks.length > 0) {
+      if (stopped) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Ignoring copyMapOutput failures for tasks: " + Arrays.toString(failedTasks)
+              + " since Fetcher has been stopped");
+        }
+      } else {
+        LOG.warn("copyMapOutput failed for tasks " + Arrays.toString(failedTasks));
+        for (InputAttemptFetchFailure left : failedTasks) {
+          scheduler.copyFailed(left, host, true, false);
+        }
+      }
+    }
+  }
+
   @VisibleForTesting
   boolean setupConnection(MapHost host, Collection<InputAttemptIdentifier> attempts)
       throws IOException {
@@ -380,7 +389,8 @@
       for (InputAttemptIdentifier left : remaining.values()) {
         // Need to be handling temporary glitches ..
         // Report read error to the AM to trigger source failure heuristics
-        scheduler.copyFailed(left, host, connectSucceeded, !connectSucceeded, false);
+        scheduler.copyFailed(InputAttemptFetchFailure.fromAttempt(left), host, connectSucceeded,
+            !connectSucceeded);
       }
       return false;
     }
@@ -404,7 +414,8 @@
     }
   }
 
-  private static InputAttemptIdentifier[] EMPTY_ATTEMPT_ID_ARRAY = new InputAttemptIdentifier[0];
+  private static final InputAttemptFetchFailure[] EMPTY_ATTEMPT_ID_ARRAY =
+      new InputAttemptFetchFailure[0];
 
   private static class MapOutputStat {
     final InputAttemptIdentifier srcAttemptId;
@@ -425,8 +436,8 @@
     }
   }
 
-  protected InputAttemptIdentifier[] copyMapOutput(MapHost host,
-                                DataInputStream input, InputAttemptIdentifier inputAttemptIdentifier) throws FetcherReadTimeoutException {
+  protected InputAttemptFetchFailure[] copyMapOutput(MapHost host, DataInputStream input,
+      InputAttemptIdentifier inputAttemptIdentifier) throws FetcherReadTimeoutException {
     MapOutput mapOutput = null;
     InputAttemptIdentifier srcAttemptId = null;
     long decompressedLength = 0;
@@ -452,7 +463,13 @@
               badIdErrs.increment(1);
               LOG.warn("Invalid map id: " + header.mapId + ", expected to start with " +
                   InputAttemptIdentifier.PATH_PREFIX + ", partition: " + header.forReduce);
-              return new InputAttemptIdentifier[]{getNextRemainingAttempt()};
+              if (header.mapId.startsWith(ShuffleHandlerError.DISK_ERROR_EXCEPTION.toString())) {
+                //this should be treated as local fetch failure while reporting later
+                return new InputAttemptFetchFailure[] {
+                    InputAttemptFetchFailure.fromDiskErrorAtSource(getNextRemainingAttempt()) };
+              }
+              return new InputAttemptFetchFailure[] {
+                  InputAttemptFetchFailure.fromAttempt(getNextRemainingAttempt()) };
             } else {
               if (LOG.isDebugEnabled()) {
                 LOG.debug("Already shutdown. Ignoring invalid map id error");
@@ -477,7 +494,8 @@
             LOG.warn("Invalid map id ", e);
             // Don't know which one was bad, so consider this one bad and dont read
             // the remaining because we dont know where to start reading from. YARN-1773
-            return new InputAttemptIdentifier[]{getNextRemainingAttempt()};
+            return new InputAttemptFetchFailure[] {
+                new InputAttemptFetchFailure(getNextRemainingAttempt()) };
           } else {
             if (LOG.isDebugEnabled()) {
               LOG.debug("Already shutdown. Ignoring invalid map id error. Exception: " +
@@ -497,7 +515,8 @@
               LOG.warn("Was expecting " + srcAttemptId + " but got null");
             }
             assert (srcAttemptId != null);
-            return new InputAttemptIdentifier[]{srcAttemptId};
+            return new InputAttemptFetchFailure[] {
+                new InputAttemptFetchFailure(getNextRemainingAttempt()) };
           } else {
             if (LOG.isDebugEnabled()) {
               LOG.debug("Already stopped. Ignoring verification failure.");
@@ -595,9 +614,10 @@
             srcAttemptId + " decomp: " +
             decompressedLength + ", " + compressedLength, ioe);
         if (srcAttemptId == null) {
-          return remaining.values().toArray(new InputAttemptIdentifier[remaining.values().size()]);
+          return InputAttemptFetchFailure.fromAttempts(remaining.values());
         } else {
-          return new InputAttemptIdentifier[]{srcAttemptId};
+          return new InputAttemptFetchFailure[] {
+              new InputAttemptFetchFailure(srcAttemptId) };
         }
       }
       LOG.warn("Failed to shuffle output of " + srcAttemptId +
@@ -605,7 +625,8 @@
 
       // Inform the shuffle-scheduler
       mapOutput.abort();
-      return new InputAttemptIdentifier[] {srcAttemptId};
+      return new InputAttemptFetchFailure[] {
+          new InputAttemptFetchFailure(srcAttemptId) };
     }
     return null;
   }
@@ -734,7 +755,8 @@
             if (!stopped) {
               hasFailures = true;
               ioErrs.increment(1);
-              scheduler.copyFailed(srcAttemptId, host, true, false, true);
+              scheduler.copyFailed(InputAttemptFetchFailure.fromLocalFetchFailure(srcAttemptId),
+                  host, true, false);
               LOG.warn("Failed to read local disk output of " + srcAttemptId + " from " +
                   host.getHostIdentifier(), e);
             } else {
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleHeader.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleHeader.java
index 9f883db..f074e89 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleHeader.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleHeader.java
@@ -103,4 +103,11 @@
     WritableUtils.writeVLong(out, uncompressedLength);
     WritableUtils.writeVInt(out, forReduce);
   }
+
+  @Override
+  public String toString() {
+    return String.format(
+        "ShuffleHeader [mapId=%s, uncompressedLength=%d, compressedLength=%d, forReduce=%d]", mapId,
+        uncompressedLength, compressedLength, forReduce);
+  }
 }
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
index 0954a76..e7f63ab 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
@@ -80,6 +80,7 @@
 import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
 import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils.FetchStatsLogger;
 import org.apache.tez.runtime.library.common.shuffle.HostPort;
+import org.apache.tez.runtime.library.common.shuffle.InputAttemptFetchFailure;
 import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MapHost.HostPortPartition;
 import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MapOutput.Type;
 
@@ -755,16 +756,13 @@
     }
   }
 
-  public synchronized void copyFailed(InputAttemptIdentifier srcAttempt,
-                                      MapHost host,
-                                      boolean readError,
-                                      boolean connectError,
-                                      boolean isLocalFetch) {
+  public synchronized void copyFailed(InputAttemptFetchFailure fetchFailure, MapHost host,
+      boolean readError, boolean connectError) {
     failedShuffleCounter.increment(1);
     inputContext.notifyProgress();
-    int failures = incrementAndGetFailureAttempt(srcAttempt);
+    int failures = incrementAndGetFailureAttempt(fetchFailure.getInputAttemptIdentifier());
 
-    if (!isLocalFetch) {
+    if (!fetchFailure.isLocalFetch()) {
       /**
        * Track the number of failures that has happened since last completion.
        * This gets reset on a successful copy.
@@ -789,11 +787,11 @@
 
     if (shouldInformAM) {
       //Inform AM. In case producer needs to be restarted, it is handled at AM.
-      informAM(srcAttempt);
+      informAM(fetchFailure);
     }
 
     //Restart consumer in case shuffle is not healthy
-    if (!isShuffleHealthy(srcAttempt)) {
+    if (!isShuffleHealthy(fetchFailure.getInputAttemptIdentifier())) {
       return;
     }
 
@@ -868,21 +866,24 @@
   }
 
   // Notify AM
-  private void informAM(InputAttemptIdentifier srcAttempt) {
+  private void informAM(InputAttemptFetchFailure fetchFailure) {
+    InputAttemptIdentifier srcAttempt = fetchFailure.getInputAttemptIdentifier();
     LOG.info(
-        srcNameTrimmed + ": " + "Reporting fetch failure for InputIdentifier: "
-            + srcAttempt + " taskAttemptIdentifier: " + TezRuntimeUtils
-            .getTaskAttemptIdentifier(inputContext.getSourceVertexName(),
-                srcAttempt.getInputIdentifier(),
-                srcAttempt.getAttemptNumber()) + " to AM.");
+        "{}: Reporting fetch failure for InputIdentifier: {} taskAttemptIdentifier: {}, "
+            + "local fetch: {}, remote fetch failure reported as local failure: {}) to AM.",
+        srcNameTrimmed, srcAttempt,
+        TezRuntimeUtils.getTaskAttemptIdentifier(inputContext.getSourceVertexName(),
+            srcAttempt.getInputIdentifier(), srcAttempt.getAttemptNumber()),
+        fetchFailure.isLocalFetch(), fetchFailure.isDiskErrorAtSource());
     List<Event> failedEvents = Lists.newArrayListWithCapacity(1);
-    failedEvents.add(InputReadErrorEvent.create(
-        "Fetch failure for " + TezRuntimeUtils
-            .getTaskAttemptIdentifier(inputContext.getSourceVertexName(),
-                srcAttempt.getInputIdentifier(),
-                srcAttempt.getAttemptNumber()) + " to jobtracker.",
-        srcAttempt.getInputIdentifier(),
-        srcAttempt.getAttemptNumber()));
+    failedEvents.add(
+        InputReadErrorEvent.create(
+            "Fetch failure for "
+                + TezRuntimeUtils.getTaskAttemptIdentifier(inputContext.getSourceVertexName(),
+                    srcAttempt.getInputIdentifier(), srcAttempt.getAttemptNumber())
+                + " to jobtracker.",
+            srcAttempt.getInputIdentifier(), srcAttempt.getAttemptNumber(),
+            fetchFailure.isLocalFetch(), fetchFailure.isDiskErrorAtSource()));
 
     inputContext.sendEvents(failedEvents);
   }
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java
index db9c7af..05d4eb4 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java
@@ -32,6 +32,7 @@
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 
+import java.io.DataInputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -46,10 +47,16 @@
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.runtime.api.InputContext;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+import org.apache.tez.runtime.library.common.shuffle.api.ShuffleHandlerError;
+import org.apache.tez.runtime.library.common.shuffle.impl.ShuffleManager;
+import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ShuffleHeader;
 import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
+import org.apache.tez.runtime.library.testutils.RuntimeTestUtils;
 import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
@@ -79,7 +86,8 @@
     Fetcher fetcher = spy(builder.build());
 
     FetchResult fr = new FetchResult(HOST, PORT, 0, 1, Arrays.asList(srcAttempts));
-    Fetcher.HostFetchResult hfr = new Fetcher.HostFetchResult(fr, srcAttempts, false);
+    Fetcher.HostFetchResult hfr =
+        new Fetcher.HostFetchResult(fr, InputAttemptFetchFailure.fromAttempts(srcAttempts), false);
     doReturn(hfr).when(fetcher).setupLocalDiskFetch();
     doReturn(null).when(fetcher).doHttpFetch();
     doNothing().when(fetcher).shutdown();
@@ -151,7 +159,7 @@
     };
     final int FIRST_FAILED_ATTEMPT_IDX = 2;
     final int SECOND_FAILED_ATTEMPT_IDX = 4;
-    final int[] sucessfulAttempts = {0, 1, 3};
+    final int[] successfulAttempts = {0, 1, 3};
 
     TezConfiguration conf = new TezConfiguration();
     conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, "true");
@@ -206,18 +214,24 @@
     doNothing().when(fetcher).shutdown();
     doNothing().when(callback).fetchSucceeded(anyString(), any(InputAttemptIdentifier.class),
         any(FetchedInput.class), anyLong(), anyLong(), anyLong());
-    doNothing().when(callback).fetchFailed(anyString(), any(InputAttemptIdentifier.class), eq(false));
+    doNothing().when(callback).fetchFailed(anyString(), any(InputAttemptFetchFailure.class), eq(false));
 
     FetchResult fetchResult = fetcher.call();
 
     verify(fetcher).setupLocalDiskFetch();
 
-    // expect 3 sucesses and 2 failures
-    for (int i : sucessfulAttempts) {
+    // expect 3 successes and 2 failures
+    for (int i : successfulAttempts) {
       verifyFetchSucceeded(callback, srcAttempts[i], conf);
     }
-    verify(callback).fetchFailed(eq(HOST), eq(srcAttempts[FIRST_FAILED_ATTEMPT_IDX]), eq(false));
-    verify(callback).fetchFailed(eq(HOST), eq(srcAttempts[SECOND_FAILED_ATTEMPT_IDX]), eq(false));
+    verify(callback).fetchFailed(eq(HOST),
+        eq(InputAttemptFetchFailure
+            .fromCompositeAttemptLocalFetchFailure(srcAttempts[FIRST_FAILED_ATTEMPT_IDX])),
+        eq(false));
+    verify(callback).fetchFailed(eq(HOST),
+        eq(InputAttemptFetchFailure
+            .fromCompositeAttemptLocalFetchFailure(srcAttempts[SECOND_FAILED_ATTEMPT_IDX])),
+        eq(false));
 
     Assert.assertEquals("fetchResult host", fetchResult.getHost(), HOST);
     Assert.assertEquals("fetchResult partition", fetchResult.getPartition(), partition);
@@ -304,4 +318,30 @@
       Assert.assertTrue(expectedSrcAttempts[count++].toString().compareTo(key) == 0);
     }
   }
+
+  @Test
+  public void testShuffleHandlerDiskErrorUnordered()
+      throws Exception {
+    Configuration conf = new Configuration();
+
+    InputContext inputContext = mock(InputContext.class);
+    doReturn(new TezCounters()).when(inputContext).getCounters();
+    doReturn("vertex").when(inputContext).getSourceVertexName();
+
+    Fetcher.FetcherBuilder builder = new Fetcher.FetcherBuilder(mock(ShuffleManager.class), null,
+        null, ApplicationId.newInstance(0, 1), 1, null, "fetcherTest", conf, true, HOST, PORT,
+        false, true, false);
+    builder.assignWork(HOST, PORT, 0, 1, Arrays.asList(new InputAttemptIdentifier(0, 0)));
+
+    Fetcher fetcher = builder.build();
+    ShuffleHeader header =
+        new ShuffleHeader(ShuffleHandlerError.DISK_ERROR_EXCEPTION.toString(), -1, -1, -1);
+    DataInputStream input = RuntimeTestUtils.shuffleHeaderToDataInput(header);
+
+    InputAttemptFetchFailure[] failures =
+        fetcher.fetchInputs(input, null, new InputAttemptIdentifier(0, 0));
+    Assert.assertEquals(1, failures.length);
+    Assert.assertTrue(failures[0].isDiskErrorAtSource());
+    Assert.assertFalse(failures[0].isLocalFetch());
+  }
 }
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java
index 94f7f5a..041fd03 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java
@@ -22,7 +22,6 @@
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyInt;
-import static org.mockito.Matchers.anyList;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.atLeast;
 import static org.mockito.Mockito.doAnswer;
@@ -37,7 +36,6 @@
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
@@ -55,7 +53,6 @@
 import org.apache.tez.common.security.JobTokenIdentifier;
 import org.apache.tez.common.security.JobTokenSecretManager;
 import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.ExecutionContext;
 import org.apache.tez.runtime.api.InputContext;
@@ -66,9 +63,9 @@
 import org.apache.tez.runtime.library.common.shuffle.FetchedInput;
 import org.apache.tez.runtime.library.common.shuffle.FetchedInputAllocator;
 import org.apache.tez.runtime.library.common.shuffle.Fetcher;
+import org.apache.tez.runtime.library.common.shuffle.InputAttemptFetchFailure;
 import org.apache.tez.runtime.library.common.shuffle.FetchResult;
 import org.apache.tez.runtime.library.common.shuffle.InputHost;
-import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
 import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto;
 import org.junit.After;
 import org.junit.Assert;
@@ -234,12 +231,12 @@
         }
       }
     });
-    InputAttemptIdentifier inputAttemptIdentifier
-        = new InputAttemptIdentifier(1, 1);
+    InputAttemptFetchFailure inputAttemptFetchFailure =
+        new InputAttemptFetchFailure(new InputAttemptIdentifier(1, 1));
 
     schedulerGetHostThread.start();
     Thread.sleep(1000);
-    shuffleManager.fetchFailed("host1", inputAttemptIdentifier, false);
+    shuffleManager.fetchFailed("host1", inputAttemptFetchFailure, false);
     Thread.sleep(1000);
 
     ArgumentCaptor<List> captor = ArgumentCaptor.forClass(List.class);
@@ -254,8 +251,8 @@
     Assert.assertEquals("Number of failures was: " + inputEvent.getNumFailures(),
         inputEvent.getNumFailures(), 1);
 
-    shuffleManager.fetchFailed("host1", inputAttemptIdentifier, false);
-    shuffleManager.fetchFailed("host1", inputAttemptIdentifier, false);
+    shuffleManager.fetchFailed("host1", inputAttemptFetchFailure, false);
+    shuffleManager.fetchFailed("host1", inputAttemptFetchFailure, false);
 
     Thread.sleep(1000);
     verify(inputContext, times(1)).sendEvents(any());
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
index 5f7fe4b..028fbce 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
@@ -42,6 +42,7 @@
 import java.net.URL;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
@@ -72,7 +73,10 @@
 import org.apache.tez.runtime.library.common.security.SecureShuffleUtils;
 import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
 import org.apache.tez.runtime.library.exceptions.FetcherReadTimeoutException;
+import org.apache.tez.runtime.library.testutils.RuntimeTestUtils;
+import org.apache.tez.runtime.library.common.shuffle.InputAttemptFetchFailure;
 import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
+import org.apache.tez.runtime.library.common.shuffle.api.ShuffleHandlerError;
 import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
@@ -241,7 +245,7 @@
     );
     final int FIRST_FAILED_ATTEMPT_IDX = 2;
     final int SECOND_FAILED_ATTEMPT_IDX = 4;
-    final int[] sucessfulAttemptsIndexes = { 0, 1, 3 };
+    final int[] successfulAttemptsIndexes = { 0, 1, 3 };
 
     doReturn(srcAttempts).when(scheduler).getMapsForHost(host);
 
@@ -311,13 +315,17 @@
     spyFetcher.setupLocalDiskFetch(host);
 
     // should have exactly 3 success and 1 failure.
-    for (int i : sucessfulAttemptsIndexes) {
+    for (int i : successfulAttemptsIndexes) {
       for (int j = 0; j < host.getPartitionCount(); j++) {
         verifyCopySucceeded(scheduler, host, srcAttempts, i, j);
       }
     }
-    verify(scheduler).copyFailed(srcAttempts.get(FIRST_FAILED_ATTEMPT_IDX).expand(0), host, true, false, true);
-    verify(scheduler).copyFailed(srcAttempts.get(SECOND_FAILED_ATTEMPT_IDX).expand(0), host, true, false, true);
+    verify(scheduler).copyFailed(
+        eq(InputAttemptFetchFailure.fromLocalFetchFailure(srcAttempts.get(FIRST_FAILED_ATTEMPT_IDX).expand(0))),
+        eq(host), eq(true), eq(false));
+    verify(scheduler).copyFailed(
+        eq(InputAttemptFetchFailure.fromLocalFetchFailure(srcAttempts.get(SECOND_FAILED_ATTEMPT_IDX).expand(0))),
+        eq(host), eq(true), eq(false));
 
     verify(spyFetcher).putBackRemainingMapOutputs(host);
     verify(scheduler).putBackKnownMapOutput(host, srcAttempts.get(FIRST_FAILED_ATTEMPT_IDX));
@@ -426,7 +434,7 @@
     );
     final int FIRST_FAILED_ATTEMPT_IDX = 2;
     final int SECOND_FAILED_ATTEMPT_IDX = 4;
-    final int[] sucessfulAttemptsIndexes = { 0, 1, 3 };
+    final int[] successfulAttemptsIndexes = { 0, 1, 3 };
 
     doReturn(srcAttempts).when(scheduler).getMapsForHost(host);
     final ConcurrentMap<ShuffleScheduler.PathPartition, InputAttemptIdentifier> pathToIdentifierMap
@@ -503,15 +511,23 @@
     spyFetcher.setupLocalDiskFetch(host);
 
     // should have exactly 3 success and 1 failure.
-    for (int i : sucessfulAttemptsIndexes) {
+    for (int i : successfulAttemptsIndexes) {
       for (int j = 0; j < host.getPartitionCount(); j++) {
         verifyCopySucceeded(scheduler, host, srcAttempts, i, j);
       }
     }
-    verify(scheduler).copyFailed(srcAttempts.get(FIRST_FAILED_ATTEMPT_IDX).expand(0), host, true, false, true);
-    verify(scheduler).copyFailed(srcAttempts.get(FIRST_FAILED_ATTEMPT_IDX).expand(1), host, true, false, true);
-    verify(scheduler).copyFailed(srcAttempts.get(SECOND_FAILED_ATTEMPT_IDX).expand(0), host, true, false, true);
-    verify(scheduler).copyFailed(srcAttempts.get(SECOND_FAILED_ATTEMPT_IDX).expand(1), host, true, false, true);
+    verify(scheduler).copyFailed(
+        eq(InputAttemptFetchFailure.fromLocalFetchFailure(srcAttempts.get(FIRST_FAILED_ATTEMPT_IDX).expand(0))),
+        eq(host), eq(true), eq(false));
+    verify(scheduler).copyFailed(
+        eq(InputAttemptFetchFailure.fromLocalFetchFailure(srcAttempts.get(FIRST_FAILED_ATTEMPT_IDX).expand(1))),
+        eq(host), eq(true), eq(false));
+    verify(scheduler).copyFailed(eq(
+        InputAttemptFetchFailure.fromLocalFetchFailure(srcAttempts.get(SECOND_FAILED_ATTEMPT_IDX).expand(0))),
+        eq(host), eq(true), eq(false));
+    verify(scheduler).copyFailed(eq(
+        InputAttemptFetchFailure.fromLocalFetchFailure(srcAttempts.get(SECOND_FAILED_ATTEMPT_IDX).expand(1))),
+        eq(host), eq(true), eq(false));
 
     verify(spyFetcher).putBackRemainingMapOutputs(host);
     verify(scheduler).putBackKnownMapOutput(host, srcAttempts.get(FIRST_FAILED_ATTEMPT_IDX));
@@ -630,8 +646,8 @@
     //setup connection should be called twice (1 for connect and another for retry)
     verify(fetcher, times(2)).setupConnection(any(MapHost.class), any(Collection.class));
     //since copyMapOutput consistently fails, it should call copyFailed once
-    verify(scheduler, times(1)).copyFailed(any(InputAttemptIdentifier.class), any(MapHost.class),
-          anyBoolean(), anyBoolean(), anyBoolean());
+    verify(scheduler, times(1)).copyFailed(any(InputAttemptFetchFailure.class), any(MapHost.class),
+          anyBoolean(), anyBoolean());
 
     verify(fetcher, times(1)).putBackRemainingMapOutputs(any(MapHost.class));
     verify(scheduler, times(3)).putBackKnownMapOutput(any(MapHost.class),
@@ -750,6 +766,32 @@
     }
   }
 
+  @Test
+  public void testShuffleHandlerDiskErrorOrdered()
+      throws Exception {
+    MapHost mapHost = new MapHost(HOST, PORT, 0, 1);
+    InputAttemptIdentifier inputAttemptIdentifier = new InputAttemptIdentifier(0, 0, "attempt");
+
+    FetcherOrderedGrouped fetcher = new FetcherOrderedGrouped(null, null, null, null, null, false,
+        0, null, new TezConfiguration(), null, false, HOST, PORT, "src vertex", mapHost,
+        ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter,
+        connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, false, false, true, false);
+    fetcher.remaining = new HashMap<String, InputAttemptIdentifier>();
+
+    ShuffleHeader header =
+        new ShuffleHeader(ShuffleHandlerError.DISK_ERROR_EXCEPTION.toString(), -1, -1, -1);
+    DataInputStream input = RuntimeTestUtils.shuffleHeaderToDataInput(header);
+
+    // copyMapOutput is used for remote fetch, this time it returns a fetch failure, which is fatal
+    // and should be treated as a local fetch failure
+    InputAttemptFetchFailure[] failures =
+        fetcher.copyMapOutput(mapHost, input, inputAttemptIdentifier);
+
+    Assert.assertEquals(1, failures.length);
+    Assert.assertTrue(failures[0].isDiskErrorAtSource());
+    Assert.assertFalse(failures[0].isLocalFetch());
+  }
+
   private RawLocalFileSystem getRawFs(Configuration conf) {
     try {
       return (RawLocalFileSystem) FileSystem.getLocal(conf).getRaw();
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
index fabfa27..b89ffb0 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
@@ -55,6 +55,7 @@
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.common.CompositeInputAttemptIdentifier;
 import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+import org.apache.tez.runtime.library.common.shuffle.InputAttemptFetchFailure;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -248,8 +249,8 @@
     for (int i = 100; i < 199; i++) {
       InputAttemptIdentifier inputAttemptIdentifier =
           new InputAttemptIdentifier(i, 0, "attempt_");
-      scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes),
-          10000, i, 1), false, true, false);
+      scheduler.copyFailed(InputAttemptFetchFailure.fromAttempt(inputAttemptIdentifier),
+          new MapHost("host" + (i % totalProducerNodes), 10000, i, 1), false, true);
     }
 
 
@@ -257,9 +258,8 @@
         new InputAttemptIdentifier(200, 0, "attempt_");
 
     //Should fail here and report exception as reducer is not healthy
-    scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (200 %
-        totalProducerNodes),
-        10000, 200, 1), false, true, false);
+    scheduler.copyFailed(InputAttemptFetchFailure.fromAttempt(inputAttemptIdentifier),
+        new MapHost("host" + (200 % totalProducerNodes), 10000, 200, 1), false, true);
 
     int minFailurePerHost = conf.getInt(
         TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MIN_FAILURES_PER_HOST,
@@ -330,8 +330,8 @@
     for (int i = 190; i < 200; i++) {
       InputAttemptIdentifier inputAttemptIdentifier =
           new InputAttemptIdentifier(i, 0, "attempt_");
-      scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes),
-          10000, i, 1), false, true, false);
+      scheduler.copyFailed(InputAttemptFetchFailure.fromAttempt(inputAttemptIdentifier),
+          new MapHost("host" + (i % totalProducerNodes), 10000, i, 1), false, true);
     }
 
     //Shuffle has not stalled. so no issues.
@@ -342,9 +342,8 @@
 
     InputAttemptIdentifier inputAttemptIdentifier =
         new InputAttemptIdentifier(190, 0, "attempt_");
-    scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" +
-        (190 % totalProducerNodes),
-        10000, 190, 1), false, true, false);
+    scheduler.copyFailed(InputAttemptFetchFailure.fromAttempt(inputAttemptIdentifier),
+        new MapHost("host" + (190 % totalProducerNodes), 10000, 190, 1), false, true);
 
     //Even when it is stalled, need (320 - 300 = 20) * 3 = 60 failures
     verify(scheduler.reporter, times(0)).reportException(any(Throwable.class));
@@ -355,16 +354,17 @@
     for (int i = 190; i < 200; i++) {
       inputAttemptIdentifier =
           new InputAttemptIdentifier(i, 0, "attempt_");
-      scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes),
-          10000, i, 1), false, true, false);
-      scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes),
-          10000, i, 1), false, true, false);
-      scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes),
-          10000, i, 1), false, true, false);
-      scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes),
-          10000, i, 1), false, true, false);
-      scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes),
-          10000, i, 1), false, true, false);
+      InputAttemptFetchFailure failure = InputAttemptFetchFailure.fromAttempt(inputAttemptIdentifier);
+      scheduler.copyFailed(failure, new MapHost("host" + (i % totalProducerNodes),
+          10000, i, 1), false, true);
+      scheduler.copyFailed(failure, new MapHost("host" + (i % totalProducerNodes),
+          10000, i, 1), false, true);
+      scheduler.copyFailed(failure, new MapHost("host" + (i % totalProducerNodes),
+          10000, i, 1), false, true);
+      scheduler.copyFailed(failure, new MapHost("host" + (i % totalProducerNodes),
+          10000, i, 1), false, true);
+      scheduler.copyFailed(failure, new MapHost("host" + (i % totalProducerNodes),
+          10000, i, 1), false, true);
     }
 
     assertEquals(61, scheduler.failedShufflesSinceLastCompletion);
@@ -376,12 +376,14 @@
     for (int i = 110; i < 120; i++) {
       inputAttemptIdentifier =
           new InputAttemptIdentifier(i, 0, "attempt_");
-      scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes),
-          10000, i, 1), false, true, false);
-      scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes),
-          10000, i, 1), false, true, false);
-      scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes),
-          10000, i, 1), false, true, false);
+      InputAttemptFetchFailure failure =
+          InputAttemptFetchFailure.fromAttempt(inputAttemptIdentifier);
+      scheduler.copyFailed(failure, new MapHost("host" + (i % totalProducerNodes), 10000, i, 1),
+          false, true);
+      scheduler.copyFailed(failure, new MapHost("host" + (i % totalProducerNodes), 10000, i, 1),
+          false, true);
+      scheduler.copyFailed(failure, new MapHost("host" + (i % totalProducerNodes), 10000, i, 1),
+          false, true);
     }
 
     // Should fail now due to fetcherHealthy. (stall has already happened and
@@ -432,8 +434,8 @@
     //1 fails (last fetch)
     InputAttemptIdentifier inputAttemptIdentifier =
         new InputAttemptIdentifier(319, 0, "attempt_");
-    scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (319 % totalProducerNodes),
-        10000, 319, 1), false, true, false);
+    scheduler.copyFailed(InputAttemptFetchFailure.fromAttempt(inputAttemptIdentifier),
+        new MapHost("host" + (319 % totalProducerNodes), 10000, 319, 1), false, true);
 
     //stall the shuffle
     scheduler.lastProgressTime = System.currentTimeMillis() - 1000000;
@@ -441,15 +443,13 @@
     assertEquals(scheduler.remainingMaps.get(), 1);
 
     //Retry for 3 more times
-    scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (319 %
-        totalProducerNodes),
-        10000, 319, 1), false, true, false);
-    scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (319 %
-        totalProducerNodes),
-        10000, 310, 1), false, true, false);
-    scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (319 %
-        totalProducerNodes),
-        10000, 310, 1), false, true, false);
+    InputAttemptFetchFailure failure = InputAttemptFetchFailure.fromAttempt(inputAttemptIdentifier);
+    scheduler.copyFailed(failure, new MapHost("host" + (319 % totalProducerNodes), 10000, 319, 1),
+        false, true);
+    scheduler.copyFailed(failure, new MapHost("host" + (319 % totalProducerNodes), 10000, 310, 1),
+        false, true);
+    scheduler.copyFailed(failure, new MapHost("host" + (319 % totalProducerNodes), 10000, 310, 1),
+        false, true);
 
     // failedShufflesSinceLastCompletion has crossed the limits. Throw error
     verify(shuffle, times(0)).reportException(any(Throwable.class));
@@ -487,15 +487,15 @@
     for (int i = 0; i < 64; i++) {
       InputAttemptIdentifier inputAttemptIdentifier =
           new InputAttemptIdentifier(i, 0, "attempt_");
+      InputAttemptFetchFailure failure =
+          InputAttemptFetchFailure.fromAttempt(inputAttemptIdentifier);
 
-      scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i %
-          totalProducerNodes), 10000, i, 1), false, true, false);
-
-      scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i %
-          totalProducerNodes), 10000, i, 1), false, true, false);
-
-      scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i %
-          totalProducerNodes), 10000, i, 1), false, true, false);
+      scheduler.copyFailed(failure, new MapHost("host" + (i % totalProducerNodes), 10000, i, 1),
+          false, true);
+      scheduler.copyFailed(failure, new MapHost("host" + (i % totalProducerNodes), 10000, i, 1),
+          false, true);
+      scheduler.copyFailed(failure, new MapHost("host" + (i % totalProducerNodes), 10000, i, 1),
+          false, true);
 
       MapOutput mapOutput = MapOutput
           .createMemoryMapOutput(inputAttemptIdentifier, mock(FetchedInputAllocatorOrderedGrouped.class),
@@ -518,8 +518,8 @@
     //1 fails (last fetch)
     InputAttemptIdentifier inputAttemptIdentifier =
         new InputAttemptIdentifier(319, 0, "attempt_");
-    scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (319 % totalProducerNodes),
-        10000, 319, 1), false, true, false);
+    scheduler.copyFailed(new InputAttemptFetchFailure(inputAttemptIdentifier),
+        new MapHost("host" + (319 % totalProducerNodes), 10000, 319, 1), false, true);
 
     //stall the shuffle (but within limits)
     scheduler.lastProgressTime = System.currentTimeMillis() - 100000;
@@ -527,15 +527,13 @@
     assertEquals(scheduler.remainingMaps.get(), 1);
 
     //Retry for 3 more times
-    scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (319 %
-        totalProducerNodes),
-        10000, 319, 1), false, true, false);
-    scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (319 %
-        totalProducerNodes),
-        10000, 319, 1), false, true, false);
-    scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (319 %
-        totalProducerNodes),
-        10000, 319, 1), false, true, false);
+    InputAttemptFetchFailure failure = InputAttemptFetchFailure.fromAttempt(inputAttemptIdentifier);
+    scheduler.copyFailed(failure, new MapHost("host" + (319 % totalProducerNodes), 10000, 319, 1),
+        false, true);
+    scheduler.copyFailed(failure, new MapHost("host" + (319 % totalProducerNodes), 10000, 319, 1),
+        false, true);
+    scheduler.copyFailed(failure, new MapHost("host" + (319 % totalProducerNodes), 10000, 319, 1),
+        false, true);
 
     // failedShufflesSinceLastCompletion has crossed the limits. 20% of other nodes had failures as
     // well. However, it has failed only in one host. So this should proceed
@@ -544,9 +542,8 @@
 
     //stall the shuffle (but within limits)
     scheduler.lastProgressTime = System.currentTimeMillis() - 300000;
-    scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (319 %
-        totalProducerNodes),
-        10000, 319, 1), false, true, false);
+    scheduler.copyFailed(InputAttemptFetchFailure.fromAttempt(inputAttemptIdentifier),
+        new MapHost("host" + (319 % totalProducerNodes), 10000, 319, 1), false, true);
     verify(shuffle, times(1)).reportException(any(Throwable.class));
 
   }
@@ -592,8 +589,9 @@
     //1 fails (last fetch)
     InputAttemptIdentifier inputAttemptIdentifier =
         new InputAttemptIdentifier(318, 0, "attempt_");
-    scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (318 % totalProducerNodes),
-        10000, 318, 1), false, true, false);
+    InputAttemptFetchFailure failure = new InputAttemptFetchFailure(inputAttemptIdentifier);
+    scheduler.copyFailed(failure, new MapHost("host" + (318 % totalProducerNodes),
+        10000, 318, 1), false, true);
 
     //stall the shuffle
     scheduler.lastProgressTime = System.currentTimeMillis() - 1000000;
@@ -601,15 +599,12 @@
     assertEquals(scheduler.remainingMaps.get(), 1);
 
     //Retry for 3 more times
-    scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (318 %
-        totalProducerNodes),
-        10000, 318, 1), false, true, false);
-    scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (318 %
-        totalProducerNodes),
-        10000, 318, 1), false, true, false);
-    scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (318 %
-        totalProducerNodes),
-        10000, 318, 1), false, true, false);
+    scheduler.copyFailed(failure, new MapHost("host" + (318 % totalProducerNodes), 10000, 318, 1),
+        false, true);
+    scheduler.copyFailed(failure, new MapHost("host" + (318 % totalProducerNodes), 10000, 318, 1),
+        false, true);
+    scheduler.copyFailed(failure, new MapHost("host" + (318 % totalProducerNodes), 10000, 318, 1),
+        false, true);
 
     //Shuffle has not received the events completely. So do not bail out yet.
     verify(shuffle, times(0)).reportException(any(Throwable.class));
@@ -672,8 +667,8 @@
     for (int i = 10; i < 15; i++) {
       InputAttemptIdentifier inputAttemptIdentifier =
           new InputAttemptIdentifier(i, 0, "attempt_");
-      scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes),
-          10000, i, 1), false, true, false);
+      scheduler.copyFailed(InputAttemptFetchFailure.fromAttempt(inputAttemptIdentifier),
+          new MapHost("host" + (i % totalProducerNodes), 10000, i, 1), false, true);
     }
 
     assertTrue(scheduler.failureCounts.size() >= 5);
@@ -686,10 +681,10 @@
     for (int i = 10; i < 15; i++) {
       InputAttemptIdentifier inputAttemptIdentifier =
           new InputAttemptIdentifier(i, 0, "attempt_");
-      scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes),
-          10000, i, 1), false, true, false);
-      scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes),
-          10000, i, 1), false, true, false);
+      scheduler.copyFailed(InputAttemptFetchFailure.fromAttempt(inputAttemptIdentifier),
+          new MapHost("host" + (i % totalProducerNodes), 10000, i, 1), false, true);
+      scheduler.copyFailed(InputAttemptFetchFailure.fromAttempt(inputAttemptIdentifier),
+          new MapHost("host" + (i % totalProducerNodes), 10000, i, 1), false, true);
     }
 
     boolean checkFailedFetchSinceLastCompletion = conf.getBoolean
@@ -749,18 +744,15 @@
     for (int i = 100; i < 199; i++) {
       InputAttemptIdentifier inputAttemptIdentifier =
           new InputAttemptIdentifier(i, 0, "attempt_");
-      scheduler.copyFailed(inputAttemptIdentifier,
-          new MapHost("host" + (i % totalProducerNodes), 10000, i, 1),
-          false, true, false);
-      scheduler.copyFailed(inputAttemptIdentifier,
-          new MapHost("host" + (i % totalProducerNodes), 10000, i, 1),
-          false, true, false);
-      scheduler.copyFailed(inputAttemptIdentifier,
-          new MapHost("host" + (i % totalProducerNodes), 10000, i, 1),
-          false, true, false);
-      scheduler.copyFailed(inputAttemptIdentifier,
-          new MapHost("host" + (i % totalProducerNodes), 10000, i, 1),
-          false, true, false);
+      InputAttemptFetchFailure failure = InputAttemptFetchFailure.fromAttempt(inputAttemptIdentifier);
+      scheduler.copyFailed(failure, new MapHost("host" + (i % totalProducerNodes), 10000, i, 1),
+          false, true);
+      scheduler.copyFailed(failure, new MapHost("host" + (i % totalProducerNodes), 10000, i, 1),
+          false, true);
+      scheduler.copyFailed(failure, new MapHost("host" + (i % totalProducerNodes), 10000, i, 1),
+          false, true);
+      scheduler.copyFailed(failure, new MapHost("host" + (i % totalProducerNodes), 10000, i, 1),
+          false, true);
     }
 
     verify(shuffle, atLeast(1)).reportException(any(Throwable.class));
@@ -799,7 +791,8 @@
     MapHost mapHost = scheduler.pendingHosts.iterator().next();
 
     //Fails to pull from host0. host0 should be added to penalties
-    scheduler.copyFailed(inputAttemptIdentifier, mapHost, false, true, false);
+    scheduler.copyFailed(InputAttemptFetchFailure.fromAttempt(inputAttemptIdentifier), mapHost,
+        false, true);
 
     //Should not get host, as it is added to penalty loop
     MapHost host = scheduler.getHost();
@@ -993,7 +986,8 @@
     }
 
     for (int i = 0; i < 10; i++) {
-      scheduler.copyFailed(identifiers[0], mapHosts[0], false, false, false);
+      scheduler.copyFailed(InputAttemptFetchFailure.fromAttempt(identifiers[0]), mapHosts[0], false,
+          false);
     }
     ShuffleScheduler.Penalty[] penaltyArray = new ShuffleScheduler.Penalty[scheduler.getPenalties().size()];
     scheduler.getPenalties().toArray(penaltyArray);
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/testutils/RuntimeTestUtils.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/testutils/RuntimeTestUtils.java
new file mode 100644
index 0000000..0885178
--- /dev/null
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/testutils/RuntimeTestUtils.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.testutils;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ShuffleHeader;
+
+public final class RuntimeTestUtils {
+
+  private RuntimeTestUtils() {
+  }
+
+  public static DataInputStream shuffleHeaderToDataInput(ShuffleHeader header) throws IOException {
+    ByteArrayOutputStream byteOutput = new ByteArrayOutputStream(1000);
+    DataOutputStream output = new DataOutputStream(byteOutput);
+    header.write(output);
+
+    InputStream inputStream = new ByteArrayInputStream(byteOutput.toByteArray());
+    DataInputStream input = new DataInputStream(inputStream);
+
+    return input;
+  }
+}