ORC-379: ConvertTreeReaders should handle Decimal64

Fixes #284

Signed-off-by: Prasanth Jayachandran <prasanthj@apache.org>
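
Each decimal converter previously assumed the reader-side vector was a
DecimalColumnVector. With Decimal64 support, the vector produced by
createRowBatchV2 for low-precision decimals is a Decimal64ColumnVector, so
the converters now hold a plain ColumnVector and dispatch on its runtime
type before writing each element. A minimal sketch of that dispatch
(DecimalVectorSetter is a hypothetical helper used only for illustration,
not part of this patch):

    import org.apache.hadoop.hive.common.type.HiveDecimal;
    import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
    import org.apache.hadoop.hive.ql.exec.vector.Decimal64ColumnVector;
    import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;

    final class DecimalVectorSetter {
      // Write one decimal value into whichever decimal vector the reader allocated,
      // dispatching on the runtime vector type.
      static void set(ColumnVector decimalColVector, int elementNum, HiveDecimal value) {
        if (decimalColVector instanceof Decimal64ColumnVector) {
          ((Decimal64ColumnVector) decimalColVector).set(elementNum, value);
        } else {
          ((DecimalColumnVector) decimalColVector).set(elementNum, value);
        }
      }
    }
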
diff --git a/java/core/src/java/org/apache/orc/impl/ConvertTreeReaderFactory.java b/java/core/src/java/org/apache/orc/impl/ConvertTreeReaderFactory.java
index eb731fa..b9fb48d 100644
--- a/java/core/src/java/org/apache/orc/impl/ConvertTreeReaderFactory.java
+++ b/java/core/src/java/org/apache/orc/impl/ConvertTreeReaderFactory.java
@@ -27,6 +27,7 @@
 import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.Decimal64ColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
@@ -1162,7 +1163,7 @@
     private AnyIntegerTreeReader anyIntegerAsLongTreeReader;
 
     private LongColumnVector longColVector;
-    private DecimalColumnVector decimalColVector;
+    private ColumnVector decimalColVector;
 
     DecimalFromAnyIntegerTreeReader(int columnId, TypeDescription fileType, Context context)
         throws IOException {
@@ -1177,7 +1178,11 @@
       long longValue = longColVector.vector[elementNum];
       HiveDecimalWritable hiveDecimalWritable = new HiveDecimalWritable(longValue);
       // The DecimalColumnVector will enforce precision and scale and set the entry to null when out of bounds.
-      decimalColVector.set(elementNum, hiveDecimalWritable);
+      if (decimalColVector instanceof Decimal64ColumnVector) {
+        ((Decimal64ColumnVector) decimalColVector).set(elementNum, hiveDecimalWritable);
+      } else {
+        ((DecimalColumnVector) decimalColVector).set(elementNum, hiveDecimalWritable);
+      }
     }
 
     @Override
@@ -1187,7 +1192,7 @@
       if (longColVector == null) {
         // Allocate column vector for file; cast column vector for reader.
         longColVector = new LongColumnVector();
-        decimalColVector = (DecimalColumnVector) previousVector;
+        decimalColVector = previousVector;
       }
       // Read present/isNull stream
       anyIntegerAsLongTreeReader.nextVector(longColVector, isNull, batchSize);
@@ -1201,7 +1206,7 @@
     private FloatTreeReader floatTreeReader;
 
     private DoubleColumnVector doubleColVector;
-    private DecimalColumnVector decimalColVector;
+    private ColumnVector decimalColVector;
 
     DecimalFromFloatTreeReader(int columnId, TypeDescription readerType)
         throws IOException {
@@ -1218,7 +1223,11 @@
             HiveDecimal.create(Float.toString(floatValue));
         if (decimalValue != null) {
           // The DecimalColumnVector will enforce precision and scale and set the entry to null when out of bounds.
-          decimalColVector.set(elementNum, decimalValue);
+          if (decimalColVector instanceof Decimal64ColumnVector) {
+            ((Decimal64ColumnVector) decimalColVector).set(elementNum, decimalValue);
+          } else {
+            ((DecimalColumnVector) decimalColVector).set(elementNum, decimalValue);
+          }
         } else {
           decimalColVector.noNulls = false;
           decimalColVector.isNull[elementNum] = true;
@@ -1236,7 +1245,7 @@
       if (doubleColVector == null) {
         // Allocate column vector for file; cast column vector for reader.
         doubleColVector = new DoubleColumnVector();
-        decimalColVector = (DecimalColumnVector) previousVector;
+        decimalColVector = previousVector;
       }
       // Read present/isNull stream
       floatTreeReader.nextVector(doubleColVector, isNull, batchSize);
@@ -1250,7 +1259,7 @@
     private DoubleTreeReader doubleTreeReader;
 
     private DoubleColumnVector doubleColVector;
-    private DecimalColumnVector decimalColVector;
+    private ColumnVector decimalColVector;
 
     DecimalFromDoubleTreeReader(int columnId, TypeDescription readerType)
         throws IOException {
@@ -1264,7 +1273,11 @@
       HiveDecimal value =
           HiveDecimal.create(Double.toString(doubleColVector.vector[elementNum]));
       if (value != null) {
-        decimalColVector.set(elementNum, value);
+        if (decimalColVector instanceof Decimal64ColumnVector) {
+          ((Decimal64ColumnVector) decimalColVector).set(elementNum, value);
+        } else {
+          ((DecimalColumnVector) decimalColVector).set(elementNum, value);
+        }
       } else {
         decimalColVector.noNulls = false;
         decimalColVector.isNull[elementNum] = true;
@@ -1278,7 +1291,7 @@
       if (doubleColVector == null) {
         // Allocate column vector for file; cast column vector for reader.
         doubleColVector = new DoubleColumnVector();
-        decimalColVector = (DecimalColumnVector) previousVector;
+        decimalColVector = previousVector;
       }
       // Read present/isNull stream
       doubleTreeReader.nextVector(doubleColVector, isNull, batchSize);
@@ -1292,7 +1305,7 @@
     private TreeReader stringGroupTreeReader;
 
     private BytesColumnVector bytesColVector;
-    private DecimalColumnVector decimalColVector;
+    private ColumnVector decimalColVector;
 
     DecimalFromStringGroupTreeReader(int columnId, TypeDescription fileType,
         TypeDescription readerType, Context context) throws IOException {
@@ -1307,7 +1320,11 @@
       HiveDecimal value = parseDecimalFromString(string);
       if (value != null) {
         // The DecimalColumnVector will enforce precision and scale and set the entry to null when out of bounds.
-        decimalColVector.set(elementNum, value);
+        if (decimalColVector instanceof Decimal64ColumnVector) {
+          ((Decimal64ColumnVector) decimalColVector).set(elementNum, value);
+        } else {
+          ((DecimalColumnVector) decimalColVector).set(elementNum, value);
+        }
       } else {
         decimalColVector.noNulls = false;
         decimalColVector.isNull[elementNum] = true;
@@ -1321,7 +1338,7 @@
       if (bytesColVector == null) {
         // Allocate column vector for file; cast column vector for reader.
         bytesColVector = new BytesColumnVector();
-        decimalColVector = (DecimalColumnVector) previousVector;
+        decimalColVector = previousVector;
       }
       // Read present/isNull stream
       stringGroupTreeReader.nextVector(bytesColVector, isNull, batchSize);
@@ -1335,7 +1352,7 @@
     private TimestampTreeReader timestampTreeReader;
 
     private TimestampColumnVector timestampColVector;
-    private DecimalColumnVector decimalColVector;
+    private ColumnVector decimalColVector;
 
     DecimalFromTimestampTreeReader(int columnId, Context context) throws IOException {
       super(columnId);
@@ -1350,7 +1367,11 @@
       HiveDecimal value = HiveDecimal.create(Double.toString(doubleValue));
       if (value != null) {
         // The DecimalColumnVector will enforce precision and scale and set the entry to null when out of bounds.
-        decimalColVector.set(elementNum, value);
+        if (decimalColVector instanceof Decimal64ColumnVector) {
+          ((Decimal64ColumnVector) decimalColVector).set(elementNum, value);
+        } else {
+          ((DecimalColumnVector) decimalColVector).set(elementNum, value);
+        }
       } else {
         decimalColVector.noNulls = false;
         decimalColVector.isNull[elementNum] = true;
@@ -1364,7 +1385,7 @@
       if (timestampColVector == null) {
         // Allocate column vector for file; cast column vector for reader.
         timestampColVector = new TimestampColumnVector();
-        decimalColVector = (DecimalColumnVector) previousVector;
+        decimalColVector = previousVector;
       }
       // Read present/isNull stream
       timestampTreeReader.nextVector(timestampColVector, isNull, batchSize);
@@ -1380,7 +1401,7 @@
     private DecimalColumnVector fileDecimalColVector;
     private int filePrecision;
     private int fileScale;
-    private DecimalColumnVector decimalColVector;
+    private ColumnVector decimalColVector;
 
     DecimalFromDecimalTreeReader(int columnId, TypeDescription fileType, TypeDescription readerType, Context context)
         throws IOException {
@@ -1394,7 +1415,11 @@
     @Override
     public void setConvertVectorElement(int elementNum) throws IOException {
 
-      decimalColVector.set(elementNum, fileDecimalColVector.vector[elementNum]);
+      if (decimalColVector instanceof Decimal64ColumnVector) {
+        ((Decimal64ColumnVector) decimalColVector).set(elementNum, fileDecimalColVector.vector[elementNum]);
+      } else {
+        ((DecimalColumnVector) decimalColVector).set(elementNum, fileDecimalColVector.vector[elementNum]);
+      }
 
     }
 
@@ -1405,7 +1430,7 @@
       if (fileDecimalColVector == null) {
         // Allocate column vector for file; cast column vector for reader.
         fileDecimalColVector = new DecimalColumnVector(filePrecision, fileScale);
-        decimalColVector = (DecimalColumnVector) previousVector;
+        decimalColVector = previousVector;
       }
       // Read present/isNull stream
       decimalTreeReader.nextVector(fileDecimalColVector, isNull, batchSize);
diff --git a/java/core/src/test/org/apache/orc/impl/TestSchemaEvolution.java b/java/core/src/test/org/apache/orc/impl/TestSchemaEvolution.java
index 510e00c..e452e57 100644
--- a/java/core/src/test/org/apache/orc/impl/TestSchemaEvolution.java
+++ b/java/core/src/test/org/apache/orc/impl/TestSchemaEvolution.java
@@ -36,9 +36,13 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.io.DiskRange;
 import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.Decimal64ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
 import org.apache.orc.OrcFile;
 import org.apache.orc.OrcProto;
 import org.apache.orc.Reader;
@@ -182,6 +186,334 @@
   }
 
   @Test
+  public void testFloatToDecimalEvolution() throws Exception {
+    testFilePath = new Path(workDir, "TestOrcFile." +
+      testCaseName.getMethodName() + ".orc");
+    TypeDescription schema = TypeDescription.createFloat();
+    Writer writer = OrcFile.createWriter(testFilePath,
+      OrcFile.writerOptions(conf).setSchema(schema).stripeSize(100000)
+        .bufferSize(10000));
+    VectorizedRowBatch batch = new VectorizedRowBatch(1, 1024);
+    DoubleColumnVector dcv = new DoubleColumnVector(1024);
+    batch.cols[0] = dcv;
+    batch.reset();
+    batch.size = 1;
+    dcv.vector[0] = 74.72f;
+    writer.addRowBatch(batch);
+    writer.close();
+
+    Reader reader = OrcFile.createReader(testFilePath,
+      OrcFile.readerOptions(conf).filesystem(fs));
+    TypeDescription schemaOnRead = TypeDescription.createDecimal().withPrecision(38).withScale(2);
+    RecordReader rows = reader.rows(reader.options().schema(schemaOnRead));
+    batch = schemaOnRead.createRowBatch();
+    rows.nextBatch(batch);
+    assertEquals("74.72", ((DecimalColumnVector) batch.cols[0]).vector[0].toString());
+    rows.close();
+  }
+
+  @Test
+  public void testFloatToDecimal64Evolution() throws Exception {
+    testFilePath = new Path(workDir, "TestOrcFile." +
+      testCaseName.getMethodName() + ".orc");
+    TypeDescription schema = TypeDescription.createFloat();
+    Writer writer = OrcFile.createWriter(testFilePath,
+      OrcFile.writerOptions(conf).setSchema(schema).stripeSize(100000)
+        .bufferSize(10000));
+    VectorizedRowBatch batch = new VectorizedRowBatch(1, 1024);
+    DoubleColumnVector dcv = new DoubleColumnVector(1024);
+    batch.cols[0] = dcv;
+    batch.reset();
+    batch.size = 1;
+    dcv.vector[0] = 74.72f;
+    writer.addRowBatch(batch);
+    writer.close();
+
+    Reader reader = OrcFile.createReader(testFilePath,
+      OrcFile.readerOptions(conf).filesystem(fs));
+    TypeDescription schemaOnRead = TypeDescription.createDecimal().withPrecision(10).withScale(2);
+    RecordReader rows = reader.rows(reader.options().schema(schemaOnRead));
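+    // Precision 10 fits in a long (Decimal64 supports precision up to 18), so
+    // createRowBatchV2 allocates a Decimal64ColumnVector for this column.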
+    batch = schemaOnRead.createRowBatchV2();
+    rows.nextBatch(batch);
+    assertEquals("74.72", ((Decimal64ColumnVector) batch.cols[0]).getScratchWritable().toString());
+    rows.close();
+  }
+
+  @Test
+  public void testDoubleToDecimalEvolution() throws Exception {
+    testFilePath = new Path(workDir, "TestOrcFile." +
+      testCaseName.getMethodName() + ".orc");
+    TypeDescription schema = TypeDescription.createDouble();
+    Writer writer = OrcFile.createWriter(testFilePath,
+      OrcFile.writerOptions(conf).setSchema(schema).stripeSize(100000)
+        .bufferSize(10000));
+    VectorizedRowBatch batch = new VectorizedRowBatch(1, 1024);
+    DoubleColumnVector dcv = new DoubleColumnVector(1024);
+    batch.cols[0] = dcv;
+    batch.reset();
+    batch.size = 1;
+    dcv.vector[0] = 74.72d;
+    writer.addRowBatch(batch);
+    writer.close();
+
+    Reader reader = OrcFile.createReader(testFilePath,
+      OrcFile.readerOptions(conf).filesystem(fs));
+    TypeDescription schemaOnRead = TypeDescription.createDecimal().withPrecision(38).withScale(2);
+    RecordReader rows = reader.rows(reader.options().schema(schemaOnRead));
+    batch = schemaOnRead.createRowBatch();
+    rows.nextBatch(batch);
+    assertEquals("74.72", ((DecimalColumnVector) batch.cols[0]).vector[0].toString());
+    rows.close();
+  }
+
+  @Test
+  public void testDoubleToDecimal64Evolution() throws Exception {
+    testFilePath = new Path(workDir, "TestOrcFile." +
+      testCaseName.getMethodName() + ".orc");
+    TypeDescription schema = TypeDescription.createDouble();
+    Writer writer = OrcFile.createWriter(testFilePath,
+      OrcFile.writerOptions(conf).setSchema(schema).stripeSize(100000)
+        .bufferSize(10000));
+    VectorizedRowBatch batch = new VectorizedRowBatch(1, 1024);
+    DoubleColumnVector dcv = new DoubleColumnVector(1024);
+    batch.cols[0] = dcv;
+    batch.reset();
+    batch.size = 1;
+    dcv.vector[0] = 74.72d;
+    writer.addRowBatch(batch);
+    writer.close();
+
+    Reader reader = OrcFile.createReader(testFilePath,
+      OrcFile.readerOptions(conf).filesystem(fs));
+    TypeDescription schemaOnRead = TypeDescription.createDecimal().withPrecision(10).withScale(2);
+    RecordReader rows = reader.rows(reader.options().schema(schemaOnRead));
+    batch = schemaOnRead.createRowBatchV2();
+    rows.nextBatch(batch);
+    assertEquals("74.72", ((Decimal64ColumnVector) batch.cols[0]).getScratchWritable().toString());
+    rows.close();
+  }
+
+  @Test
+  public void testLongToDecimalEvolution() throws Exception {
+    testFilePath = new Path(workDir, "TestOrcFile." +
+      testCaseName.getMethodName() + ".orc");
+    TypeDescription schema = TypeDescription.createLong();
+    Writer writer = OrcFile.createWriter(testFilePath,
+      OrcFile.writerOptions(conf).setSchema(schema).stripeSize(100000)
+        .bufferSize(10000));
+    VectorizedRowBatch batch = new VectorizedRowBatch(1, 1024);
+    LongColumnVector lcv = new LongColumnVector(1024);
+    batch.cols[0] = lcv;
+    batch.reset();
+    batch.size = 1;
+    lcv.vector[0] = 74L;
+    writer.addRowBatch(batch);
+    writer.close();
+
+    Reader reader = OrcFile.createReader(testFilePath,
+      OrcFile.readerOptions(conf).filesystem(fs));
+    TypeDescription schemaOnRead = TypeDescription.createDecimal().withPrecision(38).withScale(2);
+    RecordReader rows = reader.rows(reader.options().schema(schemaOnRead));
+    batch = schemaOnRead.createRowBatch();
+    rows.nextBatch(batch);
+    assertEquals("74", ((DecimalColumnVector) batch.cols[0]).vector[0].toString());
+    rows.close();
+  }
+
+  @Test
+  public void testLongToDecimal64Evolution() throws Exception {
+    testFilePath = new Path(workDir, "TestOrcFile." +
+      testCaseName.getMethodName() + ".orc");
+    TypeDescription schema = TypeDescription.createLong();
+    Writer writer = OrcFile.createWriter(testFilePath,
+      OrcFile.writerOptions(conf).setSchema(schema).stripeSize(100000)
+        .bufferSize(10000));
+    VectorizedRowBatch batch = new VectorizedRowBatch(1, 1024);
+    LongColumnVector lcv = new LongColumnVector(1024);
+    batch.cols[0] = lcv;
+    batch.reset();
+    batch.size = 1;
+    lcv.vector[0] = 74L;
+    writer.addRowBatch(batch);
+    writer.close();
+
+    Reader reader = OrcFile.createReader(testFilePath,
+      OrcFile.readerOptions(conf).filesystem(fs));
+    TypeDescription schemaOnRead = TypeDescription.createDecimal().withPrecision(10).withScale(2);
+    RecordReader rows = reader.rows(reader.options().schema(schemaOnRead));
+    batch = schemaOnRead.createRowBatchV2();
+    rows.nextBatch(batch);
+    assertEquals("74", ((Decimal64ColumnVector) batch.cols[0]).getScratchWritable().toString());
+    rows.close();
+  }
+
+  @Test
+  public void testDecimalToDecimalEvolution() throws Exception {
+    testFilePath = new Path(workDir, "TestOrcFile." +
+      testCaseName.getMethodName() + ".orc");
+    TypeDescription schema = TypeDescription.createDecimal().withPrecision(38).withScale(2);
+    Writer writer = OrcFile.createWriter(testFilePath,
+      OrcFile.writerOptions(conf).setSchema(schema).stripeSize(100000)
+        .bufferSize(10000));
+    VectorizedRowBatch batch = new VectorizedRowBatch(1, 1024);
+    DecimalColumnVector dcv = new DecimalColumnVector(1024, 38, 2);
+    batch.cols[0] = dcv;
+    batch.reset();
+    batch.size = 1;
+    dcv.vector[0] = new HiveDecimalWritable("74.19");
+    writer.addRowBatch(batch);
+    writer.close();
+
+    Reader reader = OrcFile.createReader(testFilePath,
+      OrcFile.readerOptions(conf).filesystem(fs));
+    TypeDescription schemaOnRead = TypeDescription.createDecimal().withPrecision(38).withScale(1);
+    RecordReader rows = reader.rows(reader.options().schema(schemaOnRead));
+    batch = schemaOnRead.createRowBatch();
+    rows.nextBatch(batch);
+    assertEquals("74.2", ((DecimalColumnVector) batch.cols[0]).vector[0].toString());
+    rows.close();
+  }
+
+  @Test
+  public void testDecimalToDecimal64Evolution() throws Exception {
+    testFilePath = new Path(workDir, "TestOrcFile." +
+      testCaseName.getMethodName() + ".orc");
+    TypeDescription schema = TypeDescription.createDecimal().withPrecision(38).withScale(2);
+    Writer writer = OrcFile.createWriter(testFilePath,
+      OrcFile.writerOptions(conf).setSchema(schema).stripeSize(100000)
+        .bufferSize(10000));
+    VectorizedRowBatch batch = new VectorizedRowBatch(1, 1024);
+    DecimalColumnVector dcv = new DecimalColumnVector(1024, 38, 2);
+    batch.cols[0] = dcv;
+    batch.reset();
+    batch.size = 1;
+    dcv.vector[0] = new HiveDecimalWritable("74.19");
+    writer.addRowBatch(batch);
+    writer.close();
+
+    Reader reader = OrcFile.createReader(testFilePath,
+      OrcFile.readerOptions(conf).filesystem(fs));
+    TypeDescription schemaOnRead = TypeDescription.createDecimal().withPrecision(10).withScale(1);
+    RecordReader rows = reader.rows(reader.options().schema(schemaOnRead));
+    batch = schemaOnRead.createRowBatchV2();
+    rows.nextBatch(batch);
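+    // Decimal64ColumnVector holds unscaled longs: 74.19 read at scale 1 rounds to 74.2, stored as 742.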
+    assertEquals(742, ((Decimal64ColumnVector) batch.cols[0]).vector[0]);
+    rows.close();
+  }
+
+  @Test
+  public void testStringToDecimalEvolution() throws Exception {
+    testFilePath = new Path(workDir, "TestOrcFile." +
+      testCaseName.getMethodName() + ".orc");
+    TypeDescription schema = TypeDescription.createString();
+    Writer writer = OrcFile.createWriter(testFilePath,
+      OrcFile.writerOptions(conf).setSchema(schema).stripeSize(100000)
+        .bufferSize(10000));
+    VectorizedRowBatch batch = new VectorizedRowBatch(1, 1024);
+    BytesColumnVector bcv = new BytesColumnVector(1024);
+    batch.cols[0] = bcv;
+    batch.reset();
+    batch.size = 1;
+    bcv.vector[0] = "74.19".getBytes();
+    bcv.length[0] = "74.19".getBytes().length;
+    writer.addRowBatch(batch);
+    writer.close();
+
+    Reader reader = OrcFile.createReader(testFilePath,
+      OrcFile.readerOptions(conf).filesystem(fs));
+    TypeDescription schemaOnRead = TypeDescription.createDecimal().withPrecision(38).withScale(1);
+    RecordReader rows = reader.rows(reader.options().schema(schemaOnRead));
+    batch = schemaOnRead.createRowBatch();
+    rows.nextBatch(batch);
+    assertEquals("74.2", ((DecimalColumnVector) batch.cols[0]).vector[0].toString());
+    rows.close();
+  }
+
+  @Test
+  public void testStringToDecimal64Evolution() throws Exception {
+    testFilePath = new Path(workDir, "TestOrcFile." +
+      testCaseName.getMethodName() + ".orc");
+    TypeDescription schema = TypeDescription.createString();
+    Writer writer = OrcFile.createWriter(testFilePath,
+      OrcFile.writerOptions(conf).setSchema(schema).stripeSize(100000)
+        .bufferSize(10000));
+    VectorizedRowBatch batch = new VectorizedRowBatch(1, 1024);
+    BytesColumnVector bcv = new BytesColumnVector(1024);
+    batch.cols[0] = bcv;
+    batch.reset();
+    batch.size = 1;
+    bcv.vector[0] = "74.19".getBytes();
+    bcv.length[0] = "74.19".getBytes().length;
+    writer.addRowBatch(batch);
+    writer.close();
+
+    Reader reader = OrcFile.createReader(testFilePath,
+      OrcFile.readerOptions(conf).filesystem(fs));
+    TypeDescription schemaOnRead = TypeDescription.createDecimal().withPrecision(10).withScale(1);
+    RecordReader rows = reader.rows(reader.options().schema(schemaOnRead));
+    batch = schemaOnRead.createRowBatchV2();
+    rows.nextBatch(batch);
+    assertEquals(742, ((Decimal64ColumnVector) batch.cols[0]).vector[0]);
+    rows.close();
+  }
+
+  @Test
+  public void testTimestampToDecimalEvolution() throws Exception {
+    testFilePath = new Path(workDir, "TestOrcFile." +
+      testCaseName.getMethodName() + ".orc");
+    TypeDescription schema = TypeDescription.createTimestamp();
+    Writer writer = OrcFile.createWriter(testFilePath,
+      OrcFile.writerOptions(conf).setSchema(schema).stripeSize(100000)
+        .bufferSize(10000));
+    VectorizedRowBatch batch = new VectorizedRowBatch(1, 1024);
+    TimestampColumnVector tcv = new TimestampColumnVector(1024);
+    batch.cols[0] = tcv;
+    batch.reset();
+    batch.size = 1;
+    tcv.time[0] = 74000L;
+    writer.addRowBatch(batch);
+    writer.close();
+
+    Reader reader = OrcFile.createReader(testFilePath,
+      OrcFile.readerOptions(conf).filesystem(fs));
+    TypeDescription schemaOnRead = TypeDescription.createDecimal().withPrecision(38).withScale(1);
+    RecordReader rows = reader.rows(reader.options().schema(schemaOnRead));
+    batch = schemaOnRead.createRowBatch();
+    rows.nextBatch(batch);
+    assertEquals("74", ((DecimalColumnVector) batch.cols[0]).vector[0].toString());
+    rows.close();
+  }
+
+  @Test
+  public void testTimestampToDecimal64Evolution() throws Exception {
+    testFilePath = new Path(workDir, "TestOrcFile." +
+      testCaseName.getMethodName() + ".orc");
+    TypeDescription schema = TypeDescription.createTimestamp();
+    Writer writer = OrcFile.createWriter(testFilePath,
+      OrcFile.writerOptions(conf).setSchema(schema).stripeSize(100000)
+        .bufferSize(10000));
+    VectorizedRowBatch batch = new VectorizedRowBatch(1, 1024);
+    TimestampColumnVector tcv = new TimestampColumnVector(1024);
+    batch.cols[0] = tcv;
+    batch.reset();
+    batch.size = 1;
+    tcv.time[0] = 74000L;
+    writer.addRowBatch(batch);
+    writer.close();
+
+    Reader reader = OrcFile.createReader(testFilePath,
+      OrcFile.readerOptions(conf).filesystem(fs));
+    TypeDescription schemaOnRead = TypeDescription.createDecimal().withPrecision(10).withScale(1);
+    RecordReader rows = reader.rows(reader.options().schema(schemaOnRead));
+    batch = schemaOnRead.createRowBatchV2();
+    rows.nextBatch(batch);
+    assertEquals(740, ((Decimal64ColumnVector) batch.cols[0]).vector[0]);
+    rows.close();
+  }
+
+  @Test
   public void testSafePpdEvaluation() throws IOException {
     TypeDescription fileStruct1 = TypeDescription.createStruct()
         .addField("f1", TypeDescription.createInt())