/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.orc;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.File;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.impl.OutStream;
import org.apache.orc.impl.RecordReaderImpl;
import org.apache.orc.impl.StreamName;
import org.apache.orc.impl.TestInStream;
import org.apache.orc.impl.writer.StreamOptions;
import org.apache.orc.impl.writer.StringTreeWriter;
import org.apache.orc.impl.writer.TreeWriter;
import org.apache.orc.impl.writer.WriterContext;
import org.apache.orc.impl.writer.WriterEncryptionVariant;

import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;

public class TestStringDictionary {
private Path workDir = new Path(System.getProperty("test.tmp.dir", "target" + File.separator + "test"
+ File.separator + "tmp"));
private Configuration conf;
private FileSystem fs;
private Path testFilePath;
@Rule
public TestName testCaseName = new TestName();
@Before
public void openFileSystem() throws Exception {
conf = new Configuration();
fs = FileSystem.getLocal(conf);
testFilePath = new Path(workDir, "TestStringDictionary." + testCaseName.getMethodName() + ".orc");
fs.delete(testFilePath, false);
}
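
  /**
   * All 20,000 values are distinct, so the ratio of dictionary entries to
   * non-null rows reaches 1.0 and crosses the dictionary key size threshold
   * (0.8 by default). The writer should abandon the dictionary and fall back
   * to DIRECT_V2 encoding.
   */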
@Test
public void testTooManyDistinct() throws Exception {
TypeDescription schema = TypeDescription.createString();
Writer writer = OrcFile.createWriter(
testFilePath,
OrcFile.writerOptions(conf).setSchema(schema)
.compress(CompressionKind.NONE)
.bufferSize(10000));
VectorizedRowBatch batch = schema.createRowBatch();
BytesColumnVector col = (BytesColumnVector) batch.cols[0];
for (int i = 0; i < 20000; i++) {
if (batch.size == batch.getMaxSize()) {
writer.addRowBatch(batch);
batch.reset();
}
col.setVal(batch.size++, String.valueOf(i).getBytes(StandardCharsets.UTF_8));
}
writer.addRowBatch(batch);
writer.close();
Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf).filesystem(fs));
RecordReader rows = reader.rows();
batch = reader.getSchema().createRowBatch();
col = (BytesColumnVector) batch.cols[0];
int idx = 0;
while (rows.nextBatch(batch)) {
for(int r=0; r < batch.size; ++r) {
assertEquals(String.valueOf(idx++), col.toString(r));
}
}
// make sure the encoding type is correct
for (StripeInformation stripe : reader.getStripes()) {
      // Hacky, but it does the job: this cast works as long as this test
      // resides in the same package as the ORC reader.
OrcProto.StripeFooter footer = ((RecordReaderImpl) rows).readStripeFooter(stripe);
for (int i = 0; i < footer.getColumnsCount(); ++i) {
OrcProto.ColumnEncoding encoding = footer.getColumns(i);
assertEquals(OrcProto.ColumnEncoding.Kind.DIRECT_V2, encoding.getKind());
}
}
}
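
  /**
   * 20,000 rows are drawn from a pool of only 10,000 distinct values, so
   * roughly half of them repeat. The dictionary ratio stays under the default
   * threshold and DICTIONARY_V2 encoding is retained.
   */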
@Test
public void testHalfDistinct() throws Exception {
TypeDescription schema = TypeDescription.createString();
Writer writer = OrcFile.createWriter(
testFilePath,
OrcFile.writerOptions(conf).setSchema(schema).compress(CompressionKind.NONE)
.bufferSize(10000));
Random rand = new Random(123);
int[] input = new int[20000];
for (int i = 0; i < 20000; i++) {
input[i] = rand.nextInt(10000);
}
VectorizedRowBatch batch = schema.createRowBatch();
BytesColumnVector col = (BytesColumnVector) batch.cols[0];
for (int i = 0; i < 20000; i++) {
if (batch.size == batch.getMaxSize()) {
writer.addRowBatch(batch);
batch.reset();
}
col.setVal(batch.size++, String.valueOf(input[i]).getBytes(StandardCharsets.UTF_8));
}
writer.addRowBatch(batch);
writer.close();
Reader reader = OrcFile.createReader(testFilePath,
OrcFile.readerOptions(conf).filesystem(fs));
RecordReader rows = reader.rows();
batch = reader.getSchema().createRowBatch();
col = (BytesColumnVector) batch.cols[0];
int idx = 0;
while (rows.nextBatch(batch)) {
for(int r=0; r < batch.size; ++r) {
assertEquals(String.valueOf(input[idx++]), col.toString(r));
}
}
// make sure the encoding type is correct
for (StripeInformation stripe : reader.getStripes()) {
      // Hacky, but it does the job: this cast works as long as this test
      // resides in the same package as the ORC reader.
OrcProto.StripeFooter footer = ((RecordReaderImpl) rows).readStripeFooter(stripe);
for (int i = 0; i < footer.getColumnsCount(); ++i) {
OrcProto.ColumnEncoding encoding = footer.getColumns(i);
assertEquals(OrcProto.ColumnEncoding.Kind.DICTIONARY_V2, encoding.getKind());
}
}
}
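
  /**
   * A minimal WriterContext stub for driving a TreeWriter directly: every
   * stream it hands out is backed by a TestInStream.OutputCollector, so a
   * test can inspect exactly which bytes the writer produced.
   */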
static class WriterContextImpl implements WriterContext {
private final TypeDescription schema;
private final Configuration conf;
private final Map<StreamName, TestInStream.OutputCollector> streams =
new HashMap<>();
WriterContextImpl(TypeDescription schema, Configuration conf) {
this.schema = schema;
this.conf = conf;
}
@Override
public OutStream createStream(StreamName name) {
TestInStream.OutputCollector collect = new TestInStream.OutputCollector();
streams.put(name, collect);
return new OutStream("test", new StreamOptions(1000), collect);
}
@Override
public int getRowIndexStride() {
return 10000;
}
@Override
public boolean buildIndex() {
return OrcConf.ENABLE_INDEXES.getBoolean(conf);
}
@Override
public boolean isCompressed() {
return false;
}
@Override
public OrcFile.EncodingStrategy getEncodingStrategy() {
return OrcFile.EncodingStrategy.SPEED;
}
@Override
public boolean[] getBloomFilterColumns() {
return new boolean[schema.getMaximumId() + 1];
}
@Override
public double getBloomFilterFPP() {
return 0;
}
@Override
public Configuration getConfiguration() {
return conf;
}
@Override
public OrcFile.Version getVersion() {
return OrcFile.Version.V_0_12;
}
@Override
public PhysicalWriter getPhysicalWriter() {
return null;
}
@Override
public void setEncoding(int column, WriterEncryptionVariant variant, OrcProto.ColumnEncoding encoding) {
}
@Override
public void writeStatistics(StreamName name, OrcProto.ColumnStatistics.Builder stats) {
}
@Override
public OrcFile.BloomFilterVersion getBloomFilterVersion() {
return OrcFile.BloomFilterVersion.UTF8;
}
@Override
public void writeIndex(StreamName name, OrcProto.RowIndex.Builder index) {
}
@Override
public void writeBloomFilter(StreamName name,
OrcProto.BloomFilterIndex.Builder bloom) {
}
@Override
public DataMask getUnencryptedMask(int columnId) {
return null;
}
@Override
public WriterEncryptionVariant getEncryption(int columnId) {
return null;
}
@Override
public boolean getUseUTCTimestamp() {
return true;
}
@Override
public double getDictionaryKeySizeThreshold(int column) {
return OrcConf.DICTIONARY_KEY_SIZE_THRESHOLD.getDouble(conf);
}
@Override
public boolean getProlepticGregorian() {
return false;
}
}
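
  /**
   * Setting the dictionary key size threshold to 0.0 disables dictionary
   * encoding outright, so string bytes should flow straight into the DATA
   * stream rather than being buffered until the first row group check.
   */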
@Test
public void testNonDistinctDisabled() throws Exception {
TypeDescription schema = TypeDescription.createString();
conf.set(OrcConf.DICTIONARY_KEY_SIZE_THRESHOLD.getAttribute(), "0.0");
WriterContextImpl writerContext = new WriterContextImpl(schema, conf);
StringTreeWriter writer = (StringTreeWriter)
TreeWriter.Factory.create(schema, null, writerContext);
VectorizedRowBatch batch = schema.createRowBatch();
BytesColumnVector col = (BytesColumnVector) batch.cols[0];
batch.size = 1024;
col.isRepeating = true;
col.setVal(0, "foobar".getBytes(StandardCharsets.UTF_8));
writer.writeBatch(col, 0, batch.size);
TestInStream.OutputCollector output = writerContext.streams.get(
new StreamName(0, OrcProto.Stream.Kind.DATA));
    // Check that the strings are written to the stream even before the first
    // row group completes: 6 bytes ("foobar") * 1024 rows = 6144 bytes,
    // flushed through 1000-byte buffers, so 6000 bytes have reached the output.
    assertEquals(6000, output.buffer.size());
}
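
  /**
   * The same all-distinct data as testTooManyDistinct, but with the early
   * per-row-group dictionary check switched off. The writer should still
   * detect the high cardinality when the stripe is flushed and emit DIRECT_V2.
   */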
@Test
public void testTooManyDistinctCheckDisabled() throws Exception {
TypeDescription schema = TypeDescription.createString();
conf.setBoolean(OrcConf.ROW_INDEX_STRIDE_DICTIONARY_CHECK.getAttribute(), false);
Writer writer = OrcFile.createWriter(
testFilePath,
OrcFile.writerOptions(conf).setSchema(schema).compress(CompressionKind.NONE)
.bufferSize(10000));
VectorizedRowBatch batch = schema.createRowBatch();
BytesColumnVector string = (BytesColumnVector) batch.cols[0];
for (int i = 0; i < 20000; i++) {
if (batch.size == batch.getMaxSize()) {
writer.addRowBatch(batch);
batch.reset();
}
string.setVal(batch.size++, String.valueOf(i).getBytes(StandardCharsets.UTF_8));
}
writer.addRowBatch(batch);
writer.close();
Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf).filesystem(fs));
RecordReader rows = reader.rows();
batch = reader.getSchema().createRowBatch();
string = (BytesColumnVector) batch.cols[0];
int idx = 0;
while (rows.nextBatch(batch)) {
for(int r=0; r < batch.size; ++r) {
assertEquals(String.valueOf(idx++), string.toString(r));
}
}
// make sure the encoding type is correct
for (StripeInformation stripe : reader.getStripes()) {
      // Hacky, but it does the job: this cast works as long as this test
      // resides in the same package as the ORC reader.
OrcProto.StripeFooter footer = ((RecordReaderImpl) rows).readStripeFooter(stripe);
for (int i = 0; i < footer.getColumnsCount(); ++i) {
OrcProto.ColumnEncoding encoding = footer.getColumns(i);
assertEquals(OrcProto.ColumnEncoding.Kind.DIRECT_V2, encoding.getKind());
}
}
}
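
  /**
   * The same half-distinct data as testHalfDistinct with the early dictionary
   * check disabled; the repetitive data should still come out DICTIONARY_V2
   * encoded.
   */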
@Test
public void testHalfDistinctCheckDisabled() throws Exception {
TypeDescription schema = TypeDescription.createString();
conf.setBoolean(OrcConf.ROW_INDEX_STRIDE_DICTIONARY_CHECK.getAttribute(),
false);
Writer writer = OrcFile.createWriter(
testFilePath,
OrcFile.writerOptions(conf).setSchema(schema)
.compress(CompressionKind.NONE)
.bufferSize(10000));
Random rand = new Random(123);
int[] input = new int[20000];
for (int i = 0; i < 20000; i++) {
input[i] = rand.nextInt(10000);
}
VectorizedRowBatch batch = schema.createRowBatch();
BytesColumnVector string = (BytesColumnVector) batch.cols[0];
for (int i = 0; i < 20000; i++) {
if (batch.size == batch.getMaxSize()) {
writer.addRowBatch(batch);
batch.reset();
}
string.setVal(batch.size++, String.valueOf(input[i]).getBytes(StandardCharsets.UTF_8));
}
writer.addRowBatch(batch);
writer.close();
Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf).filesystem(fs));
RecordReader rows = reader.rows();
batch = reader.getSchema().createRowBatch();
string = (BytesColumnVector) batch.cols[0];
int idx = 0;
while (rows.nextBatch(batch)) {
for(int r=0; r < batch.size; ++r) {
assertEquals(String.valueOf(input[idx++]), string.toString(r));
}
}
// make sure the encoding type is correct
for (StripeInformation stripe : reader.getStripes()) {
      // Hacky, but it does the job: this cast works as long as this test
      // resides in the same package as the ORC reader.
OrcProto.StripeFooter footer = ((RecordReaderImpl) rows).readStripeFooter(stripe);
for (int i = 0; i < footer.getColumnsCount(); ++i) {
OrcProto.ColumnEncoding encoding = footer.getColumns(i);
assertEquals(OrcProto.ColumnEncoding.Kind.DICTIONARY_V2, encoding.getKind());
}
}
}
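
  /**
   * When writing file version 0.11, which predates the V2 encodings, the
   * writer keeps the original DICTIONARY encoding for strings even though
   * every value here is distinct.
   */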
@Test
public void testTooManyDistinctV11AlwaysDictionary() throws Exception {
TypeDescription schema = TypeDescription.createString();
Writer writer = OrcFile.createWriter(
testFilePath,
OrcFile.writerOptions(conf).setSchema(schema)
.compress(CompressionKind.NONE)
.version(OrcFile.Version.V_0_11).bufferSize(10000));
VectorizedRowBatch batch = schema.createRowBatch();
BytesColumnVector string = (BytesColumnVector) batch.cols[0];
for (int i = 0; i < 20000; i++) {
if (batch.size == batch.getMaxSize()) {
writer.addRowBatch(batch);
batch.reset();
}
string.setVal(batch.size++, String.valueOf(i).getBytes(StandardCharsets.UTF_8));
}
writer.addRowBatch(batch);
writer.close();
Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf).filesystem(fs));
batch = reader.getSchema().createRowBatch();
string = (BytesColumnVector) batch.cols[0];
RecordReader rows = reader.rows();
int idx = 0;
while (rows.nextBatch(batch)) {
for(int r=0; r < batch.size; ++r) {
assertEquals(String.valueOf(idx++), string.toString(r));
}
}
// make sure the encoding type is correct
for (StripeInformation stripe : reader.getStripes()) {
      // Hacky, but it does the job: this cast works as long as this test
      // resides in the same package as the ORC reader.
OrcProto.StripeFooter footer = ((RecordReaderImpl) rows).readStripeFooter(stripe);
for (int i = 0; i < footer.getColumnsCount(); ++i) {
OrcProto.ColumnEncoding encoding = footer.getColumns(i);
assertEquals(OrcProto.ColumnEncoding.Kind.DICTIONARY, encoding.getKind());
}
}
  }

/**
   * Test that dictionaries can be disabled per column. Here we disable DICTIONARY_V2 for the
   * `longString` column (presumably because of a low hit ratio), while preserving DICTIONARY_V2
   * for `shortString`.
* @throws Exception on unexpected failure
*/
@Test
public void testDisableDictionaryForSpecificColumn() throws Exception {
final String SHORT_STRING_VALUE = "foo";
final String LONG_STRING_VALUE = "BAAAAAAAAR!!";
TypeDescription schema =
TypeDescription.fromString("struct<shortString:string,longString:string>");
Writer writer = OrcFile.createWriter(
testFilePath,
OrcFile.writerOptions(conf).setSchema(schema)
.compress(CompressionKind.NONE)
.bufferSize(10000)
.directEncodingColumns("longString"));
VectorizedRowBatch batch = schema.createRowBatch();
BytesColumnVector shortStringColumnVector = (BytesColumnVector) batch.cols[0];
BytesColumnVector longStringColumnVector = (BytesColumnVector) batch.cols[1];
for (int i = 0; i < 20000; i++) {
if (batch.size == batch.getMaxSize()) {
writer.addRowBatch(batch);
batch.reset();
}
      shortStringColumnVector.setVal(batch.size, SHORT_STRING_VALUE.getBytes(StandardCharsets.UTF_8));
      longStringColumnVector.setVal(batch.size, LONG_STRING_VALUE.getBytes(StandardCharsets.UTF_8));
      ++batch.size;
}
writer.addRowBatch(batch);
writer.close();
Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf).filesystem(fs));
RecordReader recordReader = reader.rows();
batch = reader.getSchema().createRowBatch();
shortStringColumnVector = (BytesColumnVector) batch.cols[0];
longStringColumnVector = (BytesColumnVector) batch.cols[1];
while (recordReader.nextBatch(batch)) {
for(int r=0; r < batch.size; ++r) {
assertEquals(SHORT_STRING_VALUE, shortStringColumnVector.toString(r));
assertEquals(LONG_STRING_VALUE, longStringColumnVector.toString(r));
}
}
// make sure the encoding type is correct
for (StripeInformation stripe : reader.getStripes()) {
      // Hacky, but it does the job: this cast works as long as this test
      // resides in the same package as the ORC reader.
OrcProto.StripeFooter footer = ((RecordReaderImpl) recordReader).readStripeFooter(stripe);
      Assert.assertEquals(
          "Expected 3 columns in the footer: one for the ORC struct, and two for its members.",
          3, footer.getColumnsCount());
      Assert.assertEquals(
          "The ORC schema struct should be DIRECT encoded.",
          OrcProto.ColumnEncoding.Kind.DIRECT, footer.getColumns(0).getKind());
      Assert.assertEquals(
          "The shortString column must be DICTIONARY_V2 encoded.",
          OrcProto.ColumnEncoding.Kind.DICTIONARY_V2, footer.getColumns(1).getKind());
      Assert.assertEquals(
          "The longString column must be DIRECT_V2 encoded.",
          OrcProto.ColumnEncoding.Kind.DIRECT_V2, footer.getColumns(2).getKind());
}
}
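
  /**
   * Forces direct encoding through a 0.0 dictionary threshold, then checks
   * the row index: one stripe, four index entries since
   * ceil(50 * 1024 / 16384) = 4, five positions per entry, and the right data
   * after seeking to each entry boundary.
   */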
@Test
public void testForcedNonDictionary() throws Exception {
// Set the row stride to 16k so that it is a multiple of the batch size
final int INDEX_STRIDE = 16 * 1024;
final int NUM_BATCHES = 50;
// Explicitly turn off dictionary encoding.
OrcConf.DICTIONARY_KEY_SIZE_THRESHOLD.setDouble(conf, 0);
TypeDescription schema = TypeDescription.fromString("struct<str:string>");
try (Writer writer = OrcFile.createWriter(testFilePath,
OrcFile.writerOptions(conf)
.setSchema(schema)
.rowIndexStride(INDEX_STRIDE))) {
// Write 50 batches where each batch has a single value for str.
VectorizedRowBatch batch = schema.createRowBatchV2();
BytesColumnVector col = (BytesColumnVector) batch.cols[0];
for(int b=0; b < NUM_BATCHES; ++b) {
batch.reset();
batch.size = 1024;
col.setVal(0, ("Value for " + b).getBytes(StandardCharsets.UTF_8));
col.isRepeating = true;
writer.addRowBatch(batch);
}
}
try (Reader reader = OrcFile.createReader(testFilePath,
OrcFile.readerOptions(conf));
RecordReaderImpl rows = (RecordReaderImpl) reader.rows()) {
VectorizedRowBatch batch = reader.getSchema().createRowBatchV2();
BytesColumnVector col = (BytesColumnVector) batch.cols[0];
// Get the index for the str column
OrcProto.RowIndex index = rows.readRowIndex(0, null, null)
.getRowGroupIndex()[1];
// We assume that it fits in a single stripe
assertEquals(1, reader.getStripes().size());
// There are 4 entries, because ceil(NUM_BATCHES * 1024 / INDEX_STRIDE) = 4.
assertEquals(4, index.getEntryCount());
for(int e=0; e < index.getEntryCount(); ++e) {
OrcProto.RowIndexEntry entry = index.getEntry(e);
// For a string column with direct encoding, compression & no nulls, we
// should have 5 positions in each entry.
assertEquals("position count entry " + e, 5, entry.getPositionsCount());
// make sure we can seek and get the right data
int row = e * INDEX_STRIDE;
rows.seekToRow(row);
assertTrue("entry " + e, rows.nextBatch(batch));
assertEquals("entry " + e, 1024, batch.size);
assertEquals("entry " + e, true, col.noNulls);
assertEquals("entry " + e, "Value for " + (row / 1024), col.toString(0));
}
}
}
}