| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.parquet.hadoop; |
| |
| import java.io.IOException; |
| import java.lang.reflect.InvocationTargetException; |
| import java.lang.reflect.Method; |
| import java.nio.ByteBuffer; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.Map; |
| |
| import org.apache.commons.pool.BasePoolableObjectFactory; |
| import org.apache.commons.pool.impl.GenericObjectPool; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.io.compress.CompressionCodec; |
| import org.apache.hadoop.io.compress.Compressor; |
| import org.apache.hadoop.io.compress.Decompressor; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.xerial.snappy.Snappy; |
| |
| import org.apache.parquet.ParquetRuntimeException; |
| import org.apache.parquet.Preconditions; |
| import org.apache.parquet.bytes.ByteBufferAllocator; |
| import org.apache.parquet.bytes.BytesInput; |
| import org.apache.parquet.hadoop.metadata.CompressionCodecName; |
| |
| /** |
| * Factory to produce compressors and decompressors that operate on Java |
| * direct memory, without requiring a copy into heap memory (where possible). |
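| * <p> |
| * Illustrative usage via the public factory method (here {@code conf}, |
| * {@code allocator}, and {@code pageSize} are caller-supplied): |
| * <pre>{@code |
| * CodecFactory factory = CodecFactory.createDirectCodecFactory(conf, allocator, pageSize); |
| * }</pre> |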
| */ |
| class DirectCodecFactory extends CodecFactory implements AutoCloseable { |
| private static final Logger LOG = LoggerFactory.getLogger(DirectCodecFactory.class); |
| |
| private final ByteBufferAllocator allocator; |
| |
| // Any of these can be null, depending on the version of Hadoop on the classpath |
| private static final Class<?> DIRECT_DECOMPRESSION_CODEC_CLASS; |
| private static final Method DECOMPRESS_METHOD; |
| private static final Method CREATE_DIRECT_DECOMPRESSOR_METHOD; |
| |
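| // Probe reflectively for Hadoop's DirectDecompressionCodec API so this class |
| // still loads (and falls back to heap decompression) when it is absent. |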
| static { |
| Class<?> tempClass = null; |
| Method tempCreateMethod = null; |
| Method tempDecompressMethod = null; |
| try { |
| tempClass = Class.forName("org.apache.hadoop.io.compress.DirectDecompressionCodec"); |
| tempCreateMethod = tempClass.getMethod("createDirectDecompressor"); |
| tempDecompressMethod = tempClass.getMethod("decompress", ByteBuffer.class, ByteBuffer.class); |
| } catch (ClassNotFoundException | NoSuchMethodException e) { |
| // not available in this version of Hadoop; leave the fields null |
| } |
| DIRECT_DECOMPRESSION_CODEC_CLASS = tempClass; |
| CREATE_DIRECT_DECOMPRESSOR_METHOD = tempCreateMethod; |
| DECOMPRESS_METHOD = tempDecompressMethod; |
| } |
| |
| /** |
| * See the docs on {@link CodecFactory#createDirectCodecFactory}, the public |
| * pass-through factory method for this constructor; it hides the rest of |
| * this class from public access. |
| */ |
| DirectCodecFactory(Configuration config, ByteBufferAllocator allocator, int pageSize) { |
| super(config, pageSize); |
| Preconditions.checkNotNull(allocator, "allocator"); |
| Preconditions.checkState(allocator.isDirect(), |
| "A %s requires that a direct buffer allocator be provided.", |
| getClass().getSimpleName()); |
| this.allocator = allocator; |
| } |
| |
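| /** |
| * Return a buffer with capacity of at least {@code size}: reuse (and clear) |
| * {@code buffer} if it is large enough, otherwise release it and allocate a |
| * new one from the direct allocator. |
| */ |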
| private ByteBuffer ensure(ByteBuffer buffer, int size) { |
| if (buffer == null) { |
| buffer = allocator.allocate(size); |
| } else if (buffer.capacity() >= size) { |
| buffer.clear(); |
| } else { |
| release(buffer); |
| buffer = allocator.allocate(size); |
| } |
| return buffer; |
| } |
| |
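| /** |
| * Release {@code buffer} back to the allocator if non-null. Always returns |
| * null so callers can clear their reference in a single assignment. |
| */ |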
| ByteBuffer release(ByteBuffer buffer) { |
| if (buffer != null) { |
| allocator.release(buffer); |
| } |
| return null; |
| } |
| |
| @Override |
| protected BytesCompressor createCompressor(final CompressionCodecName codecName) { |
| CompressionCodec codec = getCodec(codecName); |
| if (codec == null) { |
| return new NoopCompressor(); |
| } else if (codecName == CompressionCodecName.SNAPPY) { |
| // avoid using the default Snappy codec since it allocates direct buffers at awkward spots. |
| return new SnappyCompressor(); |
| } else { |
| // TODO: create a class similar to SnappyCompressor for zlib and handle it |
| // here like Snappy above, since the zlib codec also makes allocateDirect calls. |
| return new HeapBytesCompressor(codecName); |
| } |
| } |
| |
| @Override |
| protected BytesDecompressor createDecompressor(final CompressionCodecName codecName) { |
| CompressionCodec codec = getCodec(codecName); |
| if (codec == null) { |
| return new NoopDecompressor(); |
| } else if (codecName == CompressionCodecName.SNAPPY) { |
| return new SnappyDecompressor(); |
| } else if (DirectCodecPool.INSTANCE.codec(codec).supportsDirectDecompression()) { |
| return new FullDirectDecompressor(codecName); |
| } else { |
| return new IndirectDecompressor(codec); |
| } |
| } |
| |
| @Override |
| public void close() { |
| release(); |
| } |
| |
| /** |
| * Wrapper around legacy Hadoop decompressors that do not implement a direct |
| * memory based version of the decompression algorithm. |
| */ |
| public class IndirectDecompressor extends BytesDecompressor { |
| private final Decompressor decompressor; |
| |
| public IndirectDecompressor(CompressionCodec codec) { |
| this.decompressor = DirectCodecPool.INSTANCE.codec(codec).borrowDecompressor(); |
| } |
| |
| @Override |
| public BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOException { |
| decompressor.reset(); |
| byte[] inputBytes = bytes.toByteArray(); |
| decompressor.setInput(inputBytes, 0, inputBytes.length); |
| byte[] output = new byte[uncompressedSize]; |
| decompressor.decompress(output, 0, uncompressedSize); |
| return BytesInput.from(output); |
| } |
| |
| @Override |
| public void decompress(ByteBuffer input, int compressedSize, ByteBuffer output, int uncompressedSize) |
| throws IOException { |
| |
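| // No direct-memory API on this codec: copy the input into a heap array, |
| // decompress on the heap, then copy the result into the output buffer. |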
| decompressor.reset(); |
| byte[] inputBytes = new byte[compressedSize]; |
| input.position(0); |
| input.get(inputBytes); |
| decompressor.setInput(inputBytes, 0, inputBytes.length); |
| byte[] outputBytes = new byte[uncompressedSize]; |
| decompressor.decompress(outputBytes, 0, uncompressedSize); |
| output.clear(); |
| output.put(outputBytes); |
| } |
| |
| @Override |
| public void release() { |
| DirectCodecPool.INSTANCE.returnDecompressor(decompressor); |
| } |
| } |
| |
| /** |
| * Wrapper around newer Hadoop decompressors that implement a direct memory |
| * based version of a particular decompression algorithm. To maintain |
| * compatibility with Hadoop 1.x, the classes that implement |
| * {@link org.apache.hadoop.io.compress.DirectDecompressionCodec} |
| * are currently retrieved and have their decompression method invoked |
| * via reflection. |
| */ |
| public class FullDirectDecompressor extends BytesDecompressor { |
| private final Object decompressor; |
| private HeapBytesDecompressor extraDecompressor; |
| public FullDirectDecompressor(CompressionCodecName codecName) { |
| CompressionCodec codec = getCodec(codecName); |
| this.decompressor = DirectCodecPool.INSTANCE.codec(codec).borrowDirectDecompressor(); |
| this.extraDecompressor = new HeapBytesDecompressor(codecName); |
| } |
| |
| @Override |
| public BytesInput decompress(BytesInput compressedBytes, int uncompressedSize) throws IOException { |
| return extraDecompressor.decompress(compressedBytes, uncompressedSize); |
| } |
| |
| @Override |
| public void decompress(ByteBuffer input, int compressedSize, ByteBuffer output, int uncompressedSize) |
| throws IOException { |
| output.clear(); |
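| // Invoke DirectDecompressionCodec.decompress(ByteBuffer, ByteBuffer) via |
| // reflection (see the static lookup at the top of this class). |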
| try { |
| DECOMPRESS_METHOD.invoke(decompressor, (ByteBuffer) input.limit(compressedSize), (ByteBuffer) output.limit(uncompressedSize)); |
| } catch (IllegalAccessException | InvocationTargetException e) { |
| throw new DirectCodecPool.ParquetCompressionCodecException(e); |
| } |
| output.position(uncompressedSize); |
| } |
| |
| @Override |
| public void release() { |
| DirectCodecPool.INSTANCE.returnDirectDecompressor(decompressor); |
| extraDecompressor.release(); |
| } |
| |
| } |
| |
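| /** |
| * Pass-through decompressor for uncompressed pages. |
| */ |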
| public class NoopDecompressor extends BytesDecompressor { |
| |
| @Override |
| public void decompress(ByteBuffer input, int compressedSize, ByteBuffer output, int uncompressedSize) |
| throws IOException { |
| Preconditions.checkArgument(compressedSize == uncompressedSize, |
| "Non-compressed data did not have matching compressed and uncompressed sizes."); |
| output.clear(); |
| output.put((ByteBuffer) input.duplicate().position(0).limit(compressedSize)); |
| } |
| |
| @Override |
| public BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOException { |
| return bytes; |
| } |
| |
| @Override |
| public void release() {} |
| |
| } |
| |
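| /** |
| * Snappy decompressor that uses snappy-java's ByteBuffer API for the direct |
| * path and falls back to a heap-based decompressor for BytesInput. |
| */ |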
| public class SnappyDecompressor extends BytesDecompressor { |
| |
| private HeapBytesDecompressor extraDecompressor; |
| public SnappyDecompressor() { |
| this.extraDecompressor = new HeapBytesDecompressor(CompressionCodecName.SNAPPY); |
| } |
| |
| @Override |
| public BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOException { |
| return extraDecompressor.decompress(bytes, uncompressedSize); |
| } |
| |
| @Override |
| public void decompress(ByteBuffer src, int compressedSize, ByteBuffer dst, int uncompressedSize) throws IOException { |
| dst.clear(); |
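| // snappy-java reads src between its position and limit and writes the |
| // uncompressed bytes into dst, returning the uncompressed byte count. |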
| int size = Snappy.uncompress(src, dst); |
| dst.limit(size); |
| } |
| |
| @Override |
| public void release() {} |
| } |
| |
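| /** |
| * Snappy compressor that stays in direct memory by using snappy-java's |
| * ByteBuffer API, avoiding the default Hadoop Snappy codec's direct buffer |
| * allocations at awkward spots. |
| */ |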
| public class SnappyCompressor extends BytesCompressor { |
| |
| // TODO - this outgoing buffer might be better off not being shared; this |
| // seems to only work because of an extra copy currently happening where |
| // this interface is consumed |
| private ByteBuffer incoming; |
| private ByteBuffer outgoing; |
| |
| /** |
| * Compress a given buffer of bytes. |
| * @param bytes the uncompressed input |
| * @return the Snappy-compressed output |
| * @throws IOException if compression fails |
| */ |
| @Override |
| public BytesInput compress(BytesInput bytes) throws IOException { |
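| // Worst-case compressed length for this input size, so the output buffer |
| // is always large enough for Snappy.compress. |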
| int maxOutputSize = Snappy.maxCompressedLength((int) bytes.size()); |
| ByteBuffer bufferIn = bytes.toByteBuffer(); |
| outgoing = ensure(outgoing, maxOutputSize); |
| final int size; |
| if (bufferIn.isDirect()) { |
| size = Snappy.compress(bufferIn, outgoing); |
| } else { |
| // Snappy library requires buffers be direct |
| this.incoming = ensure(this.incoming, (int) bytes.size()); |
| this.incoming.put(bufferIn); |
| this.incoming.flip(); |
| size = Snappy.compress(this.incoming, outgoing); |
| } |
| |
| outgoing.limit(size); |
| |
| return BytesInput.from(outgoing); |
| } |
| |
| @Override |
| public CompressionCodecName getCodecName() { |
| return CompressionCodecName.SNAPPY; |
| } |
| |
| @Override |
| public void release() { |
| outgoing = DirectCodecFactory.this.release(outgoing); |
| incoming = DirectCodecFactory.this.release(incoming); |
| } |
| |
| } |
| |
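| /** |
| * Pass-through compressor for CompressionCodecName.UNCOMPRESSED. |
| */ |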
| public static class NoopCompressor extends BytesCompressor { |
| |
| public NoopCompressor() {} |
| |
| @Override |
| public BytesInput compress(BytesInput bytes) throws IOException { |
| return bytes; |
| } |
| |
| @Override |
| public CompressionCodecName getCodecName() { |
| return CompressionCodecName.UNCOMPRESSED; |
| } |
| |
| @Override |
| public void release() {} |
| } |
| |
| static class DirectCodecPool { |
| |
| public static final DirectCodecPool INSTANCE = new DirectCodecPool(); |
| |
| private final Map<CompressionCodec, CodecPool> codecs = |
| Collections.synchronizedMap(new HashMap<CompressionCodec, CodecPool>()); |
| private final Map<Class<?>, GenericObjectPool> directDePools = Collections |
| .synchronizedMap(new HashMap<Class<?>, GenericObjectPool>()); |
| private final Map<Class<?>, GenericObjectPool> dePools = Collections |
| .synchronizedMap(new HashMap<Class<?>, GenericObjectPool>()); |
| private final Map<Class<?>, GenericObjectPool> cPools = Collections |
| .synchronizedMap(new HashMap<Class<?>, GenericObjectPool>()); |
| |
| private DirectCodecPool() {} |
| |
| public class CodecPool { |
| private final GenericObjectPool compressorPool; |
| private final GenericObjectPool decompressorPool; |
| private final GenericObjectPool directDecompressorPool; |
| private final boolean supportDirectDecompressor; |
| private static final String BYTE_BUF_IMPL_NOT_FOUND_MSG = |
| "Unable to find ByteBuffer based %s for codec %s, will use a byte array based implementation instead."; |
| |
| private CodecPool(final CompressionCodec codec) { |
| try { |
| // isAssignableFrom (with a null guard) is required here: |
| // DirectDecompressionCodec is an interface, so class equality with a |
| // concrete codec class would never match. |
| boolean supportDirectDecompressor = DIRECT_DECOMPRESSION_CODEC_CLASS != null |
| && DIRECT_DECOMPRESSION_CODEC_CLASS.isAssignableFrom(codec.getClass()); |
| compressorPool = new GenericObjectPool(new BasePoolableObjectFactory() { |
| public Object makeObject() throws Exception { |
| return codec.createCompressor(); |
| } |
| }, Integer.MAX_VALUE); |
| |
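| // Borrow and immediately return one object to verify the pool works and |
| // to register the concrete class for later returnToPool lookups. |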
| Object com = compressorPool.borrowObject(); |
| if (com != null) { |
| cPools.put(com.getClass(), compressorPool); |
| compressorPool.returnObject(com); |
| } else { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug(String.format(BYTE_BUF_IMPL_NOT_FOUND_MSG, "compressor", codec.getClass().getName())); |
| } |
| } |
| |
| decompressorPool = new GenericObjectPool(new BasePoolableObjectFactory() { |
| public Object makeObject() throws Exception { |
| return codec.createDecompressor(); |
| } |
| }, Integer.MAX_VALUE); |
| |
| Object decom = decompressorPool.borrowObject(); |
| if (decom != null) { |
| dePools.put(decom.getClass(), decompressorPool); |
| decompressorPool.returnObject(decom); |
| } else { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug(String.format(BYTE_BUF_IMPL_NOT_FOUND_MSG, "decompressor", codec.getClass().getName())); |
| } |
| } |
| |
| if (supportDirectDecompressor) { |
| directDecompressorPool = new GenericObjectPool( |
| new BasePoolableObjectFactory() { |
| public Object makeObject() throws Exception { |
| // createDirectDecompressor is an instance method, so it must be |
| // invoked on the codec instance, not on the interface class. |
| return CREATE_DIRECT_DECOMPRESSOR_METHOD.invoke(codec); |
| } |
| }, Integer.MAX_VALUE); |
| |
| Object ddecom = directDecompressorPool.borrowObject(); |
| if (ddecom != null) { |
| directDePools.put(ddecom.getClass(), directDecompressorPool); |
| directDecompressorPool.returnObject(ddecom); |
| |
| } else { |
| supportDirectDecompressor = false; |
| if (LOG.isDebugEnabled()) { |
| LOG.debug(String.format(BYTE_BUF_IMPL_NOT_FOUND_MSG, "direct decompressor", codec.getClass().getName())); |
| } |
| } |
| |
| } else { |
| directDecompressorPool = null; |
| } |
| |
| this.supportDirectDecompressor = supportDirectDecompressor; |
| } catch (Exception e) { |
| throw new ParquetCompressionCodecException("Error creating compression codec pool.", e); |
| } |
| } |
| |
| public Object borrowDirectDecompressor() { |
| Preconditions.checkArgument(supportDirectDecompressor, "Tried to get a direct decompressor from a non-direct codec."); |
| try { |
| return directDecompressorPool.borrowObject(); |
| } catch (Exception e) { |
| throw new ParquetCompressionCodecException(e); |
| } |
| } |
| |
| public boolean supportsDirectDecompression() { |
| return supportDirectDecompressor; |
| } |
| |
| public Decompressor borrowDecompressor() { |
| return borrow(decompressorPool); |
| } |
| |
| public Compressor borrowCompressor() { |
| return borrow(compressorPool); |
| } |
| } |
| |
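| // Double-checked locking: the synchronized map read is cheap; pool creation |
| // is guarded so that each codec gets exactly one CodecPool. |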
| public CodecPool codec(CompressionCodec codec) { |
| CodecPool pools = codecs.get(codec); |
| if (pools == null) { |
| synchronized (this) { |
| pools = codecs.get(codec); |
| if (pools == null) { |
| pools = new CodecPool(codec); |
| codecs.put(codec, pools); |
| } |
| } |
| } |
| return pools; |
| } |
| |
| private void returnToPool(Object obj, Map<Class<?>, GenericObjectPool> pools) { |
| try { |
| GenericObjectPool pool = pools.get(obj.getClass()); |
| if (pool == null) { |
| throw new IllegalStateException("Received unexpected compressor or decompressor, " + |
| "cannot be returned to any available pool: " + obj.getClass().getSimpleName()); |
| } |
| pool.returnObject(obj); |
| } catch (Exception e) { |
| throw new ParquetCompressionCodecException(e); |
| } |
| } |
| |
| /** |
| * Borrow an object from a pool. |
| * |
| * @param pool the pool to borrow from, must not be null |
| * @return an object from the pool |
| */ |
| @SuppressWarnings("unchecked") |
| public <T> T borrow(GenericObjectPool pool) { |
| try { |
| return (T) pool.borrowObject(); |
| } catch (Exception e) { |
| throw new ParquetCompressionCodecException(e); |
| } |
| } |
| |
| public void returnCompressor(Compressor compressor) { |
| returnToPool(compressor, cPools); |
| } |
| |
| public void returnDecompressor(Decompressor decompressor) { |
| returnToPool(decompressor, dePools); |
| } |
| |
| public void returnDirectDecompressor(Object decompressor) { |
| returnToPool(decompressor, directDePools); |
| } |
| |
| public static class ParquetCompressionCodecException extends ParquetRuntimeException { |
| |
| public ParquetCompressionCodecException() { |
| super(); |
| } |
| |
| public ParquetCompressionCodecException(String message, Throwable cause) { |
| super(message, cause); |
| } |
| |
| public ParquetCompressionCodecException(String message) { |
| super(message); |
| } |
| |
| public ParquetCompressionCodecException(Throwable cause) { |
| super(cause); |
| } |
| } |
| } |
| } |