blob: 02f6e7f685d0cc9dc996e8c79097194ccc5987e7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.common.io.blockcompression;
import org.apache.flink.configuration.Configuration;
import org.junit.Test;
import sun.misc.Cleaner;
import sun.nio.ch.DirectBuffer;
import java.io.IOException;
import java.nio.ByteBuffer;
import static org.apache.flink.api.common.io.blockcompression.BlockCompressionFactoryLoader.CompressionMethod;
import static org.apache.flink.api.common.io.blockcompression.BlockCompressionFactoryLoader.CompressionMethod.BZIP2;
import static org.apache.flink.api.common.io.blockcompression.BlockCompressionFactoryLoader.CompressionMethod.GZIP;
import static org.apache.flink.api.common.io.blockcompression.BlockCompressionFactoryLoader.CompressionMethod.LZ4;
import static org.junit.Assert.assertEquals;
public class BlockCompressionTest {
@Test
public void testLz4() throws IOException {
runArrayTest(LZ4, 32768);
runArrayTest(LZ4, 16);
runByteBufferTest(LZ4, false, 32768);
runByteBufferTest(LZ4, false, 16);
runByteBufferTest(LZ4, true, 32768);
runByteBufferTest(LZ4, true, 16);
}
@Test
public void testBzip2() throws IOException {
runArrayTest(BZIP2, 32768);
runArrayTest(BZIP2, 16);
runByteBufferTest(BZIP2, false, 32768);
runByteBufferTest(BZIP2, false, 16);
runByteBufferTest(BZIP2, true, 32768);
runByteBufferTest(BZIP2, true, 16);
}
@Test
public void testGzip() throws IOException {
runArrayTest(GZIP, 32768);
runArrayTest(GZIP, 16);
runByteBufferTest(GZIP, false, 32768);
runByteBufferTest(GZIP, false, 16);
runByteBufferTest(GZIP, true, 32768);
runByteBufferTest(GZIP, true, 16);
}
private void runArrayTest(CompressionMethod method, int originalLen) throws IOException {
BlockCompressionFactory blockCompressionFactory = BlockCompressionFactoryLoader.createBlockCompressionFactory(
method.name(), new Configuration());
AbstractBlockCompressor compressor = blockCompressionFactory.getCompressor();
AbstractBlockDecompressor decompressor = blockCompressionFactory.getDecompressor();
int originalOff = 64;
byte[] data = new byte[originalOff + originalLen];
for (int i = 0; i < originalLen; i++) {
data[originalOff + i] = (byte) i;
}
int compressedOff = 32;
byte[] compressedData = new byte[compressedOff + compressor.getMaxCompressedSize(originalLen)];
int compressedLen = compressor.compress(data, originalOff, originalLen, compressedData, compressedOff);
int decompressedOff = 16;
byte[] decompressedData = new byte[decompressedOff + originalLen];
decompressor.decompress(compressedData, compressedOff, compressedLen, decompressedData, decompressedOff);
for (int i = 0; i < originalLen; i++) {
assertEquals(data[originalOff + i], decompressedData[decompressedOff + i]);
}
}
private void runByteBufferTest(
BlockCompressionFactoryLoader.CompressionMethod method,
boolean isDirect, int originalLen) throws IOException {
BlockCompressionFactory blockCompressionFactory = BlockCompressionFactoryLoader.createBlockCompressionFactory(
method.name(), new Configuration());
AbstractBlockCompressor compressor = blockCompressionFactory.getCompressor();
AbstractBlockDecompressor decompressor = blockCompressionFactory.getDecompressor();
int originalOff = 64;
ByteBuffer data;
if (isDirect) {
data = ByteBuffer.allocateDirect(originalOff + originalLen);
} else {
data = ByteBuffer.allocate(originalOff + originalLen);
}
// Useless data
for (int i = 0; i < originalOff; i++) {
data.put((byte) 0x5a);
}
for (int i = 0; i < originalLen; i++) {
data.put((byte) i);
}
data.flip();
ByteBuffer compressedData;
int maxCompressedLen = compressor.getMaxCompressedSize(originalLen);
if (isDirect) {
compressedData = ByteBuffer.allocateDirect(maxCompressedLen);
} else {
compressedData = ByteBuffer.allocate(maxCompressedLen);
}
int compressedLen = compressor.compress(data, originalOff, originalLen, compressedData, 0);
assertEquals(compressedLen, compressedData.position());
compressedData.flip();
int compressedOff = 32;
ByteBuffer copiedCompressedData;
if (isDirect) {
copiedCompressedData = ByteBuffer.allocateDirect(compressedOff + compressedLen);
} else {
copiedCompressedData = ByteBuffer.allocate(compressedOff + compressedLen);
}
// Useless data
for (int i = 0; i < compressedOff; i++) {
copiedCompressedData.put((byte) 0x5a);
}
byte[] compressedByteArray = new byte[compressedLen];
compressedData.get(compressedByteArray, 0, compressedLen);
copiedCompressedData.put(compressedByteArray);
copiedCompressedData.flip();
ByteBuffer decompressedData;
if (isDirect) {
decompressedData = ByteBuffer.allocateDirect(originalLen);
} else {
decompressedData = ByteBuffer.allocate(originalLen);
}
int decompressedLen = decompressor.decompress(
copiedCompressedData, compressedOff, compressedLen, decompressedData, 0);
assertEquals(decompressedLen, decompressedData.position());
decompressedData.flip();
for (int i = 0; i < decompressedLen; i++) {
assertEquals((byte) i, decompressedData.get());
}
if (isDirect) {
cleanDirectBuffer(data);
cleanDirectBuffer(compressedData);
cleanDirectBuffer(copiedCompressedData);
cleanDirectBuffer(decompressedData);
}
}
private void cleanDirectBuffer(ByteBuffer buffer) {
Cleaner cleaner = ((DirectBuffer) buffer).cleaner();
if (cleaner != null) {
cleaner.clean();
}
}
}