ORC-571 & ORC-563: Fix ArrayIndexOutOfBoundsException in StripePlanner.readRowIndex.
Fixes #452
Fixes #443
Signed-off-by: Owen O'Malley <omalley@apache.org>
diff --git a/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java b/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
index 72c7418..c4ce398 100644
--- a/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
@@ -98,13 +98,10 @@
static int findColumns(SchemaEvolution evolution,
String columnName) {
try {
- final TypeDescription targetSchema;
- if (evolution.getPositionalColumns()) {
- targetSchema = evolution.getReaderBaseSchema();
- } else {
- targetSchema = evolution.getFileSchema();
- }
- return targetSchema.findSubtype(columnName).getId();
+ TypeDescription readerColumn =
+ evolution.getReaderBaseSchema().findSubtype(columnName);
+ TypeDescription fileColumn = evolution.getFileType(readerColumn);
+ return fileColumn == null ? -1 : fileColumn.getId();
} catch (IllegalArgumentException e) {
if (LOG.isDebugEnabled()){
LOG.debug("{}", e.getMessage());
@@ -230,7 +227,7 @@
reader = TreeReaderFactory.createTreeReader(evolution.getReaderSchema(),
readerContext);
- int columns = schema.getMaximumId() + 1;
+ int columns = evolution.getFileSchema().getMaximumId() + 1;
indexes = new OrcIndex(new OrcProto.RowIndex[columns],
new OrcProto.Stream.Kind[columns],
new OrcProto.BloomFilterIndex[columns]);
diff --git a/java/core/src/test/org/apache/orc/impl/TestRecordReaderImpl.java b/java/core/src/test/org/apache/orc/impl/TestRecordReaderImpl.java
index d150750..83e4399 100644
--- a/java/core/src/test/org/apache/orc/impl/TestRecordReaderImpl.java
+++ b/java/core/src/test/org/apache/orc/impl/TestRecordReaderImpl.java
@@ -72,6 +72,7 @@
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;
import org.apache.orc.impl.RecordReaderImpl.Location;
+import org.apache.orc.impl.RecordReaderImpl.SargApplier;
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.TruthValue;
import org.apache.hadoop.hive.serde2.io.DateWritable;
@@ -2222,6 +2223,49 @@
}
@Test
+ public void testPositionalEvolutionAddColumnPPD() throws IOException {
+ Reader.Options opts = new Reader.Options();
+ opts.forcePositionalEvolution(true);
+
+ TypeDescription file = TypeDescription.fromString("struct<x:int>");
+ // new column added on reader side
+ TypeDescription read = TypeDescription.fromString("struct<x:int,y:boolean>");
+ opts.include(includeAll(read));
+
+ SchemaEvolution evo = new SchemaEvolution(file, read, opts);
+
+ SearchArgument sarg = SearchArgumentFactory.newBuilder().startAnd()
+ .equals("y", PredicateLeaf.Type.BOOLEAN, true).end().build();
+
+ RecordReaderImpl.SargApplier applier =
+ new RecordReaderImpl.SargApplier(sarg, 1000, evo, OrcFile.WriterVersion.ORC_135, false);
+
+ OrcProto.StripeInformation stripe =
+ OrcProto.StripeInformation.newBuilder().setNumberOfRows(2000).build();
+
+ OrcProto.RowIndex[] indexes = new OrcProto.RowIndex[3];
+ indexes[1] = OrcProto.RowIndex.newBuilder() // index for original x column
+ .addEntry(createIndexEntry(0L, 10L))
+ .addEntry(createIndexEntry(100L, 200L))
+ .build();
+ indexes[2] = null; // no-op, just for clarifying that the new reader column doesn't have an index
+
+ List<OrcProto.ColumnEncoding> encodings = new ArrayList<>();
+ encodings.add(OrcProto.ColumnEncoding.newBuilder().setKind(OrcProto.ColumnEncoding.Kind.DIRECT).build());
+
+ boolean[] rows = applier.pickRowGroups(new ReaderImpl.StripeInformationImpl(stripe, 1, -1, null),
+ indexes, null, encodings, null, false);
+ assertEquals(SargApplier.READ_ALL_RGS, rows); // cannot filter on the new column, so all row groups are returned
+ }
+
+ private boolean[] includeAll(TypeDescription readerType) {
+ int numColumns = readerType.getMaximumId() + 1;
+ boolean[] result = new boolean[numColumns];
+ Arrays.fill(result, true);
+ return result;
+ }
+
+ @Test
public void testSkipDataReaderOpen() throws Exception {
IOException ioe = new IOException("Don't open when there is no stripe");
diff --git a/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcFileEvolution.java b/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcFileEvolution.java
index 8618677..7e907b7 100644
--- a/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcFileEvolution.java
+++ b/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcFileEvolution.java
@@ -22,6 +22,9 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.Reporter;
import org.apache.orc.*;
@@ -32,6 +35,8 @@
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import java.io.File;
import java.io.IOException;
@@ -43,6 +48,7 @@
/**
* Test the behavior of ORC's schema evolution
*/
+@RunWith(Parameterized.class)
public class TestOrcFileEvolution {
// These utility methods are just to make writing tests easier. The values
@@ -76,6 +82,20 @@
FileSystem fs;
Path testFilePath;
+ private boolean addSarg;
+
+ public TestOrcFileEvolution(boolean addSarg) {
+ this.addSarg = addSarg;
+ }
+
+ @Parameterized.Parameters
+ public static Collection params() {
+ return Arrays.asList(new Object[][] {
+ { true },
+ { false }
+ });
+ }
+
@Rule
public TestName testCaseName = new TestName();
@@ -94,62 +114,86 @@
@Test
public void testAddFieldToEnd() {
checkEvolution("struct<a:int,b:string>", "struct<a:int,b:string,c:double>",
- struct(1, "foo"),
- struct(1, "foo", null));
+ struct(11, "foo"),
+ addSarg ? struct(0, "", 0.0) : struct(11, "foo", null),
+ addSarg);
+ }
+
+ @Test
+ public void testAddFieldToEndWithSarg() {
+ SearchArgument sArg = SearchArgumentFactory
+ .newBuilder()
+ .lessThan("c", PredicateLeaf.Type.LONG, 10L)
+ .build();
+ String[] sCols = new String[]{null, null, "c"};
+
+ checkEvolution("struct<a:int,b:string>", "struct<a:int,b:string,c:int>",
+ struct(1, "foo"),
+ struct(1, "foo", null),
+ (boolean) OrcConf.TOLERATE_MISSING_SCHEMA.getDefaultValue(),
+ sArg, sCols, false);
}
@Test
public void testAddFieldBeforeEnd() {
checkEvolution("struct<a:int,b:string>", "struct<a:int,c:double,b:string>",
struct(1, "foo"),
- struct(1, null, "foo"));
+ struct(1, null, "foo"),
+ addSarg);
}
@Test
public void testRemoveLastField() {
checkEvolution("struct<a:int,b:string,c:double>", "struct<a:int,b:string>",
struct(1, "foo", 3.14),
- struct(1, "foo"));
+ struct(1, "foo"),
+ addSarg);
}
@Test
public void testRemoveFieldBeforeEnd() {
checkEvolution("struct<a:int,b:string,c:double>", "struct<a:int,c:double>",
struct(1, "foo", 3.14),
- struct(1, 3.14));
+ struct(1, 3.14),
+ addSarg);
}
@Test
public void testRemoveAndAddField() {
checkEvolution("struct<a:int,b:string>", "struct<a:int,c:double>",
- struct(1, "foo"), struct(1, null));
+ struct(1, "foo"), struct(1, null),
+ addSarg);
}
@Test
public void testReorderFields() {
checkEvolution("struct<a:int,b:string>", "struct<b:string,a:int>",
- struct(1, "foo"), struct("foo", 1));
+ struct(1, "foo"), struct("foo", 1),
+ addSarg);
}
@Test
public void testAddFieldEndOfStruct() {
checkEvolution("struct<a:struct<b:int>,c:string>",
"struct<a:struct<b:int,d:double>,c:string>",
- struct(struct(2), "foo"), struct(struct(2, null), "foo"));
+ struct(struct(2), "foo"), struct(struct(2, null), "foo"),
+ addSarg);
}
@Test
public void testAddFieldBeforeEndOfStruct() {
checkEvolution("struct<a:struct<b:int>,c:string>",
"struct<a:struct<d:double,b:int>,c:string>",
- struct(struct(2), "foo"), struct(struct(null, 2), "foo"));
+ struct(struct(2), "foo"), struct(struct(null, 2), "foo"),
+ addSarg);
}
@Test
public void testAddSimilarField() {
checkEvolution("struct<a:struct<b:int>>",
"struct<a:struct<b:int>,c:struct<b:int>>", struct(struct(2)),
- struct(struct(2), null));
+ struct(struct(2), null),
+ addSarg);
}
@Test
@@ -157,7 +201,8 @@
checkEvolution("struct<a:struct<a:int,b:string>,c:struct<a:int>>",
"struct<a:struct<a:int,b:string>,c:struct<a:int,b:string>>",
struct(struct(2, "foo"), struct(3)),
- struct(struct(2, "foo"), struct(3, null)));
+ struct(struct(2, "foo"), struct(3, null)),
+ addSarg);
}
@Test
@@ -165,7 +210,8 @@
checkEvolution("struct<a:map<struct<a:int>,int>>",
"struct<a:map<struct<a:int,b:string>,int>>",
struct(map(struct(1), 2)),
- struct(map(struct(1, null), 2)));
+ struct(map(struct(1, null), 2)),
+ addSarg);
}
@Test
@@ -173,7 +219,8 @@
checkEvolution("struct<a:map<int,struct<a:int>>>",
"struct<a:map<int,struct<a:int,b:string>>>",
struct(map(2, struct(1))),
- struct(map(2, struct(1, null))));
+ struct(map(2, struct(1, null))),
+ addSarg);
}
@Test
@@ -181,16 +228,17 @@
checkEvolution("struct<a:array<struct<b:int>>>",
"struct<a:array<struct<b:int,c:string>>>",
struct(list(struct(1), struct(2))),
- struct(list(struct(1, null), struct(2, null))));
+ struct(list(struct(1, null), struct(2, null))),
+ addSarg);
}
@Test
public void testPreHive4243CheckEqual() {
// Expect success on equal schemas
checkEvolution("struct<_col0:int,_col1:string>",
- "struct<_col0:int,_col1:string>",
- struct(1, "foo"),
- struct(1, "foo", null), false);
+ "struct<_col0:int,_col1:string>",
+ struct(1, "foo"),
+ struct(1, "foo", null), false, addSarg, false);
}
@Test
@@ -198,17 +246,17 @@
// Expect exception on strict compatibility check
thrown.expectMessage("HIVE-4243");
checkEvolution("struct<_col0:int,_col1:string>",
- "struct<_col0:int,_col1:string,_col2:double>",
- struct(1, "foo"),
- struct(1, "foo", null), false);
+ "struct<_col0:int,_col1:string,_col2:double>",
+ struct(1, "foo"),
+ struct(1, "foo", null), false, addSarg, false);
}
@Test
public void testPreHive4243AddColumn() {
checkEvolution("struct<_col0:int,_col1:string>",
- "struct<_col0:int,_col1:string,_col2:double>",
- struct(1, "foo"),
- struct(1, "foo", null), true);
+ "struct<_col0:int,_col1:string,_col2:double>",
+ struct(1, "foo"),
+ struct(1, "foo", null), true, addSarg, false);
}
@Test
@@ -216,17 +264,17 @@
// Expect exception on type mismatch
thrown.expect(SchemaEvolution.IllegalEvolutionException.class);
checkEvolution("struct<_col0:int,_col1:double>",
- "struct<_col0:int,_col1:date,_col2:double>",
- struct(1, 1.0),
- null, true);
+ "struct<_col0:int,_col1:date,_col2:double>",
+ struct(1, 1.0),
+ null, true, addSarg, false);
}
@Test
public void testPreHive4243AddColumnWithFix() {
checkEvolution("struct<_col0:int,_col1:string>",
- "struct<a:int,b:string,c:double>",
- struct(1, "foo"),
- struct(1, "foo", null), true);
+ "struct<a:int,b:string,c:double>",
+ struct(1, "foo"),
+ struct(1, "foo", null), true, addSarg, false);
}
@Test
@@ -239,15 +287,65 @@
null, true);
}
- private void checkEvolution(String writerType, String readerType,
- Object inputRow, Object expectedOutput) {
- checkEvolution(writerType, readerType,
- inputRow, expectedOutput,
- (boolean) OrcConf.TOLERATE_MISSING_SCHEMA.getDefaultValue());
+ /**
+ * Test positional schema evolution.
+ * With the sarg, the predicate eliminates the row, so the default values are returned instead.
+ */
+ @Test
+ public void testPositional() {
+ checkEvolution("struct<x:int,y:int,z:int>", "struct<a:int,b:int,c:int,d:int>",
+ struct(11, 2, 3),
+ // if the sarg works, we get the default value
+ addSarg ? struct(0, 0, 0, 0) : struct(11, 2, 3, null),
+ false, addSarg, true);
+ }
+
+ /**
+ * Make the sarg reference a column past the end of the file schema; since
+ * that column reads as null, the predicate doesn't eliminate anything.
+ */
+ @Test
+ public void testPositional2() {
+ checkEvolution("struct<x:int,y:int,z:int>", "struct<b:int,c:int,d:int,a:int>",
+ struct(11, 2, 3),
+ struct(11, 2, 3, null),
+ false, addSarg, true);
}
private void checkEvolution(String writerType, String readerType,
- Object inputRow, Object expectedOutput, boolean tolerateSchema) {
+ Object inputRow, Object expectedOutput,
+ boolean tolerateSchema, boolean addSarg,
+ boolean positional) {
+ SearchArgument sArg = null;
+ String[] sCols = null;
+ if (addSarg) {
+ sArg = SearchArgumentFactory
+ .newBuilder()
+ .lessThan("a", PredicateLeaf.Type.LONG, 10L)
+ .build();
+ sCols = new String[]{null, "a", null};
+ }
+
+ checkEvolution(writerType, readerType,
+ inputRow, expectedOutput,
+ tolerateSchema,
+ sArg, sCols, positional);
+ }
+
+ private void checkEvolution(String writerType, String readerType,
+ Object inputRow, Object expectedOutput,
+ boolean addSarg) {
+
+ checkEvolution(writerType, readerType,
+ inputRow, expectedOutput,
+ (boolean) OrcConf.TOLERATE_MISSING_SCHEMA.getDefaultValue(),
+ addSarg, false);
+ }
+
+ private void checkEvolution(String writerType, String readerType,
+ Object inputRow, Object expectedOutput,
+ boolean tolerateSchema, SearchArgument sArg,
+ String[] sCols, boolean positional) {
TypeDescription readTypeDescr = TypeDescription.fromString(readerType);
TypeDescription writerTypeDescr = TypeDescription.fromString(writerType);
@@ -263,12 +361,19 @@
new OrcMapredRecordWriter<OrcStruct>(writer);
recordWriter.write(NullWritable.get(), inputStruct);
recordWriter.close(mock(Reporter.class));
+
Reader reader = OrcFile.createReader(testFilePath,
- OrcFile.readerOptions(conf).filesystem(fs));
+ OrcFile.readerOptions(conf).filesystem(fs));
+
+ Reader.Options options = reader.options().schema(readTypeDescr);
+ if (sArg != null && sCols != null) {
+ options.searchArgument(sArg, sCols);
+ }
+
OrcMapredRecordReader<OrcStruct> recordReader =
new OrcMapredRecordReader<>(reader,
- reader.options().schema(readTypeDescr)
- .tolerateMissingSchema(tolerateSchema));
+ options.tolerateMissingSchema(tolerateSchema)
+ .forcePositionalEvolution(positional));
OrcStruct result = recordReader.createValue();
recordReader.next(recordReader.createKey(), result);
assertEquals(expectedStruct, result);