TEZ-2850. Tez MergeManager OOM for small Map Outputs (jeagles)
(cherry picked from commit a9cfeb914001c381877657ea39b5de5451740050)
(cherry picked from commit 35bd5ab0b58fa4bd2b9162c14baa7e94e22ba00c)
diff --git a/CHANGES.txt b/CHANGES.txt
index 62a4929..0356b75 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-2850. Tez MergeManager OOM for small Map Outputs
TEZ-2781. Fallback to send only TaskAttemptFailedEvent if taskFailed heartbeat fails
TEZ-2855. Fix a potential NPE while routing VertexManager events.
TEZ-2716. DefaultSorter.isRleNeeded not thread safe.
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
index f075772..1e1c547 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
@@ -506,10 +506,10 @@
protected int recNo = 1;
protected int originalKeyLength;
protected int prevKeyLength;
- protected int currentKeyLength;
- protected int currentValueLength;
byte keyBytes[] = new byte[0];
+ protected int currentKeyLength;
+ protected int currentValueLength;
long startPos;
protected boolean isCompressed = false;
@@ -570,22 +570,26 @@
TezCounter readsCounter, TezCounter bytesReadCounter,
boolean readAhead, int readAheadLength,
int bufferSize, boolean isCompressed) throws IOException {
- this.isCompressed = isCompressed;
- checksumIn = new IFileInputStream(in, length, readAhead, readAheadLength/*, isCompressed*/);
- if (isCompressed && codec != null) {
- decompressor = CodecPool.getDecompressor(codec);
- if (decompressor != null) {
- this.in = codec.createInputStream(checksumIn, decompressor);
+ if (in != null) {
+ checksumIn = new IFileInputStream(in, length, readAhead,
+ readAheadLength/* , isCompressed */);
+ if (isCompressed && codec != null) {
+ decompressor = CodecPool.getDecompressor(codec);
+ if (decompressor != null) {
+ this.in = codec.createInputStream(checksumIn, decompressor);
+ } else {
+ LOG.warn("Could not obtain decompressor from CodecPool");
+ this.in = checksumIn;
+ }
} else {
- LOG.warn("Could not obtain decompressor from CodecPool");
this.in = checksumIn;
}
+ startPos = checksumIn.getPosition();
} else {
- this.in = checksumIn;
+ this.in = null;
}
this.dataIn = new DataInputStream(this.in);
- startPos = checksumIn.getPosition();
this.readRecordsCounter = readsCounter;
this.bytesReadCounter = bytesReadCounter;
this.fileLength = length;