TEZ-4234: Compressor can cause IllegalArgumentException in Buffer.limit where limit exceeds capacity (László Bodor reviewed by Rajesh Balamohan, Jonathan Turner Eagles)

Signed-off-by: Laszlo Bodor <bodorlaszlo0202@gmail.com>
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestConfigTranslationMRToTez.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestConfigTranslationMRToTez.java
index deab64f..df68c8d 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestConfigTranslationMRToTez.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestConfigTranslationMRToTez.java
@@ -70,6 +70,5 @@
     assertEquals(LongWritable.class.getName(), ConfigUtils
         .getIntermediateInputValueClass(confVertex1).getName());
     assertTrue(ConfigUtils.shouldCompressIntermediateOutput(confVertex1));
-    assertTrue(ConfigUtils.isIntermediateInputCompressed(confVertex1));
   }
 }
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ConfigUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ConfigUtils.java
index 76d3dff..f83fdc9 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ConfigUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ConfigUtils.java
@@ -56,24 +56,6 @@
     }
     return codecClass;
   }
-  
-  public static Class<? extends CompressionCodec> getIntermediateInputCompressorClass(
-      Configuration conf, Class<DefaultCodec> defaultValue) {
-    Class<? extends CompressionCodec> codecClass = defaultValue;
-    String name = conf
-        .get(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC);
-    if (name != null) {
-      try {
-        codecClass = conf.getClassByName(name).asSubclass(
-            CompressionCodec.class);
-      } catch (ClassNotFoundException e) {
-        throw new IllegalArgumentException("Compression codec " + name
-            + " was not found.", e);
-      }
-    }
-    return codecClass;
-  }
-
 
   // TODO Move defaults over to a constants file.
   
@@ -82,11 +64,6 @@
         TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS, false);
   }
 
-  public static boolean isIntermediateInputCompressed(Configuration conf) {
-    return conf.getBoolean(
-        TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS, false);
-  }
-
   public static <V> Class<V> getIntermediateOutputValueClass(Configuration conf) {
     Class<V> retv = (Class<V>) conf.getClass(
         TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, null,
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java
index 8be8fa2..daeafbc 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java
@@ -265,16 +265,22 @@
   }
 
   public static String getBufferSizeProperty(CompressionCodec codec) {
-    switch (codec.getClass().getSimpleName().toString()) {
-    case "DefaultCodec":
+    return getBufferSizeProperty(codec.getClass().getName());
+  }
+
+  public static String getBufferSizeProperty(String className) {
+    switch (className) {
+    case "org.apache.hadoop.io.compress.DefaultCodec":
       return "io.file.buffer.size";
-    case "SnappyCodec":
+    case "org.apache.hadoop.io.compress.SnappyCodec":
       return CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_KEY;
-    case "ZStandardCodec":
+    case "org.apache.hadoop.io.compress.ZStandardCodec":
       return CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_KEY;
-    case "LzoCodec":
+    case "org.apache.hadoop.io.compress.LzoCodec":
       return CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY;
-    case "Lz4Codec":
+    case "com.hadoop.compression.lzo.LzoCodec":
+      return CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY;
+    case "org.apache.hadoop.io.compress.Lz4Codec":
       return CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_KEY;
     default:
       return null;
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
index 38f079a..db5ef73 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
@@ -39,8 +39,6 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.DefaultCodec;
-import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.tez.common.CallableWithNdc;
 import org.apache.tez.common.GuavaShim;
 import org.apache.tez.common.TezRuntimeFrameworkConfigs;
@@ -51,12 +49,11 @@
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.InputContext;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
-import org.apache.tez.runtime.library.common.ConfigUtils;
 import org.apache.tez.runtime.library.common.TezRuntimeUtils;
 import org.apache.tez.runtime.library.common.combine.Combiner;
 import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
 import org.apache.tez.runtime.library.exceptions.InputAlreadyClosedException;
-
+import org.apache.tez.runtime.library.utils.CodecUtils;
 import org.apache.tez.common.Preconditions;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
@@ -114,16 +111,8 @@
 
     this.srcNameTrimmed = TezUtilsInternal.cleanVertexName(inputContext.getSourceVertexName());
     
+    this.codec = CodecUtils.getCodec(conf);
 
-    if (ConfigUtils.isIntermediateInputCompressed(conf)) {
-      Class<? extends CompressionCodec> codecClass =
-          ConfigUtils.getIntermediateInputCompressorClass(conf, DefaultCodec.class);
-      codec = ReflectionUtils.newInstance(codecClass, conf);
-      // Work around needed for HADOOP-12191. Avoids the native initialization synchronization race
-      codec.getDecompressorType();
-    } else {
-      codec = null;
-    }
     this.ifileReadAhead = conf.getBoolean(
         TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD,
         TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT);
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
index 194e899..3ff74f7 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
@@ -42,8 +42,6 @@
 import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.Compressor;
-import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.hadoop.io.serializer.Serializer;
 import org.apache.hadoop.util.IndexedSorter;
 import org.apache.hadoop.util.Progressable;
@@ -63,7 +61,7 @@
 import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ShuffleHeader;
 import org.apache.tez.runtime.library.common.sort.impl.IFile.Writer;
 import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutput;
-
+import org.apache.tez.runtime.library.utils.CodecUtils;
 import org.apache.tez.common.Preconditions;
 
 @SuppressWarnings({"rawtypes"})
@@ -224,30 +222,7 @@
     numShuffleChunks = outputContext.getCounters().findCounter(TaskCounter.SHUFFLE_CHUNK_COUNT);
 
     // compression
-    if (ConfigUtils.shouldCompressIntermediateOutput(this.conf)) {
-      Class<? extends CompressionCodec> codecClass =
-          ConfigUtils.getIntermediateOutputCompressorClass(this.conf, DefaultCodec.class);
-      codec = ReflectionUtils.newInstance(codecClass, this.conf);
-
-      if (codec != null) {
-        Class<? extends Compressor> compressorType = null;
-        Throwable cause = null;
-        try {
-          compressorType = codec.getCompressorType();
-        } catch (RuntimeException e) {
-          cause = e;
-        }
-        if (compressorType == null) {
-          String errMsg =
-              String.format("Unable to get CompressorType for codec (%s). This is most" +
-                      " likely due to missing native libraries for the codec.",
-                  conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC));
-          throw new IOException(errMsg, cause);
-        }
-      }
-    } else {
-      codec = null;
-    }
+    this.codec = CodecUtils.getCodec(conf);
 
     this.ifileReadAhead = this.conf.getBoolean(
         TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD,
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
index 6aa44e2..1b2aeff 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
@@ -30,20 +30,18 @@
 import com.google.common.annotations.VisibleForTesting;
 
 import org.apache.hadoop.io.BoundedByteArrayOutputStream;
-import org.apache.tez.runtime.library.common.TezRuntimeUtils;
 import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutput;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.tez.runtime.library.utils.BufferUtils;
+import org.apache.tez.runtime.library.utils.CodecUtils;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.io.compress.CodecPool;
@@ -823,7 +821,8 @@
         decompressor = CodecPool.getDecompressor(codec);
         if (decompressor != null) {
           decompressor.reset();
-          in = getDecompressedInputStreamWithBufferSize(codec, checksumIn, decompressor, compressedLength);
+          in = CodecUtils.getDecompressedInputStreamWithBufferSize(codec, checksumIn, decompressor,
+              compressedLength);
         } else {
           LOG.warn("Could not obtain decompressor from CodecPool");
           in = checksumIn;
@@ -859,24 +858,6 @@
       }
     }
 
-    private static InputStream getDecompressedInputStreamWithBufferSize(CompressionCodec codec,
-        IFileInputStream checksumIn, Decompressor decompressor, int compressedLength)
-        throws IOException {
-      String bufferSizeProp = TezRuntimeUtils.getBufferSizeProperty(codec);
-
-      if (bufferSizeProp != null) {
-        Configurable configurableCodec = (Configurable) codec;
-        Configuration conf = configurableCodec.getConf();
-
-        int bufSize = Math.min(compressedLength, DEFAULT_BUFFER_SIZE);
-        LOG.trace("buffer size was set according to min(compressedLength, {}): {}={}",
-            DEFAULT_BUFFER_SIZE, bufferSizeProp, bufSize);
-        conf.setInt(bufferSizeProp, bufSize);
-      }
-
-      return codec.createInputStream(checksumIn, decompressor);
-    }
-
     /**
      * Read entire IFile content to disk.
      *
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/BaseUnorderedPartitionedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/BaseUnorderedPartitionedKVWriter.java
index ecc9e03..adea49f 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/BaseUnorderedPartitionedKVWriter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/BaseUnorderedPartitionedKVWriter.java
@@ -29,10 +29,8 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.hadoop.io.serializer.SerializationFactory;
 import org.apache.hadoop.io.serializer.Serializer;
-import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.runtime.api.Event;
@@ -43,6 +41,7 @@
 import org.apache.tez.runtime.library.common.ConfigUtils;
 import org.apache.tez.runtime.library.common.TezRuntimeUtils;
 import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutput;
+import org.apache.tez.runtime.library.utils.CodecUtils;
 
 @SuppressWarnings("rawtypes")
 public abstract class BaseUnorderedPartitionedKVWriter extends KeyValuesWriter {
@@ -141,16 +140,14 @@
     additionalSpillBytesReadCounter = outputContext.getCounters().findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_READ);
     numAdditionalSpillsCounter = outputContext.getCounters().findCounter(TaskCounter.ADDITIONAL_SPILL_COUNT);
     dataViaEventSize = outputContext.getCounters().findCounter(TaskCounter.DATA_BYTES_VIA_EVENT);
-    
+
     // compression
-    if (ConfigUtils.shouldCompressIntermediateOutput(this.conf)) {
-      Class<? extends CompressionCodec> codecClass =
-          ConfigUtils.getIntermediateOutputCompressorClass(this.conf, DefaultCodec.class);
-      codec = ReflectionUtils.newInstance(codecClass, this.conf);
-    } else {
-      codec = null;
+    try {
+      this.codec = CodecUtils.getCodec(conf);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
     }
-    
+
     this.ifileReadAhead = this.conf.getBoolean(
         TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD,
         TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT);
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
index 1db7869..c67c405 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
@@ -35,8 +35,6 @@
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.DefaultCodec;
-import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.tez.common.TezRuntimeFrameworkConfigs;
 import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.common.counters.TezCounter;
@@ -46,14 +44,13 @@
 import org.apache.tez.runtime.api.InputContext;
 import org.apache.tez.runtime.library.api.KeyValueReader;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
-import org.apache.tez.runtime.library.common.ConfigUtils;
 import org.apache.tez.runtime.library.common.MemoryUpdateCallbackHandler;
 import org.apache.tez.runtime.library.common.readers.UnorderedKVReader;
 import org.apache.tez.runtime.library.common.shuffle.ShuffleEventHandler;
 import org.apache.tez.runtime.library.common.shuffle.impl.ShuffleInputEventHandlerImpl;
 import org.apache.tez.runtime.library.common.shuffle.impl.ShuffleManager;
 import org.apache.tez.runtime.library.common.shuffle.impl.SimpleFetchedInputAllocator;
-
+import org.apache.tez.runtime.library.utils.CodecUtils;
 import org.apache.tez.common.Preconditions;
 
 /**
@@ -114,14 +111,7 @@
     if (!isStarted.get()) {
       ////// Initial configuration
       memoryUpdateCallbackHandler.validateUpdateReceived();
-      CompressionCodec codec;
-      if (ConfigUtils.isIntermediateInputCompressed(conf)) {
-        Class<? extends CompressionCodec> codecClass = ConfigUtils
-            .getIntermediateInputCompressorClass(conf, DefaultCodec.class);
-        codec = ReflectionUtils.newInstance(codecClass, conf);
-      } else {
-        codec = null;
-      }
+      CompressionCodec codec = CodecUtils.getCodec(conf);
 
       boolean compositeFetch = ShuffleUtils.isTezShuffleHandler(conf);
       boolean ifileReadAhead = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD,
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/utils/CodecUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/utils/CodecUtils.java
new file mode 100644
index 0000000..99d22c5
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/utils/CodecUtils.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.utils;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionInputStream;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.common.ConfigUtils;
+import org.apache.tez.runtime.library.common.TezRuntimeUtils;
+import org.apache.tez.runtime.library.common.sort.impl.IFile;
+import org.apache.tez.runtime.library.common.sort.impl.IFileInputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class CodecUtils {
+
+  private static final Logger LOG = LoggerFactory.getLogger(IFile.class);
+  private static final int DEFAULT_BUFFER_SIZE = 128 * 1024;
+
+  private CodecUtils() {
+  }
+
+  public static CompressionCodec getCodec(Configuration conf) throws IOException {
+    if (ConfigUtils.shouldCompressIntermediateOutput(conf)) {
+      Class<? extends CompressionCodec> codecClass =
+          ConfigUtils.getIntermediateOutputCompressorClass(conf, DefaultCodec.class);
+      CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf);
+
+      if (codec != null) {
+        Class<? extends Compressor> compressorType = null;
+        Throwable cause = null;
+        try {
+          compressorType = codec.getCompressorType();
+        } catch (RuntimeException e) {
+          cause = e;
+        }
+        if (compressorType == null) {
+          String errMsg = String.format(
+              "Unable to get CompressorType for codec (%s). This is most"
+                  + " likely due to missing native libraries for the codec.",
+              conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC));
+          throw new IOException(errMsg, cause);
+        }
+      }
+      return codec;
+    } else {
+      return null;
+    }
+  }
+
+  public static InputStream getDecompressedInputStreamWithBufferSize(CompressionCodec codec,
+      IFileInputStream checksumIn, Decompressor decompressor, int compressedLength)
+      throws IOException {
+    String bufferSizeProp = TezRuntimeUtils.getBufferSizeProperty(codec);
+    Configurable configurableCodec = (Configurable) codec;
+    int originalSize = configurableCodec.getConf().getInt(bufferSizeProp, DEFAULT_BUFFER_SIZE);
+
+    CompressionInputStream in = null;
+
+    if (bufferSizeProp != null) {
+      Configuration conf = configurableCodec.getConf();
+      int newBufSize = Math.min(compressedLength, DEFAULT_BUFFER_SIZE);
+      LOG.trace("buffer size was set according to min(compressedLength, {}): {}={}",
+          DEFAULT_BUFFER_SIZE, bufferSizeProp, newBufSize);
+
+      synchronized (codec) {
+        conf.setInt(bufferSizeProp, newBufSize);
+
+        in = codec.createInputStream(checksumIn, decompressor);
+        /*
+         * We would better reset the original buffer size into the codec. Basically the buffer size
+         * is used at 2 places.
+         *
+         * 1. It can tell the inputstream/outputstream buffersize (which is created by
+         * codec.createInputStream/codec.createOutputStream). This is something which might and
+         * should be optimized in config, as inputstreams instantiate and use their own buffer and
+         * won't reuse buffers from previous streams (TEZ-4135).
+         *
+         * 2. The same buffersize is used when a codec creates a new Compressor/Decompressor. The
+         * fundamental difference is that Compressor/Decompressor instances are expensive and reused
+         * by hadoop's CodecPool. Here is a hidden mismatch, which can happen when a codec is
+         * created with a small buffersize config. Once it creates a Compressor/Decompressor
+         * instance from its config field, the reused Compressor/Decompressor instance will be
+         * reused later, even when application handles large amount of data. This way we can end up
+         * in large stream buffers + small compressor/decompressor buffers, which can be suboptimal,
+         * moreover, it can lead to strange errors, when a compressed output exceeds the size of the
+         * buffer (TEZ-4234).
+         *
+         * An interesting outcome is that - as the codec buffersize config affects both
+         * compressor(output) and decompressor(input) paths - an altered codec config can cause the
+         * issues above for Compressor instances as well, even when we tried to leverage from
+         * smaller buffer size only on decompression paths.
+         */
+        configurableCodec.getConf().setInt(bufferSizeProp, originalSize);
+      }
+    } else {
+      in = codec.createInputStream(checksumIn, decompressor);
+    }
+
+    return in;
+  }
+}
\ No newline at end of file
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
index 520dec7..446801a 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
@@ -290,6 +290,7 @@
         .thenThrow(new InternalError(codecErrorMsg));
     Decompressor mockDecoder = mock(Decompressor.class);
     CompressionCodec mockCodec = mock(ConfigurableCodecForTest.class);
+    when(((ConfigurableCodecForTest) mockCodec).getConf()).thenReturn(mock(Configuration.class));
     when(mockCodec.createDecompressor()).thenReturn(mockDecoder);
     when(mockCodec.createInputStream(any(InputStream.class), any(Decompressor.class)))
         .thenReturn(mockCodecStream);
@@ -312,6 +313,7 @@
         .thenThrow(new IllegalArgumentException(codecErrorMsg));
     Decompressor mockDecoder = mock(Decompressor.class);
     CompressionCodec mockCodec = mock(ConfigurableCodecForTest.class);
+    when(((ConfigurableCodecForTest) mockCodec).getConf()).thenReturn(mock(Configuration.class));
     when(mockCodec.createDecompressor()).thenReturn(mockDecoder);
     when(mockCodec.createInputStream(any(InputStream.class), any(Decompressor.class)))
         .thenReturn(mockCodecStream);
@@ -327,7 +329,8 @@
     CompressionInputStream mockCodecStream1 = mock(CompressionInputStream.class);
     when(mockCodecStream1.read(any(byte[].class), anyInt(), anyInt()))
         .thenThrow(new SocketTimeoutException(codecErrorMsg));
-    CompressionCodec mockCodec1 = mock(CompressionCodec.class);
+    CompressionCodec mockCodec1 = mock(ConfigurableCodecForTest.class);
+    when(((ConfigurableCodecForTest) mockCodec1).getConf()).thenReturn(mock(Configuration.class));
     when(mockCodec1.createDecompressor()).thenReturn(mockDecoder);
     when(mockCodec1.createInputStream(any(InputStream.class), any(Decompressor.class)))
         .thenReturn(mockCodecStream1);
@@ -342,7 +345,8 @@
     CompressionInputStream mockCodecStream2 = mock(CompressionInputStream.class);
     when(mockCodecStream2.read(any(byte[].class), anyInt(), anyInt()))
         .thenThrow(new InternalError(codecErrorMsg));
-    CompressionCodec mockCodec2 = mock(CompressionCodec.class);
+    CompressionCodec mockCodec2 = mock(ConfigurableCodecForTest.class);
+    when(((ConfigurableCodecForTest) mockCodec2).getConf()).thenReturn(mock(Configuration.class));
     when(mockCodec2.createDecompressor()).thenReturn(mockDecoder);
     when(mockCodec2.createInputStream(any(InputStream.class), any(Decompressor.class)))
         .thenReturn(mockCodecStream2);
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java
index c74496e..bf35955 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java
@@ -50,9 +50,11 @@
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.lz4.Lz4Compressor;
 import org.apache.hadoop.io.serializer.Deserializer;
 import org.apache.hadoop.io.serializer.SerializationFactory;
 import org.apache.hadoop.io.serializer.WritableSerialization;
+import org.apache.hadoop.util.NativeCodeLoader;
 import org.apache.tez.common.TezRuntimeFrameworkConfigs;
 import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
 import org.apache.tez.runtime.library.common.TezRuntimeUtils;
@@ -66,6 +68,7 @@
 import org.apache.tez.runtime.library.utils.BufferUtils;
 import org.junit.After;
 import org.junit.Assert;
+import org.junit.Assume;
 import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -729,13 +732,16 @@
 
   @Test
   public void testInMemoryBufferSize() throws IOException {
+    Configurable configurableCodec = (Configurable) codec;
+    int originalCodecBufferSize =
+        configurableCodec.getConf().getInt(TezRuntimeUtils.getBufferSizeProperty(codec), -1);
+
     // for smaller amount of data, codec buffer should be sized according to compressed data length
     List<KVPair> data = KVDataGen.generateTestData(false, rnd.nextInt(100));
     Writer writer = writeTestFile(false, false, data, codec);
     readAndVerifyData(writer.getRawLength(), writer.getCompressedLength(), data, codec);
 
-    Configurable configurableCodec = (Configurable) codec;
-    Assert.assertEquals(writer.getCompressedLength(),
+    Assert.assertEquals(originalCodecBufferSize, // original size is repaired
         configurableCodec.getConf().getInt(TezRuntimeUtils.getBufferSizeProperty(codec), 0));
 
     // buffer size cannot grow infinitely with compressed data size
@@ -743,10 +749,57 @@
     writer = writeTestFile(false, false, data, codec);
     readAndVerifyData(writer.getRawLength(), writer.getCompressedLength(), data, codec);
 
-    Assert.assertEquals(128*1024,
+    Assert.assertEquals(originalCodecBufferSize, // original size is repaired
         configurableCodec.getConf().getInt(TezRuntimeUtils.getBufferSizeProperty(codec), 0));
   }
 
+  @Test(expected = IllegalArgumentException.class)
+  public void testSmallDataCompression() throws IOException {
+    Assume.assumeTrue(NativeCodeLoader.isNativeCodeLoaded());
+
+    tryWriteFileWithBufferSize(17, "org.apache.hadoop.io.compress.Lz4Codec");
+    tryWriteFileWithBufferSize(32, "org.apache.hadoop.io.compress.Lz4Codec");
+  }
+
+  private void tryWriteFileWithBufferSize(int bufferSize, String codecClassName)
+      throws IOException {
+    Configuration conf = new Configuration();
+
+    System.out.println("trying with buffer size: " + bufferSize);
+    conf.set(TezRuntimeUtils.getBufferSizeProperty(codecClassName), Integer.toString(bufferSize));
+    CompressionCodecFactory codecFactory = new CompressionCodecFactory(conf);
+    CompressionCodec codecToTest =
+        codecFactory.getCodecByClassName(codecClassName);
+    List<KVPair> data = KVDataGen.generateTestDataOfKeySize(false, 1, 0);
+    writeTestFile(false, false, data, codecToTest);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testLz4CompressedDataIsLargerThanOriginal() throws IOException {
+    Assume.assumeTrue(NativeCodeLoader.isNativeCodeLoaded());
+
+    // this one succeeds
+    byte[] buf = new byte[32];
+    initBufWithNumbers(buf, 24, 45, 55, 49, 54, 55, 55, 54, 49, 48, 50, 55, 49, 56, 54, 48, 57, 48);
+    Lz4Compressor comp = new Lz4Compressor(32, false);
+    comp.setInput(buf, 0, 32);
+    comp.compress(buf, 0, 32);
+
+    // adding 1 more element makes that fail
+    buf = new byte[32];
+    initBufWithNumbers(buf, 24, 45, 55, 49, 54, 55, 55, 54, 49, 48, 50, 55, 49, 56, 54, 48, 57, 48,
+        50);
+    comp = new Lz4Compressor(32, false);
+    comp.setInput(buf, 0, 32);
+    comp.compress(buf, 0, 32);
+  }
+
+  private void initBufWithNumbers(byte[] buf, int... args) {
+    for (int i = 0; i < args.length; i++) {
+      buf[i] = (byte) args[i];
+    }
+  }
+
   /**
    * Test different options (RLE, repeat keys, compression) on reader/writer
    *