blob: 1059af4380818b6110922635942e247554aa4572 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.store.parquet.compression;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Deque;
import java.util.LinkedList;
import org.apache.parquet.bytes.ByteBufferAllocator;
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.compression.CompressionCodecFactory;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.airlift.compress.Compressor;
import io.airlift.compress.Decompressor;
import io.airlift.compress.lz4.Lz4Compressor;
import io.airlift.compress.lz4.Lz4Decompressor;
import io.airlift.compress.lzo.LzoCompressor;
import io.airlift.compress.lzo.LzoDecompressor;
import io.airlift.compress.snappy.SnappyCompressor;
import io.airlift.compress.snappy.SnappyDecompressor;
import io.airlift.compress.zstd.ZstdCompressor;
import io.airlift.compress.zstd.ZstdDecompressor;
/**
 * A shim making an aircompressor (de)compressor available through the BytesInputCompressor
 * and BytesInputDecompressor interfaces.
 *
 * Output buffers handed out by {@link #compress(BytesInput)} and
 * {@link #decompress(BytesInput, int)} are obtained from the supplied
 * {@link ByteBufferAllocator} and are tracked by this instance until {@link #release()}
 * is called, at which point every tracked buffer is returned to the allocator.
 *
 * NOTE(review): no synchronization is performed on the internal buffer list, so this
 * class presumably expects to be used by one thread at a time - confirm against callers.
 */
public class AirliftBytesInputCompressor implements CompressionCodecFactory.BytesInputCompressor, CompressionCodecFactory.BytesInputDecompressor {
  private static final Logger logger = LoggerFactory.getLogger(AirliftBytesInputCompressor.class);

  // name of the codec provided by this compressor
  private final CompressionCodecName codecName;

  // backing aircompressor compressor
  private final Compressor airComp;

  // backing aircompressor decompressor
  private final Decompressor airDecomp;

  // the allocator used for (de)compression output buffers
  private final ByteBufferAllocator allocator;

  // all the buffers we've allocated, and must release
  private final Deque<ByteBuffer> allocatedBuffers;

  /**
   * Creates a shim backed by the aircompressor implementation of the given codec.
   *
   * @param codecName the Parquet codec to provide; one of LZ4, LZO, SNAPPY or ZSTD
   * @param allocator the allocator used to obtain, and later release, output buffers
   * @throws UnsupportedOperationException if {@code codecName} is any other codec
   */
  public AirliftBytesInputCompressor(CompressionCodecName codecName, ByteBufferAllocator allocator) {
    this.codecName = codecName;

    switch (codecName) {
    case LZ4:
      airComp = new Lz4Compressor();
      airDecomp = new Lz4Decompressor();
      break;
    case LZO:
      airComp = new LzoCompressor();
      airDecomp = new LzoDecompressor();
      break;
    case SNAPPY:
      airComp = new SnappyCompressor();
      airDecomp = new SnappyDecompressor();
      break;
    case ZSTD:
      airComp = new ZstdCompressor();
      airDecomp = new ZstdDecompressor();
      break;
    default:
      throw new UnsupportedOperationException("Parquet compression codec is not supported: " + codecName);
    }

    this.allocator = allocator;
    this.allocatedBuffers = new LinkedList<>();

    logger.debug(
        "constructed a {} using a backing compressor of {}",
        getClass().getName(),
        airComp.getClass().getName()
    );
  }

  /**
   * Compresses the given bytes into a newly allocated buffer.
   *
   * The output buffer is tracked by this instance and stays allocated until
   * {@link #release()} is called.
   *
   * @param bytes the uncompressed input
   * @return the compressed data, backed by the newly allocated output buffer
   * @throws IOException declared by the implemented interface
   * @throws ArithmeticException if {@code bytes.size()} does not fit in an {@code int}
   */
  @Override
  public BytesInput compress(BytesInput bytes) throws IOException {
    ByteBuffer inBuf = bytes.toByteBuffer();

    logger.trace(
        "will use aircompressor to compress {} bytes from a {} containing a {}",
        bytes.size(),
        bytes.getClass().getName(),
        inBuf.getClass().getName()
    );

    // size() is a long: convert with an overflow check so that an oversize input fails
    // loudly instead of wrapping around and under-sizing the output buffer
    int inLen = Math.toIntExact(bytes.size());
    // aircompressor tells us the maximum amount of output buffer we could need
    int maxOutLen = airComp.maxCompressedLength(inLen);
    ByteBuffer outBuf = allocator.allocate(maxOutLen);
    // track our allocation for later release in release()
    allocatedBuffers.push(outBuf);

    airComp.compress(inBuf, outBuf);

    // flip: callers expect the output buffer positioned at the start of data
    // (the cast keeps this compiling where flip() is declared to return Buffer)
    return BytesInput.from((ByteBuffer) outBuf.flip());
  }

  /**
   * @return the codec name this instance was constructed with
   */
  @Override
  public CompressionCodecName getCodecName() {
    return codecName;
  }

  /**
   * Decompresses the given bytes into a newly allocated buffer of {@code uncompressedSize}
   * bytes.
   *
   * The output buffer is tracked by this instance and stays allocated until
   * {@link #release()} is called.
   *
   * @param bytes the compressed input
   * @param uncompressedSize the capacity requested for the output buffer
   * @return the decompressed data, backed by the newly allocated output buffer
   * @throws IOException declared by the implemented interface
   */
  @Override
  public BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOException {
    ByteBuffer inBuf = bytes.toByteBuffer();

    logger.trace(
        "will use aircompressor to decompress {} bytes from a {} containing a {}.",
        uncompressedSize,
        bytes.getClass().getName(),
        inBuf.getClass().getName()
    );

    ByteBuffer outBuf = allocator.allocate(uncompressedSize);
    // track our allocation for later release in release()
    allocatedBuffers.push(outBuf);

    airDecomp.decompress(inBuf, outBuf);

    // flip: callers expect the output buffer positioned at the start of data
    return BytesInput.from((ByteBuffer) outBuf.flip());
  }

  /**
   * Decompresses {@code input} directly into the caller-supplied {@code output} buffer.
   *
   * No buffer is allocated or tracked by this method. {@code compressedSize} is not used
   * and {@code uncompressedSize} is only logged; the backing decompressor works from the
   * two buffers as given.
   *
   * NOTE(review): the position/limit of both buffers after the call are whatever the
   * aircompressor decompressor leaves them at - confirm this matches caller expectations.
   *
   * @param input the compressed data
   * @param compressedSize unused by this implementation
   * @param output the buffer receiving the decompressed data
   * @param uncompressedSize the expected decompressed size; used for trace logging only
   * @throws IOException declared by the implemented interface
   */
  @Override
  public void decompress(ByteBuffer input, int compressedSize, ByteBuffer output, int uncompressedSize)
      throws IOException {
    logger.trace(
        "will use aircompressor to decompress {} bytes from a {} to a {}.",
        uncompressedSize,
        input.getClass().getName(),
        output.getClass().getName()
    );

    airDecomp.decompress(input, output);
  }

  /**
   * Returns every buffer allocated by {@link #compress(BytesInput)} and
   * {@link #decompress(BytesInput, int)} to the allocator and forgets them.
   *
   * Calling this when nothing is tracked releases nothing and only logs.
   */
  @Override
  public void release() {
    int bufCount = allocatedBuffers.size();

    // LIFO release order to try to reduce memory fragmentation.
    while (!allocatedBuffers.isEmpty()) {
      allocator.release(allocatedBuffers.pop());
    }

    logger.debug("released {} allocated buffers", bufCount);
  }
}