blob: fd0effa143b9525664b5ab7c84f93239e62f88a6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.hadoop.example;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import java.io.*;
import java.util.Arrays;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.RandomDatum;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.zlib.ZlibFactory;
import org.apache.hadoop.util.ReflectionUtils;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Ensure that we can perform codec operations given the API and runtime jars
* by performing some simple smoke tests.
*/
public class ITUseHadoopCodecs {
private static final Logger LOG = LoggerFactory.getLogger(ITUseHadoopCodecs.class);
private Configuration haddopConf = new Configuration();
private int dataCount = 100;
private int dataSeed = new Random().nextInt();
@Test
public void testGzipCodec() throws IOException {
ZlibFactory.setNativeZlibLoaded(false);
assertFalse(ZlibFactory.isNativeZlibLoaded(haddopConf));
codecTest(haddopConf, dataSeed, 0, "org.apache.hadoop.io.compress.GzipCodec");
codecTest(haddopConf, dataSeed, dataCount, "org.apache.hadoop.io.compress.GzipCodec");
}
@Test
public void testSnappyCodec() throws IOException {
codecTest(haddopConf, dataSeed, 0, "org.apache.hadoop.io.compress.SnappyCodec");
codecTest(haddopConf, dataSeed, dataCount, "org.apache.hadoop.io.compress.SnappyCodec");
}
@Test
public void testLz4Codec() {
Arrays.asList(false, true).forEach(config -> {
haddopConf.setBoolean(
CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_USELZ4HC_KEY,
config);
try {
codecTest(haddopConf, dataSeed, 0, "org.apache.hadoop.io.compress.Lz4Codec");
codecTest(haddopConf, dataSeed, dataCount, "org.apache.hadoop.io.compress.Lz4Codec");
} catch (IOException e) {
throw new RuntimeException("failed when running codecTest", e);
}
});
}
private void codecTest(Configuration conf, int seed, int count, String codecClass)
throws IOException {
// Create the codec
CompressionCodec codec = null;
try {
codec = (CompressionCodec)
ReflectionUtils.newInstance(conf.getClassByName(codecClass), conf);
} catch (ClassNotFoundException cnfe) {
throw new IOException("Illegal codec!");
}
LOG.info("Created a Codec object of type: " + codecClass);
// Generate data
DataOutputBuffer data = new DataOutputBuffer();
RandomDatum.Generator generator = new RandomDatum.Generator(seed);
for(int i = 0; i < count; ++i) {
generator.next();
RandomDatum key = generator.getKey();
RandomDatum value = generator.getValue();
key.write(data);
value.write(data);
}
LOG.info("Generated " + count + " records");
// Compress data
DataOutputBuffer compressedDataBuffer = new DataOutputBuffer();
try (CompressionOutputStream deflateFilter =
codec.createOutputStream(compressedDataBuffer);
DataOutputStream deflateOut =
new DataOutputStream(new BufferedOutputStream(deflateFilter))) {
deflateOut.write(data.getData(), 0, data.getLength());
deflateOut.flush();
deflateFilter.finish();
}
// De-compress data
DataInputBuffer deCompressedDataBuffer = new DataInputBuffer();
deCompressedDataBuffer.reset(compressedDataBuffer.getData(), 0,
compressedDataBuffer.getLength());
DataInputBuffer originalData = new DataInputBuffer();
originalData.reset(data.getData(), 0, data.getLength());
try (CompressionInputStream inflateFilter =
codec.createInputStream(deCompressedDataBuffer);
DataInputStream originalIn =
new DataInputStream(new BufferedInputStream(originalData))) {
// Check
int expected;
do {
expected = originalIn.read();
assertEquals("Inflated stream read by byte does not match",
expected, inflateFilter.read());
} while (expected != -1);
}
LOG.info("SUCCESS! Completed checking " + count + " records");
}
}