| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.arrow.compression; |
| |
| import java.io.ByteArrayInputStream; |
| import java.io.ByteArrayOutputStream; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.io.OutputStream; |
| import org.apache.arrow.memory.ArrowBuf; |
| import org.apache.arrow.memory.BufferAllocator; |
| import org.apache.arrow.util.Preconditions; |
| import org.apache.arrow.vector.compression.AbstractCompressionCodec; |
| import org.apache.arrow.vector.compression.CompressionUtil; |
| import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorInputStream; |
| import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorOutputStream; |
| import org.apache.commons.compress.utils.IOUtils; |
| |
| /** Compression codec for the LZ4 algorithm. */ |
| public class Lz4CompressionCodec extends AbstractCompressionCodec { |
| |
| @Override |
| protected ArrowBuf doCompress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) { |
| Preconditions.checkArgument( |
| uncompressedBuffer.writerIndex() <= Integer.MAX_VALUE, |
| "The uncompressed buffer size exceeds the integer limit %s.", |
| Integer.MAX_VALUE); |
| |
| byte[] inBytes = new byte[(int) uncompressedBuffer.writerIndex()]; |
| uncompressedBuffer.getBytes(/* index= */ 0, inBytes); |
| ByteArrayOutputStream baos = new ByteArrayOutputStream(); |
| try (InputStream in = new ByteArrayInputStream(inBytes); |
| OutputStream out = new FramedLZ4CompressorOutputStream(baos)) { |
| IOUtils.copy(in, out); |
| } catch (IOException e) { |
| throw new RuntimeException(e); |
| } |
| |
| byte[] outBytes = baos.toByteArray(); |
| |
| ArrowBuf compressedBuffer = |
| allocator.buffer(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + outBytes.length); |
| compressedBuffer.setBytes(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH, outBytes); |
| compressedBuffer.writerIndex(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + outBytes.length); |
| return compressedBuffer; |
| } |
| |
| @Override |
| protected ArrowBuf doDecompress(BufferAllocator allocator, ArrowBuf compressedBuffer) { |
| Preconditions.checkArgument( |
| compressedBuffer.writerIndex() <= Integer.MAX_VALUE, |
| "The compressed buffer size exceeds the integer limit %s", |
| Integer.MAX_VALUE); |
| |
| long decompressedLength = readUncompressedLength(compressedBuffer); |
| |
| byte[] inBytes = |
| new byte |
| [(int) (compressedBuffer.writerIndex() - CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH)]; |
| compressedBuffer.getBytes(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH, inBytes); |
| ByteArrayOutputStream out = new ByteArrayOutputStream((int) decompressedLength); |
| try (InputStream in = new FramedLZ4CompressorInputStream(new ByteArrayInputStream(inBytes))) { |
| IOUtils.copy(in, out); |
| } catch (IOException e) { |
| throw new RuntimeException(e); |
| } |
| |
| byte[] outBytes = out.toByteArray(); |
| ArrowBuf decompressedBuffer = allocator.buffer(outBytes.length); |
| decompressedBuffer.setBytes(/* index= */ 0, outBytes); |
| decompressedBuffer.writerIndex(decompressedLength); |
| return decompressedBuffer; |
| } |
| |
| @Override |
| public CompressionUtil.CodecType getCodecType() { |
| return CompressionUtil.CodecType.LZ4_FRAME; |
| } |
| } |