blob: cc986c7e0aea4466e195aea6676b556fe6e1f415 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io.compress.snappy;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.lang.reflect.Array;
import java.nio.ByteBuffer;
import java.util.Random;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.compress.BlockCompressorStream;
import org.apache.hadoop.io.compress.BlockDecompressorStream;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.SnappyCodec;
import org.apache.hadoop.io.compress.snappy.SnappyDecompressor.SnappyDirectDecompressor;
import org.apache.hadoop.test.MultithreadedTestUtil;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assume.*;
public class TestSnappyCompressorDecompressor {
@Before
public void before() {
assumeTrue(SnappyCodec.isNativeCodeLoaded());
}
@Test
public void testSnappyCompressorSetInputNullPointerException() {
try {
SnappyCompressor compressor = new SnappyCompressor();
compressor.setInput(null, 0, 10);
fail("testSnappyCompressorSetInputNullPointerException error !!!");
} catch (NullPointerException ex) {
// excepted
} catch (Exception ex) {
fail("testSnappyCompressorSetInputNullPointerException ex error !!!");
}
}
@Test
public void testSnappyDecompressorSetInputNullPointerException() {
try {
SnappyDecompressor decompressor = new SnappyDecompressor();
decompressor.setInput(null, 0, 10);
fail("testSnappyDecompressorSetInputNullPointerException error !!!");
} catch (NullPointerException ex) {
// expected
} catch (Exception e) {
fail("testSnappyDecompressorSetInputNullPointerException ex error !!!");
}
}
@Test
public void testSnappyCompressorSetInputAIOBException() {
try {
SnappyCompressor compressor = new SnappyCompressor();
compressor.setInput(new byte[] {}, -5, 10);
fail("testSnappyCompressorSetInputAIOBException error !!!");
} catch (ArrayIndexOutOfBoundsException ex) {
// expected
} catch (Exception ex) {
fail("testSnappyCompressorSetInputAIOBException ex error !!!");
}
}
@Test
public void testSnappyDecompressorSetInputAIOUBException() {
try {
SnappyDecompressor decompressor = new SnappyDecompressor();
decompressor.setInput(new byte[] {}, -5, 10);
fail("testSnappyDecompressorSetInputAIOUBException error !!!");
} catch (ArrayIndexOutOfBoundsException ex) {
// expected
} catch (Exception e) {
fail("testSnappyDecompressorSetInputAIOUBException ex error !!!");
}
}
@Test
public void testSnappyCompressorCompressNullPointerException() {
try {
SnappyCompressor compressor = new SnappyCompressor();
byte[] bytes = BytesGenerator.get(1024 * 6);
compressor.setInput(bytes, 0, bytes.length);
compressor.compress(null, 0, 0);
fail("testSnappyCompressorCompressNullPointerException error !!!");
} catch (NullPointerException ex) {
// expected
} catch (Exception e) {
fail("testSnappyCompressorCompressNullPointerException ex error !!!");
}
}
@Test
public void testSnappyDecompressorCompressNullPointerException() {
try {
SnappyDecompressor decompressor = new SnappyDecompressor();
byte[] bytes = BytesGenerator.get(1024 * 6);
decompressor.setInput(bytes, 0, bytes.length);
decompressor.decompress(null, 0, 0);
fail("testSnappyDecompressorCompressNullPointerException error !!!");
} catch (NullPointerException ex) {
// expected
} catch (Exception e) {
fail("testSnappyDecompressorCompressNullPointerException ex error !!!");
}
}
@Test
public void testSnappyCompressorCompressAIOBException() {
try {
SnappyCompressor compressor = new SnappyCompressor();
byte[] bytes = BytesGenerator.get(1024 * 6);
compressor.setInput(bytes, 0, bytes.length);
compressor.compress(new byte[] {}, 0, -1);
fail("testSnappyCompressorCompressAIOBException error !!!");
} catch (ArrayIndexOutOfBoundsException ex) {
// expected
} catch (Exception e) {
fail("testSnappyCompressorCompressAIOBException ex error !!!");
}
}
@Test
public void testSnappyDecompressorCompressAIOBException() {
try {
SnappyDecompressor decompressor = new SnappyDecompressor();
byte[] bytes = BytesGenerator.get(1024 * 6);
decompressor.setInput(bytes, 0, bytes.length);
decompressor.decompress(new byte[] {}, 0, -1);
fail("testSnappyDecompressorCompressAIOBException error !!!");
} catch (ArrayIndexOutOfBoundsException ex) {
// expected
} catch (Exception e) {
fail("testSnappyDecompressorCompressAIOBException ex error !!!");
}
}
@Test
public void testSnappyCompressDecompress() {
int BYTE_SIZE = 1024 * 54;
byte[] bytes = BytesGenerator.get(BYTE_SIZE);
SnappyCompressor compressor = new SnappyCompressor();
try {
compressor.setInput(bytes, 0, bytes.length);
assertTrue("SnappyCompressDecompress getBytesRead error !!!",
compressor.getBytesRead() > 0);
assertTrue(
"SnappyCompressDecompress getBytesWritten before compress error !!!",
compressor.getBytesWritten() == 0);
byte[] compressed = new byte[BYTE_SIZE];
int cSize = compressor.compress(compressed, 0, compressed.length);
assertTrue(
"SnappyCompressDecompress getBytesWritten after compress error !!!",
compressor.getBytesWritten() > 0);
SnappyDecompressor decompressor = new SnappyDecompressor(BYTE_SIZE);
// set as input for decompressor only compressed data indicated with cSize
decompressor.setInput(compressed, 0, cSize);
byte[] decompressed = new byte[BYTE_SIZE];
decompressor.decompress(decompressed, 0, decompressed.length);
assertTrue("testSnappyCompressDecompress finished error !!!",
decompressor.finished());
Assert.assertArrayEquals(bytes, decompressed);
compressor.reset();
decompressor.reset();
assertTrue("decompressor getRemaining error !!!",
decompressor.getRemaining() == 0);
} catch (Exception e) {
fail("testSnappyCompressDecompress ex error!!!");
}
}
@Test
public void testCompressorDecompressorEmptyStreamLogic() {
ByteArrayInputStream bytesIn = null;
ByteArrayOutputStream bytesOut = null;
byte[] buf = null;
BlockDecompressorStream blockDecompressorStream = null;
try {
// compress empty stream
bytesOut = new ByteArrayOutputStream();
BlockCompressorStream blockCompressorStream = new BlockCompressorStream(
bytesOut, new SnappyCompressor(), 1024, 0);
// close without write
blockCompressorStream.close();
// check compressed output
buf = bytesOut.toByteArray();
assertEquals("empty stream compressed output size != 4", 4, buf.length);
// use compressed output as input for decompression
bytesIn = new ByteArrayInputStream(buf);
// create decompression stream
blockDecompressorStream = new BlockDecompressorStream(bytesIn,
new SnappyDecompressor(), 1024);
// no byte is available because stream was closed
assertEquals("return value is not -1", -1, blockDecompressorStream.read());
} catch (Exception e) {
fail("testCompressorDecompressorEmptyStreamLogic ex error !!!"
+ e.getMessage());
} finally {
if (blockDecompressorStream != null)
try {
bytesIn.close();
bytesOut.close();
blockDecompressorStream.close();
} catch (IOException e) {
}
}
}
@Test
public void testSnappyBlockCompression() {
int BYTE_SIZE = 1024 * 50;
int BLOCK_SIZE = 512;
ByteArrayOutputStream out = new ByteArrayOutputStream();
byte[] block = new byte[BLOCK_SIZE];
byte[] bytes = BytesGenerator.get(BYTE_SIZE);
try {
// Use default of 512 as bufferSize and compressionOverhead of
// (1% of bufferSize + 12 bytes) = 18 bytes (zlib algorithm).
SnappyCompressor compressor = new SnappyCompressor();
int off = 0;
int len = BYTE_SIZE;
int maxSize = BLOCK_SIZE - 18;
if (BYTE_SIZE > maxSize) {
do {
int bufLen = Math.min(len, maxSize);
compressor.setInput(bytes, off, bufLen);
compressor.finish();
while (!compressor.finished()) {
compressor.compress(block, 0, block.length);
out.write(block);
}
compressor.reset();
off += bufLen;
len -= bufLen;
} while (len > 0);
}
assertTrue("testSnappyBlockCompression error !!!",
out.toByteArray().length > 0);
} catch (Exception ex) {
fail("testSnappyBlockCompression ex error !!!");
}
}
private void compressDecompressLoop(int rawDataSize) throws IOException {
byte[] rawData = BytesGenerator.get(rawDataSize);
byte[] compressedResult = new byte[rawDataSize+20];
int directBufferSize = Math.max(rawDataSize*2, 64*1024);
SnappyCompressor compressor = new SnappyCompressor(directBufferSize);
compressor.setInput(rawData, 0, rawDataSize);
int compressedSize = compressor.compress(compressedResult, 0, compressedResult.length);
SnappyDirectDecompressor decompressor = new SnappyDirectDecompressor();
ByteBuffer inBuf = ByteBuffer.allocateDirect(compressedSize);
ByteBuffer outBuf = ByteBuffer.allocateDirect(rawDataSize);
inBuf.put(compressedResult, 0, compressedSize);
inBuf.flip();
ByteBuffer expected = ByteBuffer.wrap(rawData);
outBuf.clear();
while(!decompressor.finished()) {
decompressor.decompress(inBuf, outBuf);
if (outBuf.remaining() == 0) {
outBuf.flip();
while (outBuf.remaining() > 0) {
assertEquals(expected.get(), outBuf.get());
}
outBuf.clear();
}
}
outBuf.flip();
while (outBuf.remaining() > 0) {
assertEquals(expected.get(), outBuf.get());
}
outBuf.clear();
assertEquals(0, expected.remaining());
}
@Test
public void testSnappyDirectBlockCompression() {
int[] size = { 4 * 1024, 64 * 1024, 128 * 1024, 1024 * 1024 };
assumeTrue(SnappyCodec.isNativeCodeLoaded());
try {
for (int i = 0; i < size.length; i++) {
compressDecompressLoop(size[i]);
}
} catch (IOException ex) {
fail("testSnappyDirectBlockCompression ex !!!" + ex);
}
}
@Test
public void testSnappyCompressorDecopressorLogicWithCompressionStreams() {
int BYTE_SIZE = 1024 * 100;
byte[] bytes = BytesGenerator.get(BYTE_SIZE);
int bufferSize = 262144;
int compressionOverhead = (bufferSize / 6) + 32;
DataOutputStream deflateOut = null;
DataInputStream inflateIn = null;
try {
DataOutputBuffer compressedDataBuffer = new DataOutputBuffer();
CompressionOutputStream deflateFilter = new BlockCompressorStream(
compressedDataBuffer, new SnappyCompressor(bufferSize), bufferSize,
compressionOverhead);
deflateOut = new DataOutputStream(new BufferedOutputStream(deflateFilter));
deflateOut.write(bytes, 0, bytes.length);
deflateOut.flush();
deflateFilter.finish();
DataInputBuffer deCompressedDataBuffer = new DataInputBuffer();
deCompressedDataBuffer.reset(compressedDataBuffer.getData(), 0,
compressedDataBuffer.getLength());
CompressionInputStream inflateFilter = new BlockDecompressorStream(
deCompressedDataBuffer, new SnappyDecompressor(bufferSize),
bufferSize);
inflateIn = new DataInputStream(new BufferedInputStream(inflateFilter));
byte[] result = new byte[BYTE_SIZE];
inflateIn.read(result);
Assert.assertArrayEquals(
"original array not equals compress/decompressed array", result,
bytes);
} catch (IOException e) {
fail("testSnappyCompressorDecopressorLogicWithCompressionStreams ex error !!!");
} finally {
try {
if (deflateOut != null)
deflateOut.close();
if (inflateIn != null)
inflateIn.close();
} catch (Exception e) {
}
}
}
static final class BytesGenerator {
private BytesGenerator() {
}
private static final byte[] CACHE = new byte[] { 0x0, 0x1, 0x2, 0x3, 0x4,
0x5, 0x6, 0x7, 0x8, 0x9, 0xA, 0xB, 0xC, 0xD, 0xE, 0xF };
private static final Random rnd = new Random(12345l);
public static byte[] get(int size) {
byte[] array = (byte[]) Array.newInstance(byte.class, size);
for (int i = 0; i < size; i++)
array[i] = CACHE[rnd.nextInt(CACHE.length - 1)];
return array;
}
}
@Test
public void testSnappyCompressDecompressInMultiThreads() throws Exception {
MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext();
for(int i=0;i<10;i++) {
ctx.addThread( new MultithreadedTestUtil.TestingThread(ctx) {
@Override
public void doWork() throws Exception {
testSnappyCompressDecompress();
}
});
}
ctx.startThreads();
ctx.waitFor(60000);
}
}