blob: 39b02136e067afa99bdf131bdaa4b6ead699cd74 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.orc.impl;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertArrayEquals;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.CompressionKind;
import org.apache.orc.OrcFile;
import org.apache.orc.PhysicalWriter;
import org.apache.orc.Reader;
import org.apache.orc.RecordReader;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;
import org.apache.orc.impl.writer.StreamOptions;
import org.apache.orc.tools.FileDump;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
public class TestRLEv2 {
Path workDir = new Path(System.getProperty("test.tmp.dir",
"target" + File.separator + "test" + File.separator + "tmp"));
Path testFilePath;
Configuration conf;
FileSystem fs;
@Rule
public TestName testCaseName = new TestName();
@Before
public void openFileSystem () throws Exception {
conf = new Configuration();
fs = FileSystem.getLocal(conf);
testFilePath = new Path(workDir, "TestRLEv2." +
testCaseName.getMethodName() + ".orc");
fs.delete(testFilePath, false);
}
private void appendInt(VectorizedRowBatch batch, long i) {
((LongColumnVector) batch.cols[0]).vector[batch.size++] = i;
}
@Test
public void testFixedDeltaZero() throws Exception {
TypeDescription schema = TypeDescription.createInt();
Writer w = OrcFile.createWriter(testFilePath,
OrcFile.writerOptions(conf)
.compress(CompressionKind.NONE)
.setSchema(schema)
.rowIndexStride(0)
.encodingStrategy(OrcFile.EncodingStrategy.COMPRESSION)
.version(OrcFile.Version.V_0_12)
);
VectorizedRowBatch batch = schema.createRowBatch(5120);
for (int i = 0; i < 5120; ++i) {
appendInt(batch, 123);
}
w.addRowBatch(batch);
w.close();
PrintStream origOut = System.out;
ByteArrayOutputStream myOut = new ByteArrayOutputStream();
System.setOut(new PrintStream(myOut, false, StandardCharsets.UTF_8.toString()));
FileDump.main(new String[]{testFilePath.toUri().toString()});
System.out.flush();
String outDump = new String(myOut.toByteArray(), StandardCharsets.UTF_8);
// 10 runs of 512 elements. Each run has 2 bytes header, 2 bytes base (base = 123,
// zigzag encoded varint) and 1 byte delta (delta = 0). In total, 5 bytes per run.
assertEquals(true, outDump.contains("Stream: column 0 section DATA start: 3 length 50"));
System.setOut(origOut);
}
@Test
public void testFixedDeltaOne() throws Exception {
TypeDescription schema = TypeDescription.createInt();
Writer w = OrcFile.createWriter(testFilePath,
OrcFile.writerOptions(conf)
.compress(CompressionKind.NONE)
.setSchema(schema)
.rowIndexStride(0)
.encodingStrategy(OrcFile.EncodingStrategy.COMPRESSION)
.version(OrcFile.Version.V_0_12)
);
VectorizedRowBatch batch = schema.createRowBatch(5120);
for (int i = 0; i < 5120; ++i) {
appendInt(batch, i % 512);
}
w.addRowBatch(batch);
w.close();
PrintStream origOut = System.out;
ByteArrayOutputStream myOut = new ByteArrayOutputStream();
System.setOut(new PrintStream(myOut, false, StandardCharsets.UTF_8.toString()));
FileDump.main(new String[]{testFilePath.toUri().toString()});
System.out.flush();
String outDump = new String(myOut.toByteArray(), StandardCharsets.UTF_8);
// 10 runs of 512 elements. Each run has 2 bytes header, 1 byte base (base = 0)
// and 1 byte delta (delta = 1). In total, 4 bytes per run.
assertEquals(true, outDump.contains("Stream: column 0 section DATA start: 3 length 40"));
System.setOut(origOut);
}
@Test
public void testFixedDeltaOneDescending() throws Exception {
TypeDescription schema = TypeDescription.createInt();
Writer w = OrcFile.createWriter(testFilePath,
OrcFile.writerOptions(conf)
.compress(CompressionKind.NONE)
.setSchema(schema)
.rowIndexStride(0)
.encodingStrategy(OrcFile.EncodingStrategy.COMPRESSION)
.version(OrcFile.Version.V_0_12)
);
VectorizedRowBatch batch = schema.createRowBatch(5120);
for (int i = 0; i < 5120; ++i) {
appendInt(batch, 512 - (i % 512));
}
w.addRowBatch(batch);
w.close();
PrintStream origOut = System.out;
ByteArrayOutputStream myOut = new ByteArrayOutputStream();
System.setOut(new PrintStream(myOut, false, StandardCharsets.UTF_8.toString()));
FileDump.main(new String[]{testFilePath.toUri().toString()});
System.out.flush();
String outDump = new String(myOut.toByteArray(), StandardCharsets.UTF_8);
// 10 runs of 512 elements. Each run has 2 bytes header, 2 byte base (base = 512, zigzag + varint)
// and 1 byte delta (delta = 1). In total, 5 bytes per run.
assertEquals(true, outDump.contains("Stream: column 0 section DATA start: 3 length 50"));
System.setOut(origOut);
}
@Test
public void testFixedDeltaLarge() throws Exception {
TypeDescription schema = TypeDescription.createInt();
Writer w = OrcFile.createWriter(testFilePath,
OrcFile.writerOptions(conf)
.compress(CompressionKind.NONE)
.setSchema(schema)
.rowIndexStride(0)
.encodingStrategy(OrcFile.EncodingStrategy.COMPRESSION)
.version(OrcFile.Version.V_0_12)
);
VectorizedRowBatch batch = schema.createRowBatch(5120);
for (int i = 0; i < 5120; ++i) {
appendInt(batch, i % 512 + ((i % 512) * 100));
}
w.addRowBatch(batch);
w.close();
PrintStream origOut = System.out;
ByteArrayOutputStream myOut = new ByteArrayOutputStream();
System.setOut(new PrintStream(myOut, false, StandardCharsets.UTF_8.toString()));
FileDump.main(new String[]{testFilePath.toUri().toString()});
System.out.flush();
String outDump = new String(myOut.toByteArray(), StandardCharsets.UTF_8);
// 10 runs of 512 elements. Each run has 2 bytes header, 1 byte base (base = 0)
// and 2 bytes delta (delta = 100, zigzag encoded varint). In total, 5 bytes per run.
assertEquals(true, outDump.contains("Stream: column 0 section DATA start: 3 length 50"));
System.setOut(origOut);
}
@Test
public void testFixedDeltaLargeDescending() throws Exception {
TypeDescription schema = TypeDescription.createInt();
Writer w = OrcFile.createWriter(testFilePath,
OrcFile.writerOptions(conf)
.compress(CompressionKind.NONE)
.setSchema(schema)
.rowIndexStride(0)
.encodingStrategy(OrcFile.EncodingStrategy.COMPRESSION)
.version(OrcFile.Version.V_0_12)
);
VectorizedRowBatch batch = schema.createRowBatch(5120);
for (int i = 0; i < 5120; ++i) {
appendInt(batch, (512 - i % 512) + ((i % 512) * 100));
}
w.addRowBatch(batch);
w.close();
PrintStream origOut = System.out;
ByteArrayOutputStream myOut = new ByteArrayOutputStream();
System.setOut(new PrintStream(myOut, false, StandardCharsets.UTF_8.toString()));
FileDump.main(new String[]{testFilePath.toUri().toString()});
System.out.flush();
String outDump = new String(myOut.toByteArray(), StandardCharsets.UTF_8);
// 10 runs of 512 elements. Each run has 2 bytes header, 2 byte base (base = 512, zigzag + varint)
// and 2 bytes delta (delta = 100, zigzag encoded varint). In total, 6 bytes per run.
assertEquals(true, outDump.contains("Stream: column 0 section DATA start: 3 length 60"));
System.setOut(origOut);
}
@Test
public void testShortRepeat() throws Exception {
TypeDescription schema = TypeDescription.createInt();
Writer w = OrcFile.createWriter(testFilePath,
OrcFile.writerOptions(conf)
.compress(CompressionKind.NONE)
.setSchema(schema)
.rowIndexStride(0)
.encodingStrategy(OrcFile.EncodingStrategy.COMPRESSION)
.version(OrcFile.Version.V_0_12)
);
VectorizedRowBatch batch = schema.createRowBatch(5120);
for (int i = 0; i < 5; ++i) {
appendInt(batch, 10);
}
w.addRowBatch(batch);
w.close();
PrintStream origOut = System.out;
ByteArrayOutputStream myOut = new ByteArrayOutputStream();
System.setOut(new PrintStream(myOut, false, StandardCharsets.UTF_8.toString()));
FileDump.main(new String[]{testFilePath.toUri().toString()});
System.out.flush();
String outDump = new String(myOut.toByteArray(), StandardCharsets.UTF_8);
// 1 byte header + 1 byte value
assertEquals(true, outDump.contains("Stream: column 0 section DATA start: 3 length 2"));
System.setOut(origOut);
}
@Test
public void testDeltaUnknownSign() throws Exception {
TypeDescription schema = TypeDescription.createInt();
Writer w = OrcFile.createWriter(testFilePath,
OrcFile.writerOptions(conf)
.compress(CompressionKind.NONE)
.setSchema(schema)
.rowIndexStride(0)
.encodingStrategy(OrcFile.EncodingStrategy.COMPRESSION)
.version(OrcFile.Version.V_0_12)
);
VectorizedRowBatch batch = schema.createRowBatch(5120);
appendInt(batch, 0);
for (int i = 0; i < 511; ++i) {
appendInt(batch, i);
}
w.addRowBatch(batch);
w.close();
PrintStream origOut = System.out;
ByteArrayOutputStream myOut = new ByteArrayOutputStream();
System.setOut(new PrintStream(myOut, false, StandardCharsets.UTF_8.toString()));
FileDump.main(new String[]{testFilePath.toUri().toString()});
System.out.flush();
String outDump = new String(myOut.toByteArray(), StandardCharsets.UTF_8);
// monotonicity will be undetermined for this sequence 0,0,1,2,3,...510. Hence DIRECT encoding
// will be used. 2 bytes for header and 640 bytes for data (512 values with fixed bit of 10 bits
// each, 5120/8 = 640). Total bytes 642
assertEquals(true, outDump.contains("Stream: column 0 section DATA start: 3 length 642"));
System.setOut(origOut);
}
@Test
public void testPatchedBase() throws Exception {
TypeDescription schema = TypeDescription.createInt();
Writer w = OrcFile.createWriter(testFilePath,
OrcFile.writerOptions(conf)
.compress(CompressionKind.NONE)
.setSchema(schema)
.rowIndexStride(0)
.encodingStrategy(OrcFile.EncodingStrategy.COMPRESSION)
.version(OrcFile.Version.V_0_12)
);
Random rand = new Random(123);
VectorizedRowBatch batch = schema.createRowBatch(5120);
appendInt(batch, 10000000);
for (int i = 0; i < 511; ++i) {
appendInt(batch, rand.nextInt(i+1));
}
w.addRowBatch(batch);
w.close();
PrintStream origOut = System.out;
ByteArrayOutputStream myOut = new ByteArrayOutputStream();
System.setOut(new PrintStream(myOut, false, StandardCharsets.UTF_8.toString()));
FileDump.main(new String[]{testFilePath.toUri().toString()});
System.out.flush();
String outDump = new String(myOut.toByteArray(), StandardCharsets.UTF_8);
// use PATCHED_BASE encoding
assertEquals(true, outDump.contains("Stream: column 0 section DATA start: 3 length 583"));
System.setOut(origOut);
}
@Test
public void testBaseValueLimit() throws Exception {
TypeDescription schema = TypeDescription.createInt();
Writer w = OrcFile.createWriter(testFilePath,
OrcFile.writerOptions(conf)
.compress(CompressionKind.NONE)
.setSchema(schema)
.rowIndexStride(0)
.encodingStrategy(OrcFile.EncodingStrategy.COMPRESSION)
.version(OrcFile.Version.V_0_12)
);
VectorizedRowBatch batch = schema.createRowBatch();
//the minimum value is beyond RunLengthIntegerWriterV2.BASE_VALUE_LIMIT
long[] input = {-9007199254740992l,-8725724278030337l,-1125762467889153l, -1l,-9007199254740992l,
-9007199254740992l, -497l,127l,-1l,-72057594037927936l,-4194304l,-9007199254740992l,-4503599593816065l,
-4194304l,-8936830510563329l,-9007199254740992l, -1l, -70334384439312l,-4063233l, -6755399441973249l};
for(long data: input) {
appendInt(batch, data);
}
w.addRowBatch(batch);
w.close();
try(Reader reader = OrcFile.createReader(testFilePath,
OrcFile.readerOptions(conf).filesystem(fs))) {
RecordReader rows = reader.rows();
batch = reader.getSchema().createRowBatch();
long[] output = null;
while (rows.nextBatch(batch)) {
output = new long[batch.size];
System.arraycopy(((LongColumnVector) batch.cols[0]).vector, 0, output, 0, batch.size);
}
assertArrayEquals(input, output);
}
}
static class TestOutputCatcher implements PhysicalWriter.OutputReceiver {
int currentBuffer = 0;
List<ByteBuffer> buffers = new ArrayList<ByteBuffer>();
@Override
public void output(ByteBuffer buffer) throws IOException {
buffers.add(buffer);
}
@Override
public void suppress() {
}
ByteBuffer getCurrentBuffer() {
while (currentBuffer < buffers.size() &&
buffers.get(currentBuffer).remaining() == 0) {
currentBuffer += 1;
}
return currentBuffer < buffers.size() ? buffers.get(currentBuffer) : null;
}
// assert that the list of ints (as bytes) are equal to the output
public void compareBytes(int... expected) {
for(int i=0; i < expected.length; ++i) {
ByteBuffer current = getCurrentBuffer();
assertEquals("position " + i, (byte) expected[i], current.get());
}
assertEquals(null, getCurrentBuffer());
}
}
static TestOutputCatcher encodeV2(long[] input,
boolean signed) throws IOException {
TestOutputCatcher catcher = new TestOutputCatcher();
RunLengthIntegerWriterV2 writer =
new RunLengthIntegerWriterV2(new OutStream("test",
new StreamOptions(10000), catcher), signed);
for(long x: input) {
writer.write(x);
}
writer.flush();
return catcher;
}
@Test
public void testShortRepeatExample() throws Exception {
long[] input = {10000, 10000, 10000, 10000, 10000};
TestOutputCatcher output = encodeV2(input, false);
output.compareBytes(0x0a, 0x27, 0x10);
}
@Test
public void testDirectExample() throws Exception {
long[] input = {23713, 43806, 57005, 48879};
TestOutputCatcher output = encodeV2(input, false);
output.compareBytes(0x5e, 0x03, 0x5c, 0xa1, 0xab, 0x1e, 0xde, 0xad, 0xbe,
0xef);
}
@Test
public void testPatchedBaseExample() throws Exception {
long[] input = {2030, 2000, 2020, 1000000, 2040, 2050, 2060, 2070, 2080,
2090, 2100, 2110, 2120, 2130, 2140, 2150, 2160, 2170, 2180, 2190};
TestOutputCatcher output = encodeV2(input, false);
output.compareBytes(0x8e, 0x13, 0x2b, 0x21, 0x07, 0xd0, 0x1e, 0x00, 0x14,
0x70, 0x28, 0x32, 0x3c, 0x46, 0x50, 0x5a, 0x64, 0x6e, 0x78, 0x82, 0x8c,
0x96, 0xa0, 0xaa, 0xb4, 0xbe, 0xfc, 0xe8);
}
@Test
public void testDeltaExample() throws Exception {
long[] input = {2, 3, 5, 7, 11, 13, 17, 19, 23, 29};
TestOutputCatcher output = encodeV2(input, false);
output.compareBytes(0xc6, 0x09, 0x02, 0x02, 0x22, 0x42, 0x42, 0x46);
}
@Test
public void testDelta2Example() throws Exception {
long[] input = {0, 10000, 10001, 10001, 10002, 10003, 10003};
TestOutputCatcher output = encodeV2(input, false);
output.compareBytes(0xc2, 0x06, 0x0, 0xa0, 0x9c, 0x01, 0x45, 0x0);
}
}