TEZ-3970. NullPointerException in Tez ShuffleHandler Ranged Fetch (Jonathan Eagles via kshukla)
diff --git a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java
index 24a821f..f294edc 100644
--- a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java
+++ b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java
@@ -1399,9 +1399,9 @@
         DataOutputBuffer dob = new DataOutputBuffer();
         header.write(dob);
         // Free the memory needed to store the spill and index records
-        outputInfo.finish();
         ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
       }
+      outputInfo.finish();
 
       final long rangeOffset = firstIndex.getStartOffset();
       final long rangePartLength = lastIndex.getStartOffset() + lastIndex.getPartLength() - firstIndex.getStartOffset();
diff --git a/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java b/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java
index 7d53abc..7c421a9 100644
--- a/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java
+++ b/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java
@@ -55,6 +55,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.mapred.JobID;
 import org.apache.hadoop.mapred.MapTask;
@@ -75,6 +76,8 @@
 import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
 import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
 import org.apache.hadoop.yarn.server.records.Version;
+import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
+import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
 import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelFuture;
 import org.jboss.netty.channel.ChannelHandlerContext;
@@ -658,6 +661,102 @@
   }
 
   /**
+   * Validate the ranged fetch works as expected
+   */
+  @Test(timeout = 10000)
+  public void testRangedFetch() throws IOException {
+    Configuration conf = new Configuration();
+    conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
+    conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3);
+    conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
+        "simple");
+    UserGroupInformation.setConfiguration(conf);
+    File absLogDir = new File("target",
+        TestShuffleHandler.class.getSimpleName() + "LocDir").getAbsoluteFile();
+    conf.set(YarnConfiguration.NM_LOCAL_DIRS, absLogDir.getAbsolutePath());
+    ApplicationId appId = ApplicationId.newInstance(12345, 1);
+    LOG.info(appId.toString());
+    String appAttemptId = "attempt_12345_1_m_1_0";
+    String user = "randomUser";
+    String reducerIdStart = "0";
+    String reducerIdEnd = "1";
+    List<File> fileMap = new ArrayList<>();
+    createShuffleHandlerFiles(absLogDir, user, appId.toString(), appAttemptId,
+        conf, fileMap);
+    ShuffleHandler shuffleHandler = new ShuffleHandler() {
+
+      @Override
+      protected Shuffle getShuffle(Configuration conf) {
+        // replace the shuffle handler with one stubbed for testing
+        return new Shuffle(conf) {
+
+          @Override
+          protected void verifyRequest(String appid, ChannelHandlerContext ctx,
+                                       HttpRequest request, HttpResponse response, URL requestUri)
+              throws IOException {
+            // Do nothing.
+          }
+
+        };
+      }
+    };
+    shuffleHandler.init(conf);
+    try {
+      shuffleHandler.start();
+      DataOutputBuffer outputBuffer = new DataOutputBuffer();
+      outputBuffer.reset();
+      Token<JobTokenIdentifier> jt =
+          new Token<JobTokenIdentifier>("identifier".getBytes(),
+              "password".getBytes(), new Text(user), new Text("shuffleService"));
+      jt.write(outputBuffer);
+      shuffleHandler
+          .initializeApplication(new ApplicationInitializationContext(user,
+              appId, ByteBuffer.wrap(outputBuffer.getData(), 0,
+              outputBuffer.getLength())));
+      URL url =
+          new URL(
+              "http://127.0.0.1:"
+                  + shuffleHandler.getConfig().get(
+                  ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY)
+                  + "/mapOutput?job=job_12345_0001&dag=1&reduce=" + reducerIdStart + "-" + reducerIdEnd
+                  + "&map=attempt_12345_1_m_1_0");
+      HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+      conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME,
+          ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
+      conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION,
+          ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
+      conn.connect();
+      boolean succeeded = false;
+      try {
+        DataInputStream is = new DataInputStream(conn.getInputStream());
+        int partitionCount = WritableUtils.readVInt(is);
+        List<ShuffleHeader> headers = new ArrayList<>(2);
+        for (int i = 0; i < partitionCount; i++) {
+          ShuffleHeader header = new ShuffleHeader();
+          header.readFields(is);
+          Assert.assertEquals("Incorrect map id", "attempt_12345_1_m_1_0", header.getMapId());
+          Assert.assertEquals("Incorrect reduce id", i, header.getPartition());
+          headers.add(header);
+        }
+        for (ShuffleHeader header: headers) {
+          byte[] bytes = new byte[(int)header.getCompressedLength()];
+          is.read(bytes);
+        }
+        succeeded = true;
+        // Read one more byte to force EOF
+        is.readByte();
+        Assert.fail("More fetch bytes that expected in stream");
+      } catch (EOFException e) {
+        Assert.assertTrue("Failed to copy ranged fetch", succeeded);
+      }
+
+    } finally {
+      shuffleHandler.stop();
+      FileUtil.fullyDelete(absLogDir);
+    }
+  }
+
+  /**
    * Validate the ownership of the map-output files being pulled in. The
    * local-file-system owner of the file should match the user component in the
    *
@@ -785,18 +884,11 @@
       System.out.println("Deleting existing file");
       indexFile.delete();
     }
-    indexFile.createNewFile();
-    FSDataOutputStream output = FileSystem.getLocal(conf).getRaw().append(
-        new Path(indexFile.getAbsolutePath()));
     Checksum crc = new PureJavaCrc32();
-    crc.reset();
-    CheckedOutputStream chk = new CheckedOutputStream(output, crc);
-    String msg = "Writing new index file. This file will be used only " +
-        "for the testing.";
-    chk.write(Arrays.copyOf(msg.getBytes(),
-        MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH));
-    output.writeLong(chk.getChecksum().getValue());
-    output.close();
+    TezSpillRecord tezSpillRecord = new TezSpillRecord(2);
+    tezSpillRecord.putIndex(new TezIndexRecord(0, 10, 10), 0);
+    tezSpillRecord.putIndex(new TezIndexRecord(10, 10, 10), 1);
+    tezSpillRecord.writeToFile(new Path(indexFile.getAbsolutePath()), conf, crc);
   }
 
   @Test