blob: dcfb7e9e32df1cfb972541a57ea542e2e3bd7556 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io.compress.zstd;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.CompressorStream;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.DecompressorStream;
import org.apache.hadoop.io.compress.ZStandardCodec;
import org.apache.hadoop.test.MultithreadedTestUtil;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Random;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assume.assumeTrue;
public class TestZStandardCompressorDecompressor {
private final static char[] HEX_ARRAY = "0123456789ABCDEF".toCharArray();
private static final Random RANDOM = new Random(12345L);
private static final Configuration CONFIGURATION = new Configuration();
private static File compressedFile;
private static File uncompressedFile;
@BeforeClass
public static void beforeClass() throws Exception {
CONFIGURATION.setInt(IO_FILE_BUFFER_SIZE_KEY, 1024 * 64);
uncompressedFile = new File(TestZStandardCompressorDecompressor.class
.getResource("/zstd/test_file.txt").toURI());
compressedFile = new File(TestZStandardCompressorDecompressor.class
.getResource("/zstd/test_file.txt.zst").toURI());
}
@Before
public void before() throws Exception {
assumeTrue(ZStandardCodec.isNativeCodeLoaded());
}
@Test
public void testCompressionCompressesCorrectly() throws Exception {
int uncompressedSize = (int) FileUtils.sizeOf(uncompressedFile);
byte[] bytes = FileUtils.readFileToByteArray(uncompressedFile);
assertEquals(uncompressedSize, bytes.length);
Configuration conf = new Configuration();
ZStandardCodec codec = new ZStandardCodec();
codec.setConf(conf);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
Compressor compressor = codec.createCompressor();
CompressionOutputStream outputStream =
codec.createOutputStream(baos, compressor);
for (byte aByte : bytes) {
outputStream.write(aByte);
}
outputStream.finish();
outputStream.close();
assertEquals(uncompressedSize, compressor.getBytesRead());
assertTrue(compressor.finished());
// just make sure we can decompress the file
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
Decompressor decompressor = codec.createDecompressor();
CompressionInputStream inputStream =
codec.createInputStream(bais, decompressor);
byte[] buffer = new byte[100];
int n = buffer.length;
while ((n = inputStream.read(buffer, 0, n)) != -1) {
byteArrayOutputStream.write(buffer, 0, n);
}
assertArrayEquals(bytes, byteArrayOutputStream.toByteArray());
}
@Test(expected = NullPointerException.class)
public void testCompressorSetInputNullPointerException() {
ZStandardCompressor compressor = new ZStandardCompressor();
compressor.setInput(null, 0, 10);
}
//test on NullPointerException in {@code decompressor.setInput()}
@Test(expected = NullPointerException.class)
public void testDecompressorSetInputNullPointerException() {
ZStandardDecompressor decompressor =
new ZStandardDecompressor(IO_FILE_BUFFER_SIZE_DEFAULT);
decompressor.setInput(null, 0, 10);
}
//test on ArrayIndexOutOfBoundsException in {@code compressor.setInput()}
@Test(expected = ArrayIndexOutOfBoundsException.class)
public void testCompressorSetInputAIOBException() {
ZStandardCompressor compressor = new ZStandardCompressor();
compressor.setInput(new byte[] {}, -5, 10);
}
//test on ArrayIndexOutOfBoundsException in {@code decompressor.setInput()}
@Test(expected = ArrayIndexOutOfBoundsException.class)
public void testDecompressorSetInputAIOUBException() {
ZStandardDecompressor decompressor =
new ZStandardDecompressor(IO_FILE_BUFFER_SIZE_DEFAULT);
decompressor.setInput(new byte[] {}, -5, 10);
}
//test on NullPointerException in {@code compressor.compress()}
@Test(expected = NullPointerException.class)
public void testCompressorCompressNullPointerException() throws Exception {
ZStandardCompressor compressor = new ZStandardCompressor();
byte[] bytes = generate(1024 * 6);
compressor.setInput(bytes, 0, bytes.length);
compressor.compress(null, 0, 0);
}
//test on NullPointerException in {@code decompressor.decompress()}
@Test(expected = NullPointerException.class)
public void testDecompressorCompressNullPointerException() throws Exception {
ZStandardDecompressor decompressor =
new ZStandardDecompressor(IO_FILE_BUFFER_SIZE_DEFAULT);
byte[] bytes = generate(1024 * 6);
decompressor.setInput(bytes, 0, bytes.length);
decompressor.decompress(null, 0, 0);
}
//test on ArrayIndexOutOfBoundsException in {@code compressor.compress()}
@Test(expected = ArrayIndexOutOfBoundsException.class)
public void testCompressorCompressAIOBException() throws Exception {
ZStandardCompressor compressor = new ZStandardCompressor();
byte[] bytes = generate(1024 * 6);
compressor.setInput(bytes, 0, bytes.length);
compressor.compress(new byte[] {}, 0, -1);
}
//test on ArrayIndexOutOfBoundsException in decompressor.decompress()
@Test(expected = ArrayIndexOutOfBoundsException.class)
public void testDecompressorCompressAIOBException() throws Exception {
ZStandardDecompressor decompressor =
new ZStandardDecompressor(IO_FILE_BUFFER_SIZE_DEFAULT);
byte[] bytes = generate(1024 * 6);
decompressor.setInput(bytes, 0, bytes.length);
decompressor.decompress(new byte[] {}, 0, -1);
}
// test ZStandardCompressor compressor.compress()
@Test
public void testSetInputWithBytesSizeMoreThenDefaultZStandardBufferSize()
throws Exception {
int bytesSize = 1024 * 2056 + 1;
ZStandardCompressor compressor = new ZStandardCompressor();
byte[] bytes = generate(bytesSize);
assertTrue("needsInput error !!!", compressor.needsInput());
compressor.setInput(bytes, 0, bytes.length);
byte[] emptyBytes = new byte[bytesSize];
int cSize = compressor.compress(emptyBytes, 0, bytes.length);
assertTrue(cSize > 0);
}
// test compress/decompress process through
// CompressionOutputStream/CompressionInputStream api
@Test
public void testCompressorDecompressorLogicWithCompressionStreams()
throws Exception {
DataOutputStream deflateOut = null;
DataInputStream inflateIn = null;
int byteSize = 1024 * 100;
byte[] bytes = generate(byteSize);
int bufferSize = IO_FILE_BUFFER_SIZE_DEFAULT;
try {
DataOutputBuffer compressedDataBuffer = new DataOutputBuffer();
CompressionOutputStream deflateFilter =
new CompressorStream(compressedDataBuffer, new ZStandardCompressor(),
bufferSize);
deflateOut =
new DataOutputStream(new BufferedOutputStream(deflateFilter));
deflateOut.write(bytes, 0, bytes.length);
deflateOut.flush();
deflateFilter.finish();
DataInputBuffer deCompressedDataBuffer = new DataInputBuffer();
deCompressedDataBuffer.reset(compressedDataBuffer.getData(), 0,
compressedDataBuffer.getLength());
CompressionInputStream inflateFilter =
new DecompressorStream(deCompressedDataBuffer,
new ZStandardDecompressor(bufferSize), bufferSize);
inflateIn = new DataInputStream(new BufferedInputStream(inflateFilter));
byte[] result = new byte[byteSize];
inflateIn.read(result);
assertArrayEquals("original array not equals compress/decompressed array",
result, bytes);
} finally {
IOUtils.closeQuietly(deflateOut);
IOUtils.closeQuietly(inflateIn);
}
}
@Test
public void testZStandardCompressDecompressInMultiThreads() throws Exception {
MultithreadedTestUtil.TestContext ctx =
new MultithreadedTestUtil.TestContext();
for (int i = 0; i < 10; i++) {
ctx.addThread(new MultithreadedTestUtil.TestingThread(ctx) {
@Override
public void doWork() throws Exception {
testCompressDecompress();
}
});
}
ctx.startThreads();
ctx.waitFor(60000);
}
@Test
public void testCompressDecompress() throws Exception {
byte[] rawData;
int rawDataSize;
rawDataSize = IO_FILE_BUFFER_SIZE_DEFAULT;
rawData = generate(rawDataSize);
ZStandardCompressor compressor = new ZStandardCompressor();
ZStandardDecompressor decompressor =
new ZStandardDecompressor(IO_FILE_BUFFER_SIZE_DEFAULT);
assertFalse(compressor.finished());
compressor.setInput(rawData, 0, rawData.length);
assertEquals(0, compressor.getBytesRead());
compressor.finish();
byte[] compressedResult = new byte[rawDataSize];
int cSize = compressor.compress(compressedResult, 0, rawDataSize);
assertEquals(rawDataSize, compressor.getBytesRead());
assertTrue(cSize < rawDataSize);
decompressor.setInput(compressedResult, 0, cSize);
byte[] decompressedBytes = new byte[rawDataSize];
decompressor.decompress(decompressedBytes, 0, decompressedBytes.length);
assertEquals(bytesToHex(rawData), bytesToHex(decompressedBytes));
compressor.reset();
decompressor.reset();
}
@Test
public void testCompressingWithOneByteOutputBuffer() throws Exception {
int uncompressedSize = (int) FileUtils.sizeOf(uncompressedFile);
byte[] bytes = FileUtils.readFileToByteArray(uncompressedFile);
assertEquals(uncompressedSize, bytes.length);
Configuration conf = new Configuration();
ZStandardCodec codec = new ZStandardCodec();
codec.setConf(conf);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
Compressor compressor =
new ZStandardCompressor(3, IO_FILE_BUFFER_SIZE_DEFAULT, 1);
CompressionOutputStream outputStream =
codec.createOutputStream(baos, compressor);
for (byte aByte : bytes) {
outputStream.write(aByte);
}
outputStream.finish();
outputStream.close();
assertEquals(uncompressedSize, compressor.getBytesRead());
assertTrue(compressor.finished());
// just make sure we can decompress the file
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
Decompressor decompressor = codec.createDecompressor();
CompressionInputStream inputStream =
codec.createInputStream(bais, decompressor);
byte[] buffer = new byte[100];
int n = buffer.length;
while ((n = inputStream.read(buffer, 0, n)) != -1) {
byteArrayOutputStream.write(buffer, 0, n);
}
assertArrayEquals(bytes, byteArrayOutputStream.toByteArray());
}
@Test
public void testZStandardCompressDecompress() throws Exception {
byte[] rawData = null;
int rawDataSize = 0;
rawDataSize = IO_FILE_BUFFER_SIZE_DEFAULT;
rawData = generate(rawDataSize);
ZStandardCompressor compressor = new ZStandardCompressor();
ZStandardDecompressor decompressor = new ZStandardDecompressor(rawDataSize);
assertTrue(compressor.needsInput());
assertFalse("testZStandardCompressDecompress finished error",
compressor.finished());
compressor.setInput(rawData, 0, rawData.length);
compressor.finish();
byte[] compressedResult = new byte[rawDataSize];
int cSize = compressor.compress(compressedResult, 0, rawDataSize);
assertEquals(rawDataSize, compressor.getBytesRead());
assertTrue("compressed size no less then original size",
cSize < rawDataSize);
decompressor.setInput(compressedResult, 0, cSize);
byte[] decompressedBytes = new byte[rawDataSize];
decompressor.decompress(decompressedBytes, 0, decompressedBytes.length);
String decompressed = bytesToHex(decompressedBytes);
String original = bytesToHex(rawData);
assertEquals(original, decompressed);
compressor.reset();
decompressor.reset();
}
@Test
public void testDecompressingOutput() throws Exception {
byte[] expectedDecompressedResult =
FileUtils.readFileToByteArray(uncompressedFile);
ZStandardCodec codec = new ZStandardCodec();
codec.setConf(CONFIGURATION);
CompressionInputStream inputStream = codec
.createInputStream(FileUtils.openInputStream(compressedFile),
codec.createDecompressor());
byte[] toDecompress = new byte[100];
ByteArrayOutputStream baos = new ByteArrayOutputStream();
byte[] decompressedResult;
int totalFileSize = 0;
int result = toDecompress.length;
try {
while ((result = inputStream.read(toDecompress, 0, result)) != -1) {
baos.write(toDecompress, 0, result);
totalFileSize += result;
}
decompressedResult = baos.toByteArray();
} finally {
IOUtils.closeQuietly(baos);
}
assertEquals(decompressedResult.length, totalFileSize);
assertEquals(bytesToHex(expectedDecompressedResult),
bytesToHex(decompressedResult));
}
@Test
public void testZStandardDirectCompressDecompress() throws Exception {
int[] size = {1, 4, 16, 4 * 1024, 64 * 1024, 128 * 1024, 1024 * 1024 };
for (int aSize : size) {
System.out.println("aSize = " + aSize);
compressDecompressLoop(aSize);
}
}
private void compressDecompressLoop(int rawDataSize) throws IOException {
byte[] rawData = null;
rawData = generate(rawDataSize);
ByteArrayOutputStream baos = new ByteArrayOutputStream(rawDataSize + 12);
CompressionOutputStream deflateFilter =
new CompressorStream(baos, new ZStandardCompressor(), 4096);
DataOutputStream deflateOut =
new DataOutputStream(new BufferedOutputStream(deflateFilter));
deflateOut.write(rawData, 0, rawData.length);
deflateOut.flush();
deflateFilter.finish();
byte[] compressedResult = baos.toByteArray();
int compressedSize = compressedResult.length;
ZStandardDecompressor.ZStandardDirectDecompressor decompressor =
new ZStandardDecompressor.ZStandardDirectDecompressor(4096);
ByteBuffer inBuf = ByteBuffer.allocateDirect(compressedSize);
ByteBuffer outBuf = ByteBuffer.allocateDirect(8096);
inBuf.put(compressedResult, 0, compressedSize);
inBuf.flip();
ByteBuffer expected = ByteBuffer.wrap(rawData);
outBuf.clear();
while (!decompressor.finished()) {
decompressor.decompress(inBuf, outBuf);
outBuf.flip();
while (outBuf.remaining() > 0) {
assertEquals(expected.get(), outBuf.get());
}
outBuf.clear();
}
outBuf.flip();
while (outBuf.remaining() > 0) {
assertEquals(expected.get(), outBuf.get());
}
outBuf.clear();
assertEquals(0, expected.remaining());
}
@Test
public void testReadingWithAStream() throws Exception {
FileInputStream inputStream = FileUtils.openInputStream(compressedFile);
ZStandardCodec codec = new ZStandardCodec();
codec.setConf(CONFIGURATION);
Decompressor decompressor = codec.createDecompressor();
CompressionInputStream cis =
codec.createInputStream(inputStream, decompressor);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
byte[] resultOfDecompression;
try {
byte[] buffer = new byte[100];
int n;
while ((n = cis.read(buffer, 0, buffer.length)) != -1) {
baos.write(buffer, 0, n);
}
resultOfDecompression = baos.toByteArray();
} finally {
IOUtils.closeQuietly(baos);
IOUtils.closeQuietly(cis);
}
byte[] expected = FileUtils.readFileToByteArray(uncompressedFile);
assertEquals(bytesToHex(expected), bytesToHex(resultOfDecompression));
}
@Test
public void testDecompressReturnsWhenNothingToDecompress() throws Exception {
ZStandardDecompressor decompressor =
new ZStandardDecompressor(IO_FILE_BUFFER_SIZE_DEFAULT);
int result = decompressor.decompress(new byte[10], 0, 10);
assertEquals(0, result);
}
public static byte[] generate(int size) {
byte[] data = new byte[size];
for (int i = 0; i < size; i++) {
data[i] = (byte) RANDOM.nextInt(16);
}
return data;
}
private static String bytesToHex(byte[] bytes) {
char[] hexChars = new char[bytes.length * 2];
for (int j = 0; j < bytes.length; j++) {
int v = bytes[j] & 0xFF;
hexChars[j * 2] = HEX_ARRAY[v >>> 4];
hexChars[j * 2 + 1] = HEX_ARRAY[v & 0x0F];
}
return new String(hexChars);
}
}