/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.runtime.library.common.sort.impl;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BoundedByteArrayOutputStream;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.lz4.Lz4Compressor;
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.io.serializer.WritableSerialization;
import org.apache.hadoop.util.NativeCodeLoader;
import org.apache.tez.common.TezRuntimeFrameworkConfigs;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.apache.tez.runtime.library.common.TezRuntimeUtils;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.InMemoryReader;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.InMemoryWriter;
import org.apache.tez.runtime.library.common.sort.impl.IFile.Reader;
import org.apache.tez.runtime.library.common.sort.impl.IFile.Writer;
import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutputFiles;
import org.apache.tez.runtime.library.testutils.KVDataGen;
import org.apache.tez.runtime.library.testutils.KVDataGen.KVPair;
import org.apache.tez.runtime.library.utils.BufferUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.protobuf.ByteString;
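/**
* Tests for {@link IFile} Writer and Reader: empty files, sorted/unsorted and repeated keys,
* RLE markers, compression, in-memory readers/writers, and the file-backed in-memory writer.
*/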
public class TestIFile {
private static final Logger LOG = LoggerFactory.getLogger(TestIFile.class);
private static Configuration defaultConf = new Configuration();
private static FileSystem localFs = null;
private static Path workDir = null;
private static CompressionCodec codec;
private Random rnd = new Random();
private String outputFileName = "ifile.out";
private Path outputPath;
private DataOutputBuffer k = new DataOutputBuffer();
private DataOutputBuffer v = new DataOutputBuffer();
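// Set up a local FileSystem and a qualified per-class working directory for test output.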
static {
defaultConf.set("fs.defaultFS", "file:///");
try {
localFs = FileSystem.getLocal(defaultConf);
workDir = new Path(
new Path(System.getProperty("test.build.data", "/tmp")), TestIFile.class.getName())
.makeQualified(localFs.getUri(), localFs.getWorkingDirectory());
LOG.info("Using workDir: " + workDir);
defaultConf.set(TezRuntimeFrameworkConfigs.LOCAL_DIRS, workDir.toString());
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Before
public void setUp() throws Exception {
CompressionCodecFactory codecFactory = new CompressionCodecFactory(new Configuration());
codec = codecFactory.getCodecByClassName("org.apache.hadoop.io.compress.DefaultCodec");
outputPath = new Path(workDir, outputFileName);
}
@Before
@After
public void cleanup() throws Exception {
localFs.delete(workDir, true);
}
@Test(timeout = 5000)
//empty IFile
public void testWithEmptyIFile() throws IOException {
testWriterAndReader(new LinkedList<KVPair>());
testWithDataBuffer(new LinkedList<KVPair>());
}
@Test(timeout = 5000)
public void testCompressedFlag() throws IOException {
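// Valid IFile header: the 'T', 'I', 'F' magic bytes followed by the compressed-flag byte.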
byte[] header = new byte[] { (byte) 'T', (byte) 'I', (byte) 'F', (byte) 1 };
ByteArrayInputStream bin = new ByteArrayInputStream(header);
boolean compressed = IFile.Reader.isCompressedFlagEnabled(bin);
assertTrue(compressed);
//Negative case: half-cooked header
header = new byte[] { (byte) 'T', (byte) 'I' };
bin = new ByteArrayInputStream(header);
try {
compressed = IFile.Reader.isCompressedFlagEnabled(bin);
fail("Should not have allowed wrong header");
} catch(Exception e) {
//correct path.
}
}
@Test(timeout = 5000)
//Write empty key value pairs
public void testWritingEmptyKeyValues() throws IOException {
DataInputBuffer key = new DataInputBuffer();
DataInputBuffer value = new DataInputBuffer();
IFile.Writer writer = new IFile.Writer(null, null, localFs, outputPath, null, null, null,
null, null);
writer.append(key, value);
writer.append(key, value);
writer.append(key, value);
writer.append(key, value);
writer.close();
IFile.Reader reader = new Reader(localFs, outputPath, null, null, null, false, -1, 1024);
DataInputBuffer keyIn = new DataInputBuffer();
DataInputBuffer valIn = new DataInputBuffer();
int records = 0;
while (reader.nextRawKey(keyIn)) {
reader.nextRawValue(valIn);
records++;
assertEquals(0, keyIn.getLength());
assertEquals(0, valIn.getLength());
}
assertTrue("Number of records read does not match", (records == 4));
reader.close();
}
@Test(timeout = 5000)
//test with unsorted data and repeat keys
public void testWithUnsortedData() throws IOException {
List<KVPair> unsortedData = KVDataGen.generateTestData(false, rnd.nextInt(100));
testWriterAndReader(unsortedData);
testWithDataBuffer(unsortedData);
}
@Test(timeout = 5000)
//test with sorted data and repeat keys
public void testWithSortedData() throws IOException {
List<KVPair> sortedData = KVDataGen.generateTestData(true, rnd.nextInt(100));
testWriterAndReader(sortedData);
testWithDataBuffer(sortedData);
}
@Test(timeout = 5000)
//test overflow
public void testExceedMaxSize() throws IOException {
final int oldMaxBufferSize = IFile.Reader.MAX_BUFFER_SIZE;
Text shortString = new Text("string");
Text longString = new Text("A string of length 22.");
assertEquals(22, longString.getLength());
Text readKey = new Text();
Text readValue = new Text();
DataInputBuffer keyIn = new DataInputBuffer();
DataInputBuffer valIn = new DataInputBuffer();
IFile.Writer writer;
IFile.Reader reader;
FSDataOutputStream out;
// Check Key length exceeding MAX_BUFFER_SIZE
out = localFs.create(outputPath);
writer = new IFile.Writer(new WritableSerialization(), new WritableSerialization(), out,
Text.class, Text.class, null, null, null, false);
writer.append(longString, shortString);
writer.close();
out.close();
// Set this to a smaller value for testing
IFile.Reader.MAX_BUFFER_SIZE = 16;
reader = new IFile.Reader(localFs, outputPath,
null, null, null, false, 0, -1);
try {
reader.nextRawKey(keyIn);
Assert.fail("Expected IllegalArgumentException to be thrown");
} catch (IllegalArgumentException e) {
// test passed
}
reader.close();
// Check Value length exceeding MAX_BUFFER_SIZE
out = localFs.create(outputPath);
writer = new IFile.Writer(new WritableSerialization(), new WritableSerialization(), out,
Text.class, Text.class, null, null, null, false);
writer.append(shortString, longString);
writer.close();
out.close();
// Set this to a smaller value for testing
IFile.Reader.MAX_BUFFER_SIZE = 16;
reader = new IFile.Reader(localFs, outputPath,
null, null, null, false, 0, -1);
try {
reader.nextRawKey(keyIn);
reader.nextRawValue(valIn);
Assert.fail("Expected IllegalArgumentException to be thrown");
} catch (IllegalArgumentException e) {
// test passed
}
reader.close();
// Check Key length not getting doubled
out = localFs.create(outputPath);
writer = new IFile.Writer(new WritableSerialization(), new WritableSerialization(), out,
Text.class, Text.class, null, null, null, false);
writer.append(longString, shortString);
writer.close();
out.close();
// Set this to a smaller value for testing
IFile.Reader.MAX_BUFFER_SIZE = 32;
reader = new IFile.Reader(localFs, outputPath,
null, null, null, false, 0, -1);
reader.nextRawKey(keyIn);
assertEquals(longString.getLength() + 1, keyIn.getData().length);
reader.close();
// Check Value length not getting doubled
out = localFs.create(outputPath);
writer = new IFile.Writer(new WritableSerialization(), new WritableSerialization(), out,
Text.class, Text.class, null, null, null, false);
writer.append(shortString, longString);
writer.close();
out.close();
// Set this to a smaller value for testing
IFile.Reader.MAX_BUFFER_SIZE = 32;
reader = new IFile.Reader(localFs, outputPath,
null, null, null, false, 0, -1);
reader.nextRawKey(keyIn);
reader.nextRawValue(valIn);
assertEquals(longString.getLength() + 1, valIn.getData().length);
reader.close();
// revert back to original value
IFile.Reader.MAX_BUFFER_SIZE = oldMaxBufferSize;
}
@Test(timeout = 5000)
//test that the RLE marker (sameKey) is set for repeated keys and cleared for new keys
public void testWithRLEMarker() throws IOException {
//Test with append(Object, Object)
FSDataOutputStream out = localFs.create(outputPath);
IFile.Writer writer = new IFile.Writer(new WritableSerialization(), new WritableSerialization(), out,
Text.class, IntWritable.class, codec, null, null, true);
Text key = new Text("key0");
IntWritable value = new IntWritable(0);
writer.append(key, value);
//same key (RLE should kick in)
key = new Text("key0");
writer.append(key, value);
assertTrue(writer.sameKey);
//Different key
key = new Text("key1");
writer.append(key, value);
assertFalse(writer.sameKey);
writer.close();
out.close();
//Test with append(DataInputBuffer key, DataInputBuffer value)
byte[] kvbuffer = "key1Value1key1Value2key3Value3".getBytes();
int keyLength = 4;
int valueLength = 6;
int pos = 0;
out = localFs.create(outputPath);
writer = new IFile.Writer(new WritableSerialization(), new WritableSerialization(), out,
Text.class, IntWritable.class, codec, null, null, true);
BoundedByteArrayOutputStream boundedOut = new BoundedByteArrayOutputStream(1024*1024);
Writer inMemWriter = new InMemoryWriter(boundedOut, true);
DataInputBuffer kin = new DataInputBuffer();
kin.reset(kvbuffer, pos, keyLength);
DataInputBuffer vin = new DataInputBuffer();
DataOutputBuffer vout = new DataOutputBuffer();
(new IntWritable(0)).write(vout);
vin.reset(vout.getData(), vout.getLength());
//Write initial KV pair
writer.append(kin, vin);
assertFalse(writer.sameKey);
inMemWriter.append(kin, vin);
assertFalse(inMemWriter.sameKey);
pos += (keyLength + valueLength);
//Second key is the same as the first key (RLE should kick in)
kin.reset(kvbuffer, pos, keyLength);
(new IntWritable(0)).write(vout);
vin.reset(vout.getData(), vout.getLength());
writer.append(kin, vin);
assertTrue(writer.sameKey);
inMemWriter.append(kin, vin);
assertTrue(inMemWriter.sameKey);
pos += (keyLength + valueLength);
//Next key (key3) is different (RLE should not kick in)
kin.reset(kvbuffer, pos, keyLength);
(new IntWritable(0)).write(vout);
vin.reset(vout.getData(), vout.getLength());
writer.append(kin, vin);
assertFalse(writer.sameKey);
inMemWriter.append(kin, vin);
assertFalse(inMemWriter.sameKey);
writer.close();
out.close();
inMemWriter.close();
boundedOut.close();
}
@Test(timeout = 5000)
//test with unique keys
public void testWithUniqueKeys() throws IOException {
//all keys are unique
List<KVPair> sortedData = KVDataGen.generateTestData(true, 0);
testWriterAndReader(sortedData);
testWithDataBuffer(sortedData);
}
//test concatenated zlib input - as in multiple map outputs during shuffle
//This specific input is valid but the decompressor can leave lingering
// bytes between segments. If the lingering bytes aren't handled correctly,
// the stream will get out-of-sync.
@Test(timeout = 5000)
public void testConcatenatedZlibPadding()
throws IOException, URISyntaxException {
byte[] bytes;
long compTotal = 0;
// Known raw and compressed lengths of input
long[] raws = { 2392, 102314, 42576, 31432, 25090 };
long[] compressed = { 723, 25396, 10926, 8203, 6665 };
CompressionCodecFactory codecFactory = new CompressionCodecFactory(new Configuration());
codec = codecFactory.getCodecByClassName("org.apache.hadoop.io.compress.DefaultCodec");
URL url = getClass().getClassLoader()
.getResource("TestIFile_concatenated_compressed.bin");
assertNotEquals("IFileinput file must exist", null, url);
Path p = new Path(url.toURI());
FSDataInputStream inStream = localFs.open(p);
for (int i = 0; i < 5; i++) {
bytes = new byte[(int) raws[i]];
assertEquals("Compressed stream out-of-sync", inStream.getPos(), compTotal);
IFile.Reader.readToMemory(bytes, inStream, (int) compressed[i], codec,
false, -1);
compTotal += compressed[i];
// Now read the data
InMemoryReader inMemReader = new InMemoryReader(null,
new InputAttemptIdentifier(0, 0), bytes, 0, bytes.length);
DataInputBuffer keyIn = new DataInputBuffer();
DataInputBuffer valIn = new DataInputBuffer();
Deserializer<Text> keyDeserializer;
Deserializer<IntWritable> valDeserializer;
SerializationFactory serializationFactory =
new SerializationFactory(defaultConf);
keyDeserializer = serializationFactory.getDeserializer(Text.class);
valDeserializer = serializationFactory.getDeserializer(IntWritable.class);
keyDeserializer.open(keyIn);
valDeserializer.open(valIn);
while (inMemReader.nextRawKey(keyIn)) {
inMemReader.nextRawValue(valIn);
}
}
inStream.close();
}
@Test(timeout = 5000)
//Test InMemoryWriter
public void testInMemoryWriter() throws IOException {
InMemoryWriter writer = null;
BoundedByteArrayOutputStream bout = new BoundedByteArrayOutputStream(1024 * 1024);
List<KVPair> data = KVDataGen.generateTestData(true, 10);
//No RLE, No RepeatKeys, no compression
writer = new InMemoryWriter(bout);
writeTestFileUsingDataBuffer(writer, false, data);
readUsingInMemoryReader(bout.getBuffer(), data);
//No RLE, RepeatKeys, no compression
bout.reset();
writer = new InMemoryWriter(bout);
writeTestFileUsingDataBuffer(writer, true, data);
readUsingInMemoryReader(bout.getBuffer(), data);
//RLE, No RepeatKeys, no compression
bout.reset();
writer = new InMemoryWriter(bout, true);
writeTestFileUsingDataBuffer(writer, false, data);
readUsingInMemoryReader(bout.getBuffer(), data);
//RLE, RepeatKeys, no compression
bout.reset();
writer = new InMemoryWriter(bout, true);
writeTestFileUsingDataBuffer(writer, true, data);
readUsingInMemoryReader(bout.getBuffer(), data);
}
@Test(timeout = 5000)
//Test appendValue feature
public void testAppendValue() throws IOException {
List<KVPair> data = KVDataGen.generateTestData(false, rnd.nextInt(100));
IFile.Writer writer = new IFile.Writer(new WritableSerialization(), new WritableSerialization(),
localFs, outputPath, Text.class, IntWritable.class, codec, null, null);
Text previousKey = null;
for (KVPair kvp : data) {
if (previousKey != null && previousKey.compareTo(kvp.getKey()) == 0) {
writer.appendValue(kvp.getvalue());
} else {
writer.append(kvp.getKey(), kvp.getvalue());
}
previousKey = kvp.getKey();
}
writer.close();
readAndVerifyData(writer.getRawLength(), writer.getCompressedLength(), data, codec);
}
@Test(timeout = 5000)
//Test appendValues feature
public void testAppendValues() throws IOException {
List<KVPair> data = new ArrayList<KVPair>();
List<IntWritable> values = new ArrayList<IntWritable>();
Text key = new Text("key");
IntWritable val = new IntWritable(1);
for(int i = 0; i < 5; i++) {
data.add(new KVPair(key, val));
values.add(val);
}
IFile.Writer writer = new IFile.Writer(new WritableSerialization(), new WritableSerialization(),
localFs, outputPath, Text.class, IntWritable.class, codec, null, null);
writer.append(data.get(0).getKey(), data.get(0).getvalue()); //write first KV pair
writer.appendValues(values.subList(1, values.size()).iterator()); //add the rest here
Text lastKey = new Text("key3");
IntWritable lastVal = new IntWritable(10);
data.add(new KVPair(lastKey, lastVal));
writer.append(lastKey, lastVal);
writer.close();
readAndVerifyData(writer.getRawLength(), writer.getCompressedLength(), data, codec);
}
@Test(timeout = 5000)
// Basic test
public void testFileBackedInMemIFileWriter() throws IOException {
List<KVPair> data = new ArrayList<>();
List<IntWritable> values = new ArrayList<>();
Text key = new Text("key");
IntWritable val = new IntWritable(1);
for(int i = 0; i < 5; i++) {
data.add(new KVPair(key, val));
values.add(val);
}
TezTaskOutputFiles tezTaskOutput = new TezTaskOutputFiles(defaultConf, "uniqueId", 1);
IFile.FileBackedInMemIFileWriter writer = new IFile.FileBackedInMemIFileWriter(
new WritableSerialization(), new WritableSerialization(), localFs, tezTaskOutput,
Text.class, IntWritable.class, codec, null, null,
200);
writer.appendKeyValues(data.get(0).getKey(), values.iterator());
Text lastKey = new Text("key3");
IntWritable lastVal = new IntWritable(10);
data.add(new KVPair(lastKey, lastVal));
writer.append(lastKey, lastVal);
writer.close();
byte[] bytes = new byte[(int) writer.getRawLength()];
IFile.Reader.readToMemory(bytes,
new ByteArrayInputStream(ByteString.copyFrom(writer.getData()).toByteArray()),
(int) writer.getCompressedLength(), codec, false, -1);
readUsingInMemoryReader(bytes, data);
}
@Test(timeout = 5000)
// Test that a tiny initial cache buffer self-adjusts and an empty file stays in memory
public void testFileBackedInMemIFileWriterWithSmallBuffer() throws IOException {
List<KVPair> data = new ArrayList<>();
TezTaskOutputFiles tezTaskOutput = new TezTaskOutputFiles(defaultConf, "uniqueId", 1);
IFile.FileBackedInMemIFileWriter writer = new IFile.FileBackedInMemIFileWriter(
new WritableSerialization(), new WritableSerialization(), localFs, tezTaskOutput,
Text.class, IntWritable.class, codec, null, null,
2);
// empty ifile
writer.close();
// The buffer should have self-adjusted, so this empty file should not hit disk.
assertFalse("Data should not have been flushed to disk", writer.isDataFlushedToDisk());
byte[] bytes = new byte[(int) writer.getRawLength()];
IFile.Reader.readToMemory(bytes,
new ByteArrayInputStream(ByteString.copyFrom(writer.getData()).toByteArray()),
(int) writer.getCompressedLength(), codec, false, -1);
readUsingInMemoryReader(bytes, data);
}
@Test(timeout = 20000)
// Test file spill over scenario
public void testFileBackedInMemIFileWriter_withSpill() throws IOException {
List<KVPair> data = new ArrayList<>();
List<IntWritable> values = new ArrayList<>();
Text key = new Text("key");
IntWritable val = new IntWritable(1);
for(int i = 0; i < 5; i++) {
data.add(new KVPair(key, val));
values.add(val);
}
// Setting cache limit to 20. Actual data would be around 43 bytes, so it would spill over.
TezTaskOutputFiles tezTaskOutput = new TezTaskOutputFiles(defaultConf, "uniqueId", 1);
IFile.FileBackedInMemIFileWriter writer = new IFile.FileBackedInMemIFileWriter(
new WritableSerialization(), new WritableSerialization(), localFs, tezTaskOutput,
Text.class, IntWritable.class, codec, null, null,
20);
writer.setOutputPath(outputPath);
writer.appendKeyValues(data.get(0).getKey(), values.iterator());
Text lastKey = new Text("key3");
IntWritable lastVal = new IntWritable(10);
data.add(new KVPair(lastKey, lastVal));
writer.append(lastKey, lastVal);
writer.close();
assertTrue("Data should have been flushed to disk", writer.isDataFlushedToDisk());
// Read output content to memory
FSDataInputStream inStream = localFs.open(outputPath);
byte[] bytes = new byte[(int) writer.getRawLength()];
IFile.Reader.readToMemory(bytes, inStream,
(int) writer.getCompressedLength(), codec, false, -1);
inStream.close();
readUsingInMemoryReader(bytes, data);
}
@Test(timeout = 5000)
// Test empty file case
public void testEmptyFileBackedInMemIFileWriter() throws IOException {
List<KVPair> data = new ArrayList<>();
TezTaskOutputFiles tezTaskOutput = new TezTaskOutputFiles(defaultConf, "uniqueId", 1);
IFile.FileBackedInMemIFileWriter writer = new IFile.FileBackedInMemIFileWriter(
new WritableSerialization(), new WritableSerialization(), localFs, tezTaskOutput,
Text.class, IntWritable.class, codec, null, null,
100);
// empty ifile
writer.close();
byte[] bytes = new byte[(int) writer.getRawLength()];
IFile.Reader.readToMemory(bytes,
new ByteArrayInputStream(ByteString.copyFrom(writer.getData()).toByteArray()),
(int) writer.getCompressedLength(), codec, false, -1);
readUsingInMemoryReader(bytes, data);
}
@Test(timeout = 5000)
//Test appendKeyValues feature
public void testAppendKeyValues() throws IOException {
List<KVPair> data = new ArrayList<KVPair>();
List<IntWritable> values = new ArrayList<IntWritable>();
Text key = new Text("key");
IntWritable val = new IntWritable(1);
for(int i = 0; i < 5; i++) {
data.add(new KVPair(key, val));
values.add(val);
}
IFile.Writer writer = new IFile.Writer(new WritableSerialization(), new WritableSerialization(),
localFs, outputPath, Text.class, IntWritable.class, codec, null, null);
writer.appendKeyValues(data.get(0).getKey(), values.iterator());
Text lastKey = new Text("key3");
IntWritable lastVal = new IntWritable(10);
data.add(new KVPair(lastKey, lastVal));
writer.append(lastKey, lastVal);
writer.close();
readAndVerifyData(writer.getRawLength(), writer.getCompressedLength(), data, codec);
}
@Test(timeout = 5000)
//Test appendValue with DataInputBuffer
public void testAppendValueWithDataInputBuffer() throws IOException {
List<KVPair> data = KVDataGen.generateTestData(false, rnd.nextInt(100));
IFile.Writer writer = new IFile.Writer(new WritableSerialization(), new WritableSerialization(),
localFs, outputPath, Text.class, IntWritable.class, codec, null, null);
final DataInputBuffer previousKey = new DataInputBuffer();
DataInputBuffer key = new DataInputBuffer();
DataInputBuffer value = new DataInputBuffer();
for (KVPair kvp : data) {
populateData(kvp, key, value);
if (BufferUtils.compare(key, previousKey) == 0) {
writer.appendValue(value);
} else {
writer.append(key, value);
}
previousKey.reset(key.getData(), 0, key.getLength());
}
writer.close();
readAndVerifyData(writer.getRawLength(), writer.getCompressedLength(), data, codec);
}
@Test(timeout = 20000)
public void testReadToDisk() throws IOException {
// verify sending a stream of zeroes generates an error
byte[] zeroData = new byte[1000];
Arrays.fill(zeroData, (byte) 0);
ByteArrayInputStream in = new ByteArrayInputStream(zeroData);
try {
IFile.Reader.readToDisk(new ByteArrayOutputStream(), in, zeroData.length, false, 0);
fail("Exception should have been thrown");
} catch (IOException e) {
}
// verify sending same stream of zeroes with a valid IFile header still
// generates an error
ByteArrayOutputStream baos = new ByteArrayOutputStream();
baos.write(IFile.HEADER);
baos.write(zeroData);
try {
IFile.Reader.readToDisk(new ByteArrayOutputStream(),
new ByteArrayInputStream(baos.toByteArray()), zeroData.length, false, 0);
fail("Exception should have been thrown");
} catch (IOException e) {
assertTrue(e instanceof ChecksumException);
}
// verify valid data is copied properly
List<KVPair> data = KVDataGen.generateTestData(true, 0);
Writer writer = writeTestFile(false, false, data, codec);
baos.reset();
IFile.Reader.readToDisk(baos, localFs.open(outputPath), writer.getCompressedLength(),
false, 0);
byte[] diskData = baos.toByteArray();
Reader reader = new Reader(new ByteArrayInputStream(diskData), diskData.length,
codec, null, null, false, 0, 1024);
verifyData(reader, data);
reader.close();
}
@Test
public void testInMemoryBufferSize() throws IOException {
Configurable configurableCodec = (Configurable) codec;
int originalCodecBufferSize =
configurableCodec.getConf().getInt(TezRuntimeUtils.getBufferSizeProperty(codec), -1);
// for smaller amount of data, codec buffer should be sized according to compressed data length
List<KVPair> data = KVDataGen.generateTestData(false, rnd.nextInt(100));
Writer writer = writeTestFile(false, false, data, codec);
readAndVerifyData(writer.getRawLength(), writer.getCompressedLength(), data, codec);
Assert.assertEquals(originalCodecBufferSize, // original size is restored
configurableCodec.getConf().getInt(TezRuntimeUtils.getBufferSizeProperty(codec), 0));
// buffer size cannot grow infinitely with compressed data size
data = KVDataGen.generateTestDataOfKeySize(false, 20000, rnd.nextInt(100));
writer = writeTestFile(false, false, data, codec);
readAndVerifyData(writer.getRawLength(), writer.getCompressedLength(), data, codec);
Assert.assertEquals(originalCodecBufferSize, // original size is restored
configurableCodec.getConf().getInt(TezRuntimeUtils.getBufferSizeProperty(codec), 0));
}
@Test(expected = IllegalArgumentException.class)
public void testSmallDataCompression() throws IOException {
Assume.assumeTrue(NativeCodeLoader.isNativeCodeLoaded());
tryWriteFileWithBufferSize(17, "org.apache.hadoop.io.compress.Lz4Codec");
tryWriteFileWithBufferSize(32, "org.apache.hadoop.io.compress.Lz4Codec");
}
private void tryWriteFileWithBufferSize(int bufferSize, String codecClassName)
throws IOException {
Configuration conf = new Configuration();
System.out.println("trying with buffer size: " + bufferSize);
conf.set(TezRuntimeUtils.getBufferSizeProperty(codecClassName), Integer.toString(bufferSize));
CompressionCodecFactory codecFactory = new CompressionCodecFactory(conf);
CompressionCodec codecToTest =
codecFactory.getCodecByClassName(codecClassName);
List<KVPair> data = KVDataGen.generateTestDataOfKeySize(false, 1, 0);
writeTestFile(false, false, data, codecToTest);
}
@Test(expected = IllegalArgumentException.class)
public void testLz4CompressedDataIsLargerThanOriginal() throws IOException {
Assume.assumeTrue(NativeCodeLoader.isNativeCodeLoaded());
// this one succeeds
byte[] buf = new byte[32];
initBufWithNumbers(buf, 24, 45, 55, 49, 54, 55, 55, 54, 49, 48, 50, 55, 49, 56, 54, 48, 57, 48);
Lz4Compressor comp = new Lz4Compressor(32, false);
comp.setInput(buf, 0, 32);
comp.compress(buf, 0, 32);
// adding 1 more element makes that fail
buf = new byte[32];
initBufWithNumbers(buf, 24, 45, 55, 49, 54, 55, 55, 54, 49, 48, 50, 55, 49, 56, 54, 48, 57, 48,
50);
comp = new Lz4Compressor(32, false);
comp.setInput(buf, 0, 32);
comp.compress(buf, 0, 32);
}
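// Fill the beginning of buf with the given values; the remaining bytes stay zero.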
private void initBufWithNumbers(byte[] buf, int... args) {
for (int i = 0; i < args.length; i++) {
buf[i] = (byte) args[i];
}
}
/**
* Test different options (RLE, repeat keys, compression) on reader/writer
*
* @param data
* @throws IOException
*/
private void testWriterAndReader(List<KVPair> data) throws IOException {
Writer writer = null;
//No RLE, No RepeatKeys
writer = writeTestFile(false, false, data, null);
readAndVerifyData(writer.getRawLength(), writer.getCompressedLength(), data, null);
writer = writeTestFile(false, false, data, codec);
readAndVerifyData(writer.getRawLength(), writer.getCompressedLength(), data, codec);
//No RLE, RepeatKeys
writer = writeTestFile(false, true, data, null);
readAndVerifyData(writer.getRawLength(), writer.getCompressedLength(), data, null);
writer = writeTestFile(false, true, data, codec);
readAndVerifyData(writer.getRawLength(), writer.getCompressedLength(), data, codec);
//RLE, No RepeatKeys
writer = writeTestFile(true, false, data, null);
readAndVerifyData(writer.getRawLength(), writer.getCompressedLength(), data, null);
writer = writeTestFile(true, false, data, codec);
readAndVerifyData(writer.getRawLength(), writer.getCompressedLength(), data, codec);
//RLE, RepeatKeys
writer = writeTestFile(true, true, data, null);
readAndVerifyData(writer.getRawLength(), writer.getCompressedLength(), data, null);
writer = writeTestFile(true, true, data, codec);
readAndVerifyData(writer.getRawLength(), writer.getCompressedLength(), data, codec);
}
/**
* Test different options (RLE, repeat keys, compression) on reader/writer
*
* @param data
* @throws IOException
*/
private void testWithDataBuffer(List<KVPair> data) throws IOException {
Writer writer = null;
//No RLE, No RepeatKeys
writer = writeTestFileUsingDataBuffer(false, false, data, null);
readAndVerifyData(writer.getRawLength(), writer.getCompressedLength(), data, null);
writer = writeTestFileUsingDataBuffer(false, false, data, codec);
readAndVerifyData(writer.getRawLength(), writer.getCompressedLength(), data, codec);
//No RLE, RepeatKeys
writer = writeTestFileUsingDataBuffer(false, true, data, null);
readAndVerifyData(writer.getRawLength(), writer.getCompressedLength(), data, null);
writer = writeTestFileUsingDataBuffer(false, true, data, codec);
readAndVerifyData(writer.getRawLength(), writer.getCompressedLength(), data, codec);
//RLE, No RepeatKeys
writer = writeTestFileUsingDataBuffer(true, false, data, null);
readAndVerifyData(writer.getRawLength(), writer.getCompressedLength(), data, null);
writer = writeTestFileUsingDataBuffer(true, false, data, codec);
readAndVerifyData(writer.getRawLength(), writer.getCompressedLength(), data, codec);
//RLE, RepeatKeys
writer = writeTestFileUsingDataBuffer(true, true, data, null);
readAndVerifyData(writer.getRawLength(), writer.getCompressedLength(), data, null);
writer = writeTestFileUsingDataBuffer(true, true, data, codec);
readAndVerifyData(writer.getRawLength(), writer.getCompressedLength(), data, codec);
}
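/**
* Read the data back with both the in-memory reader and the IFile reader and verify it
*
* @param rawLength
* @param compressedLength
* @param originalData
* @param codec
* @throws IOException
*/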
private void readAndVerifyData(long rawLength, long compressedLength,
List<KVPair> originalData, CompressionCodec codec) throws IOException {
readFileUsingInMemoryReader(rawLength, compressedLength, originalData);
readUsingIFileReader(originalData, codec);
}
/**
* Read data using in memory reader
*
* @param rawLength
* @param compressedLength
* @param originalData
* @throws IOException
*/
private void readFileUsingInMemoryReader(long rawLength, long compressedLength,
List<KVPair> originalData) throws IOException {
LOG.info("Read using in memory reader");
FSDataInputStream inStream = localFs.open(outputPath);
byte[] bytes = new byte[(int) rawLength];
IFile.Reader.readToMemory(bytes, inStream,
(int) compressedLength, codec, false, -1);
inStream.close();
readUsingInMemoryReader(bytes, originalData);
}
private void readUsingInMemoryReader(byte[] bytes, List<KVPair> originalData)
throws IOException {
InMemoryReader inMemReader = new InMemoryReader(null,
new InputAttemptIdentifier(0, 0), bytes, 0, bytes.length);
verifyData(inMemReader, originalData);
}
/**
* Read data using IFile Reader
*
* @param originalData
* @param codec
* @throws IOException
*/
private void readUsingIFileReader(List<KVPair> originalData,
CompressionCodec codec) throws IOException {
LOG.info("Read using IFile reader");
IFile.Reader reader = new IFile.Reader(localFs, outputPath,
codec, null, null, false, 0, -1);
verifyData(reader, originalData);
reader.close();
}
/**
* Data verification
*
* @param reader
* @param data
* @throws IOException
*/
private void verifyData(Reader reader, List<KVPair> data)
throws IOException {
LOG.info("Data verification");
Text readKey = new Text();
IntWritable readValue = new IntWritable();
DataInputBuffer keyIn = new DataInputBuffer();
DataInputBuffer valIn = new DataInputBuffer();
Deserializer<Text> keyDeserializer;
Deserializer<IntWritable> valDeserializer;
SerializationFactory serializationFactory = new SerializationFactory(
defaultConf);
keyDeserializer = serializationFactory.getDeserializer(Text.class);
valDeserializer = serializationFactory.getDeserializer(IntWritable.class);
keyDeserializer.open(keyIn);
valDeserializer.open(valIn);
int numRecordsRead = 0;
while (reader.nextRawKey(keyIn)) {
reader.nextRawValue(valIn);
readKey = keyDeserializer.deserialize(readKey);
readValue = valDeserializer.deserialize(readValue);
KVPair expected = data.get(numRecordsRead);
assertEquals("Key does not match: Expected: " + expected.getKey()
+ ", Read: " + readKey, expected.getKey(), readKey);
assertEquals("Value does not match: Expected: " + expected.getvalue()
+ ", Read: " + readValue, expected.getvalue(), readValue);
numRecordsRead++;
}
assertEquals("Expected: " + data.size() + " records, but found: "
+ numRecordsRead, data.size(), numRecordsRead);
LOG.info("Found: " + numRecordsRead + " records");
}
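/**
* Write the given data to the test output path with IFile.Writer
*
* @param rle
* @param repeatKeys
* @param data
* @param codec
* @throws IOException
*/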
private Writer writeTestFile(boolean rle, boolean repeatKeys,
List<KVPair> data, CompressionCodec codec) throws IOException {
FSDataOutputStream out = localFs.create(outputPath);
IFile.Writer writer = new IFile.Writer(new WritableSerialization(), new WritableSerialization(), out,
Text.class, IntWritable.class, codec, null, null, rle);
writeTestFile(writer, repeatKeys, data);
out.close();
return writer;
}
private Writer writeTestFile(IFile.Writer writer, boolean repeatKeys,
List<KVPair> data) throws IOException {
assertNotNull(writer);
Text previousKey = null;
for (KVPair kvp : data) {
if (repeatKeys && (previousKey != null && previousKey.compareTo(kvp.getKey()) == 0)) {
//RLE is enabled in IFile when IFile.REPEAT_KEY is set
writer.append(IFile.REPEAT_KEY, kvp.getvalue());
} else {
writer.append(kvp.getKey(), kvp.getvalue());
}
previousKey = kvp.getKey();
}
writer.close();
LOG.info("Uncompressed: " + writer.getRawLength());
LOG.info("CompressedSize: " + writer.getCompressedLength());
return writer;
}
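/**
* Write the given data to the test output path using DataInputBuffer based appends
*
* @param rle
* @param repeatKeys
* @param data
* @param codec
* @throws IOException
*/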
private Writer writeTestFileUsingDataBuffer(boolean rle, boolean repeatKeys,
List<KVPair> data, CompressionCodec codec) throws IOException {
FSDataOutputStream out = localFs.create(outputPath);
IFile.Writer writer = new IFile.Writer(new WritableSerialization(), new WritableSerialization(), out,
Text.class, IntWritable.class, codec, null, null, rle);
writeTestFileUsingDataBuffer(writer, repeatKeys, data);
out.close();
return writer;
}
private Writer writeTestFileUsingDataBuffer(Writer writer, boolean repeatKeys,
List<KVPair> data) throws IOException {
DataInputBuffer previousKey = new DataInputBuffer();
DataInputBuffer key = new DataInputBuffer();
DataInputBuffer value = new DataInputBuffer();
for (KVPair kvp : data) {
populateData(kvp, key, value);
if (repeatKeys && (previousKey != null && BufferUtils.compare(key, previousKey) == 0)) {
writer.append(IFile.REPEAT_KEY, value);
} else {
writer.append(key, value);
}
previousKey.reset(key.getData(), 0, key.getLength());
}
writer.close();
LOG.info("Uncompressed: " + writer.getRawLength());
LOG.info("CompressedSize: " + writer.getCompressedLength());
return writer;
}
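/**
* Serialize a KVPair into the given key/value DataInputBuffers
*
* @param kvp
* @param key
* @param value
* @throws IOException
*/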
private void populateData(KVPair kvp, DataInputBuffer key, DataInputBuffer value)
throws IOException {
DataOutputBuffer k = new DataOutputBuffer();
DataOutputBuffer v = new DataOutputBuffer();
kvp.getKey().write(k);
kvp.getvalue().write(v);
key.reset(k.getData(), 0, k.getLength());
value.reset(v.getData(), 0, v.getLength());
}
}