ORC-571 & ORC-563: Fix ArrayIndexOutOfBoundsException in StripePlanner.readRowIndex.

Fixes #452
Fixes #443

Signed-off-by: Owen O'Malley <omalley@apache.org>
diff --git a/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java b/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
index 72c7418..c4ce398 100644
--- a/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
@@ -98,13 +98,10 @@
   static int findColumns(SchemaEvolution evolution,
                          String columnName) {
     try {
-      final TypeDescription targetSchema;
-      if (evolution.getPositionalColumns()) {
-        targetSchema = evolution.getReaderBaseSchema();
-      } else {
-        targetSchema = evolution.getFileSchema();
-      }
-      return targetSchema.findSubtype(columnName).getId();
+      TypeDescription readerColumn =
+          evolution.getReaderBaseSchema().findSubtype(columnName);
+      TypeDescription fileColumn = evolution.getFileType(readerColumn);
+      return fileColumn == null ? -1 : fileColumn.getId();
     } catch (IllegalArgumentException e) {
       if (LOG.isDebugEnabled()){
         LOG.debug("{}", e.getMessage());
@@ -230,7 +227,7 @@
     reader = TreeReaderFactory.createTreeReader(evolution.getReaderSchema(),
         readerContext);
 
-    int columns = schema.getMaximumId() + 1;
+    int columns = evolution.getFileSchema().getMaximumId() + 1;
     indexes = new OrcIndex(new OrcProto.RowIndex[columns],
         new OrcProto.Stream.Kind[columns],
         new OrcProto.BloomFilterIndex[columns]);
diff --git a/java/core/src/test/org/apache/orc/impl/TestRecordReaderImpl.java b/java/core/src/test/org/apache/orc/impl/TestRecordReaderImpl.java
index d150750..83e4399 100644
--- a/java/core/src/test/org/apache/orc/impl/TestRecordReaderImpl.java
+++ b/java/core/src/test/org/apache/orc/impl/TestRecordReaderImpl.java
@@ -72,6 +72,7 @@
 import org.apache.orc.TypeDescription;
 import org.apache.orc.Writer;
 import org.apache.orc.impl.RecordReaderImpl.Location;
+import org.apache.orc.impl.RecordReaderImpl.SargApplier;
 import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.TruthValue;
 import org.apache.hadoop.hive.serde2.io.DateWritable;
@@ -2222,6 +2223,49 @@
   }
 
   @Test
+  public void testPositionalEvolutionAddColumnPPD() throws IOException {
+    Reader.Options opts = new Reader.Options();
+    opts.forcePositionalEvolution(true);
+
+    TypeDescription file = TypeDescription.fromString("struct<x:int>");
+    // new column added on reader side
+    TypeDescription read = TypeDescription.fromString("struct<x:int,y:boolean>");
+    opts.include(includeAll(read));
+
+    SchemaEvolution evo = new SchemaEvolution(file, read, opts);
+
+    SearchArgument sarg = SearchArgumentFactory.newBuilder().startAnd()
+        .equals("y", PredicateLeaf.Type.BOOLEAN, true).end().build();
+
+    RecordReaderImpl.SargApplier applier =
+        new RecordReaderImpl.SargApplier(sarg, 1000, evo, OrcFile.WriterVersion.ORC_135, false);
+
+    OrcProto.StripeInformation stripe =
+        OrcProto.StripeInformation.newBuilder().setNumberOfRows(2000).build();
+
+    OrcProto.RowIndex[] indexes = new OrcProto.RowIndex[3];
+    indexes[1] = OrcProto.RowIndex.newBuilder() // index for original x column
+        .addEntry(createIndexEntry(0L, 10L))
+        .addEntry(createIndexEntry(100L, 200L))
+        .build();
+    indexes[2] = null; // no-op, just for clarifying that new reader column doesn't have an index
+
+    List<OrcProto.ColumnEncoding> encodings = new ArrayList<>();
+    encodings.add(OrcProto.ColumnEncoding.newBuilder().setKind(OrcProto.ColumnEncoding.Kind.DIRECT).build());
+
+    boolean[] rows = applier.pickRowGroups(new ReaderImpl.StripeInformationImpl(stripe,  1, -1, null),
+        indexes, null, encodings, null, false);
+    assertEquals(SargApplier.READ_ALL_RGS, rows); //cannot filter for new column, return all rows
+  }
+
+  private boolean[] includeAll(TypeDescription readerType) {
+    int numColumns = readerType.getMaximumId() + 1;
+    boolean[] result = new boolean[numColumns];
+    Arrays.fill(result, true);
+    return result;
+  }
+
+  @Test
   public void testSkipDataReaderOpen() throws Exception {
     IOException ioe = new IOException("Don't open when there is no stripe");
 
diff --git a/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcFileEvolution.java b/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcFileEvolution.java
index 8618677..7e907b7 100644
--- a/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcFileEvolution.java
+++ b/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcFileEvolution.java
@@ -22,6 +22,9 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.orc.*;
@@ -32,6 +35,8 @@
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.io.File;
 import java.io.IOException;
@@ -43,6 +48,7 @@
 /**
  * Test the behavior of ORC's schema evolution
  */
+@RunWith(Parameterized.class)
 public class TestOrcFileEvolution {
 
   // These utility methods are just to make writing tests easier. The values
@@ -76,6 +82,20 @@
   FileSystem fs;
   Path testFilePath;
 
+  private boolean addSarg;
+
+  public TestOrcFileEvolution(boolean addSarg) {
+    this.addSarg = addSarg;
+  }
+
+  @Parameterized.Parameters
+  public static Collection params() {
+    return Arrays.asList(new Object[][] {
+      { true },
+      { false }
+    });
+  }
+
   @Rule
   public TestName testCaseName = new TestName();
 
@@ -94,62 +114,86 @@
   @Test
   public void testAddFieldToEnd() {
     checkEvolution("struct<a:int,b:string>", "struct<a:int,b:string,c:double>",
-        struct(1, "foo"),
-        struct(1, "foo", null));
+        struct(11, "foo"),
+        addSarg ? struct(0, "", 0.0) : struct(11, "foo", null),
+        addSarg);
+  }
+
+  @Test
+  public void testAddFieldToEndWithSarg() {
+    SearchArgument sArg = SearchArgumentFactory
+      .newBuilder()
+      .lessThan("c", PredicateLeaf.Type.LONG, 10L)
+      .build();
+    String[] sCols = new String[]{null, null, "c"};
+
+    checkEvolution("struct<a:int,b:string>", "struct<a:int,b:string,c:int>",
+                   struct(1, "foo"),
+                   struct(1, "foo", null),
+                   (boolean) OrcConf.TOLERATE_MISSING_SCHEMA.getDefaultValue(),
+                   sArg, sCols, false);
   }
 
   @Test
   public void testAddFieldBeforeEnd() {
     checkEvolution("struct<a:int,b:string>", "struct<a:int,c:double,b:string>",
         struct(1, "foo"),
-        struct(1, null, "foo"));
+        struct(1, null, "foo"),
+                   addSarg);
   }
 
   @Test
   public void testRemoveLastField() {
     checkEvolution("struct<a:int,b:string,c:double>", "struct<a:int,b:string>",
         struct(1, "foo", 3.14),
-        struct(1, "foo"));
+        struct(1, "foo"),
+                   addSarg);
   }
 
   @Test
   public void testRemoveFieldBeforeEnd() {
     checkEvolution("struct<a:int,b:string,c:double>", "struct<a:int,c:double>",
         struct(1, "foo", 3.14),
-        struct(1, 3.14));
+        struct(1, 3.14),
+                   addSarg);
   }
 
   @Test
   public void testRemoveAndAddField() {
     checkEvolution("struct<a:int,b:string>", "struct<a:int,c:double>",
-        struct(1, "foo"), struct(1, null));
+        struct(1, "foo"), struct(1, null),
+                   addSarg);
   }
 
   @Test
   public void testReorderFields() {
     checkEvolution("struct<a:int,b:string>", "struct<b:string,a:int>",
-        struct(1, "foo"), struct("foo", 1));
+        struct(1, "foo"), struct("foo", 1),
+                   addSarg);
   }
 
   @Test
   public void testAddFieldEndOfStruct() {
     checkEvolution("struct<a:struct<b:int>,c:string>",
         "struct<a:struct<b:int,d:double>,c:string>",
-        struct(struct(2), "foo"), struct(struct(2, null), "foo"));
+        struct(struct(2), "foo"), struct(struct(2, null), "foo"),
+                   addSarg);
   }
 
   @Test
   public void testAddFieldBeforeEndOfStruct() {
     checkEvolution("struct<a:struct<b:int>,c:string>",
         "struct<a:struct<d:double,b:int>,c:string>",
-        struct(struct(2), "foo"), struct(struct(null, 2), "foo"));
+        struct(struct(2), "foo"), struct(struct(null, 2), "foo"),
+                   addSarg);
   }
 
   @Test
   public void testAddSimilarField() {
     checkEvolution("struct<a:struct<b:int>>",
         "struct<a:struct<b:int>,c:struct<b:int>>", struct(struct(2)),
-        struct(struct(2), null));
+        struct(struct(2), null),
+                   addSarg);
   }
 
   @Test
@@ -157,7 +201,8 @@
     checkEvolution("struct<a:struct<a:int,b:string>,c:struct<a:int>>",
         "struct<a:struct<a:int,b:string>,c:struct<a:int,b:string>>",
         struct(struct(2, "foo"), struct(3)),
-        struct(struct(2, "foo"), struct(3, null)));
+        struct(struct(2, "foo"), struct(3, null)),
+                   addSarg);
   }
 
   @Test
@@ -165,7 +210,8 @@
     checkEvolution("struct<a:map<struct<a:int>,int>>",
         "struct<a:map<struct<a:int,b:string>,int>>",
         struct(map(struct(1), 2)),
-        struct(map(struct(1, null), 2)));
+        struct(map(struct(1, null), 2)),
+                   addSarg);
   }
 
   @Test
@@ -173,7 +219,8 @@
     checkEvolution("struct<a:map<int,struct<a:int>>>",
         "struct<a:map<int,struct<a:int,b:string>>>",
         struct(map(2, struct(1))),
-        struct(map(2, struct(1, null))));
+        struct(map(2, struct(1, null))),
+                   addSarg);
   }
 
   @Test
@@ -181,16 +228,17 @@
     checkEvolution("struct<a:array<struct<b:int>>>",
         "struct<a:array<struct<b:int,c:string>>>",
         struct(list(struct(1), struct(2))),
-        struct(list(struct(1, null), struct(2, null))));
+        struct(list(struct(1, null), struct(2, null))),
+                   addSarg);
   }
 
   @Test
   public void testPreHive4243CheckEqual() {
     // Expect success on equal schemas
     checkEvolution("struct<_col0:int,_col1:string>",
-        "struct<_col0:int,_col1:string>",
-        struct(1, "foo"),
-        struct(1, "foo", null), false);
+                   "struct<_col0:int,_col1:string>",
+                   struct(1, "foo"),
+                   struct(1, "foo", null), false, addSarg, false);
   }
 
   @Test
@@ -198,17 +246,17 @@
     // Expect exception on strict compatibility check
     thrown.expectMessage("HIVE-4243");
     checkEvolution("struct<_col0:int,_col1:string>",
-        "struct<_col0:int,_col1:string,_col2:double>",
-        struct(1, "foo"),
-        struct(1, "foo", null), false);
+                   "struct<_col0:int,_col1:string,_col2:double>",
+                   struct(1, "foo"),
+                   struct(1, "foo", null), false, addSarg, false);
   }
 
   @Test
   public void testPreHive4243AddColumn() {
     checkEvolution("struct<_col0:int,_col1:string>",
-        "struct<_col0:int,_col1:string,_col2:double>",
-        struct(1, "foo"),
-        struct(1, "foo", null), true);
+                   "struct<_col0:int,_col1:string,_col2:double>",
+                   struct(1, "foo"),
+                   struct(1, "foo", null), true, addSarg, false);
   }
 
   @Test
@@ -216,17 +264,17 @@
     // Expect exception on type mismatch
     thrown.expect(SchemaEvolution.IllegalEvolutionException.class);
     checkEvolution("struct<_col0:int,_col1:double>",
-        "struct<_col0:int,_col1:date,_col2:double>",
-        struct(1, 1.0),
-        null, true);
+                   "struct<_col0:int,_col1:date,_col2:double>",
+                   struct(1, 1.0),
+                   null, true, addSarg, false);
   }
 
   @Test
   public void testPreHive4243AddColumnWithFix() {
     checkEvolution("struct<_col0:int,_col1:string>",
-        "struct<a:int,b:string,c:double>",
-        struct(1, "foo"),
-        struct(1, "foo", null), true);
+                   "struct<a:int,b:string,c:double>",
+                   struct(1, "foo"),
+                   struct(1, "foo", null), true, addSarg, false);
   }
 
   @Test
@@ -239,15 +287,65 @@
         null, true);
   }
 
-  private void checkEvolution(String writerType, String readerType,
-      Object inputRow, Object expectedOutput) {
-    checkEvolution(writerType, readerType,
-        inputRow, expectedOutput,
-        (boolean) OrcConf.TOLERATE_MISSING_SCHEMA.getDefaultValue());
+  /**
+   * Test positional schema evolution.
+   * With the sarg, it eliminates the row and we don't get the row.
+   */
+  @Test
+  public void testPositional() {
+    checkEvolution("struct<x:int,y:int,z:int>", "struct<a:int,b:int,c:int,d:int>",
+        struct(11, 2, 3),
+        // if the sarg works, we get the default value
+        addSarg ? struct(0, 0, 0, 0) : struct(11, 2, 3, null),
+        false, addSarg, true);
+  }
+
+  /**
+   * Make the sarg try to use a column past the end of the file schema, since
+   * it will get null, the predicate doesn't hit.
+   */
+  @Test
+  public void testPositional2() {
+    checkEvolution("struct<x:int,y:int,z:int>", "struct<b:int,c:int,d:int,a:int>",
+        struct(11, 2, 3),
+        struct(11, 2, 3, null),
+        false, addSarg, true);
   }
 
   private void checkEvolution(String writerType, String readerType,
-      Object inputRow, Object expectedOutput, boolean tolerateSchema) {
+                              Object inputRow, Object expectedOutput,
+                              boolean tolerateSchema, boolean addSarg,
+                              boolean positional) {
+    SearchArgument sArg = null;
+    String[] sCols = null;
+    if (addSarg) {
+      sArg = SearchArgumentFactory
+        .newBuilder()
+        .lessThan("a", PredicateLeaf.Type.LONG, 10L)
+        .build();
+      sCols = new String[]{null, "a", null};
+    }
+
+    checkEvolution(writerType, readerType,
+                   inputRow, expectedOutput,
+                   tolerateSchema,
+                   sArg, sCols, positional);
+  }
+
+  private void checkEvolution(String writerType, String readerType,
+                              Object inputRow, Object expectedOutput,
+                              boolean addSarg) {
+
+    checkEvolution(writerType, readerType,
+      inputRow, expectedOutput,
+      (boolean) OrcConf.TOLERATE_MISSING_SCHEMA.getDefaultValue(),
+        addSarg, false);
+  }
+
+  private void checkEvolution(String writerType, String readerType,
+                              Object inputRow, Object expectedOutput,
+                              boolean tolerateSchema, SearchArgument sArg,
+                              String[] sCols, boolean positional) {
     TypeDescription readTypeDescr = TypeDescription.fromString(readerType);
     TypeDescription writerTypeDescr = TypeDescription.fromString(writerType);
 
@@ -263,12 +361,19 @@
           new OrcMapredRecordWriter<OrcStruct>(writer);
       recordWriter.write(NullWritable.get(), inputStruct);
       recordWriter.close(mock(Reporter.class));
+
       Reader reader = OrcFile.createReader(testFilePath,
-          OrcFile.readerOptions(conf).filesystem(fs));
+                                           OrcFile.readerOptions(conf).filesystem(fs));
+
+      Reader.Options options = reader.options().schema(readTypeDescr);
+      if (sArg != null && sCols != null) {
+        options.searchArgument(sArg, sCols);
+      }
+
       OrcMapredRecordReader<OrcStruct> recordReader =
           new OrcMapredRecordReader<>(reader,
-              reader.options().schema(readTypeDescr)
-                  .tolerateMissingSchema(tolerateSchema));
+              options.tolerateMissingSchema(tolerateSchema)
+                  .forcePositionalEvolution(positional));
       OrcStruct result = recordReader.createValue();
       recordReader.next(recordReader.createKey(), result);
       assertEquals(expectedStruct, result);