TEZ-3970. NullPointerException in Tez ShuffleHandler Ranged Fetch (Jonathan Eagles via kshukla)
diff --git a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java
index 24a821f..f294edc 100644
--- a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java
+++ b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java
@@ -1399,9 +1399,9 @@
DataOutputBuffer dob = new DataOutputBuffer();
header.write(dob);
// Free the memory needed to store the spill and index records
- outputInfo.finish();
ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
}
+ outputInfo.finish();
final long rangeOffset = firstIndex.getStartOffset();
final long rangePartLength = lastIndex.getStartOffset() + lastIndex.getPartLength() - firstIndex.getStartOffset();
diff --git a/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java b/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java
index 7d53abc..7c421a9 100644
--- a/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java
+++ b/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java
@@ -55,6 +55,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.MapTask;
@@ -75,6 +76,8 @@
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
import org.apache.hadoop.yarn.server.records.Version;
+import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
+import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelHandlerContext;
@@ -658,6 +661,102 @@
}
/**
+ * Validate the ranged fetch works as expected
+ */
+ @Test(timeout = 10000)
+ public void testRangedFetch() throws IOException {
+ Configuration conf = new Configuration();
+ conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
+ conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3);
+ conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
+ "simple");
+ UserGroupInformation.setConfiguration(conf);
+ File absLogDir = new File("target",
+ TestShuffleHandler.class.getSimpleName() + "LocDir").getAbsoluteFile();
+ conf.set(YarnConfiguration.NM_LOCAL_DIRS, absLogDir.getAbsolutePath());
+ ApplicationId appId = ApplicationId.newInstance(12345, 1);
+ LOG.info(appId.toString());
+ String appAttemptId = "attempt_12345_1_m_1_0";
+ String user = "randomUser";
+ String reducerIdStart = "0";
+ String reducerIdEnd = "1";
+ List<File> fileMap = new ArrayList<>();
+ createShuffleHandlerFiles(absLogDir, user, appId.toString(), appAttemptId,
+ conf, fileMap);
+ ShuffleHandler shuffleHandler = new ShuffleHandler() {
+
+ @Override
+ protected Shuffle getShuffle(Configuration conf) {
+ // replace the shuffle handler with one stubbed for testing
+ return new Shuffle(conf) {
+
+ @Override
+ protected void verifyRequest(String appid, ChannelHandlerContext ctx,
+ HttpRequest request, HttpResponse response, URL requestUri)
+ throws IOException {
+ // Do nothing.
+ }
+
+ };
+ }
+ };
+ shuffleHandler.init(conf);
+ try {
+ shuffleHandler.start();
+ DataOutputBuffer outputBuffer = new DataOutputBuffer();
+ outputBuffer.reset();
+ Token<JobTokenIdentifier> jt =
+ new Token<JobTokenIdentifier>("identifier".getBytes(),
+ "password".getBytes(), new Text(user), new Text("shuffleService"));
+ jt.write(outputBuffer);
+ shuffleHandler
+ .initializeApplication(new ApplicationInitializationContext(user,
+ appId, ByteBuffer.wrap(outputBuffer.getData(), 0,
+ outputBuffer.getLength())));
+ URL url =
+ new URL(
+ "http://127.0.0.1:"
+ + shuffleHandler.getConfig().get(
+ ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY)
+ + "/mapOutput?job=job_12345_0001&dag=1&reduce=" + reducerIdStart + "-" + reducerIdEnd
+ + "&map=attempt_12345_1_m_1_0");
+ HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+ conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME,
+ ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
+ conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION,
+ ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
+ conn.connect();
+ boolean succeeded = false;
+ try {
+ DataInputStream is = new DataInputStream(conn.getInputStream());
+ int partitionCount = WritableUtils.readVInt(is);
+ List<ShuffleHeader> headers = new ArrayList<>(2);
+ for (int i = 0; i < partitionCount; i++) {
+ ShuffleHeader header = new ShuffleHeader();
+ header.readFields(is);
+ Assert.assertEquals("Incorrect map id", "attempt_12345_1_m_1_0", header.getMapId());
+ Assert.assertEquals("Incorrect reduce id", i, header.getPartition());
+ headers.add(header);
+ }
+ for (ShuffleHeader header: headers) {
+ byte[] bytes = new byte[(int)header.getCompressedLength()];
+ is.read(bytes);
+ }
+ succeeded = true;
+ // Read one more byte to force EOF
+ is.readByte();
+ Assert.fail("More fetch bytes that expected in stream");
+ } catch (EOFException e) {
+ Assert.assertTrue("Failed to copy ranged fetch", succeeded);
+ }
+
+ } finally {
+ shuffleHandler.stop();
+ FileUtil.fullyDelete(absLogDir);
+ }
+ }
+
+ /**
* Validate the ownership of the map-output files being pulled in. The
* local-file-system owner of the file should match the user component in the
*
@@ -785,18 +884,11 @@
System.out.println("Deleting existing file");
indexFile.delete();
}
- indexFile.createNewFile();
- FSDataOutputStream output = FileSystem.getLocal(conf).getRaw().append(
- new Path(indexFile.getAbsolutePath()));
Checksum crc = new PureJavaCrc32();
- crc.reset();
- CheckedOutputStream chk = new CheckedOutputStream(output, crc);
- String msg = "Writing new index file. This file will be used only " +
- "for the testing.";
- chk.write(Arrays.copyOf(msg.getBytes(),
- MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH));
- output.writeLong(chk.getChecksum().getValue());
- output.close();
+ TezSpillRecord tezSpillRecord = new TezSpillRecord(2);
+ tezSpillRecord.putIndex(new TezIndexRecord(0, 10, 10), 0);
+ tezSpillRecord.putIndex(new TezIndexRecord(10, 10, 10), 1);
+ tezSpillRecord.writeToFile(new Path(indexFile.getAbsolutePath()), conf, crc);
}
@Test