TEZ-2850. Tez MergeManager OOM for small Map Outputs (jeagles)

(cherry picked from commit a9cfeb914001c381877657ea39b5de5451740050)
(cherry picked from commit 35bd5ab0b58fa4bd2b9162c14baa7e94e22ba00c)
diff --git a/CHANGES.txt b/CHANGES.txt
index 62a4929..0356b75 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-2850. Tez MergeManager OOM for small Map Outputs
   TEZ-2781. Fallback to send only TaskAttemptFailedEvent if taskFailed heartbeat fails
   TEZ-2855. Fix a potential NPE while routing VertexManager events.
   TEZ-2716. DefaultSorter.isRleNeeded not thread safe.
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
index f075772..1e1c547 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
@@ -506,10 +506,10 @@
     protected int recNo = 1;
     protected int originalKeyLength;
     protected int prevKeyLength;
-    protected int currentKeyLength;
-    protected int currentValueLength;
     byte keyBytes[] = new byte[0];
 
+    protected int currentKeyLength;
+    protected int currentValueLength;
     long startPos;
     protected boolean isCompressed = false;
 
@@ -570,22 +570,26 @@
                   TezCounter readsCounter, TezCounter bytesReadCounter,
                   boolean readAhead, int readAheadLength,
                   int bufferSize, boolean isCompressed) throws IOException {
-      this.isCompressed = isCompressed;
-      checksumIn = new IFileInputStream(in, length, readAhead, readAheadLength/*, isCompressed*/);
-      if (isCompressed && codec != null) {
-        decompressor = CodecPool.getDecompressor(codec);
-        if (decompressor != null) {
-          this.in = codec.createInputStream(checksumIn, decompressor);
+      if (in != null) {
+        checksumIn = new IFileInputStream(in, length, readAhead,
+            readAheadLength/* , isCompressed */);
+        if (isCompressed && codec != null) {
+          decompressor = CodecPool.getDecompressor(codec);
+          if (decompressor != null) {
+            this.in = codec.createInputStream(checksumIn, decompressor);
+          } else {
+            LOG.warn("Could not obtain decompressor from CodecPool");
+            this.in = checksumIn;
+          }
         } else {
-          LOG.warn("Could not obtain decompressor from CodecPool");
           this.in = checksumIn;
         }
+        startPos = checksumIn.getPosition();
       } else {
-        this.in = checksumIn;
+        this.in = null;
       }
 
       this.dataIn = new DataInputStream(this.in);
-      startPos = checksumIn.getPosition();
       this.readRecordsCounter = readsCounter;
       this.bytesReadCounter = bytesReadCounter;
       this.fileLength = length;