PARQUET-1926: Add LogicalType support to ThriftType (#832)

diff --git a/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftSchemaConvertVisitor.java b/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftSchemaConvertVisitor.java
index 83d6dd5..f915a6e 100644
--- a/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftSchemaConvertVisitor.java
+++ b/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftSchemaConvertVisitor.java
@@ -57,8 +57,6 @@
 import static org.apache.parquet.schema.ConversionPatterns.listOfElements;
 import static org.apache.parquet.schema.ConversionPatterns.listType;
 import static org.apache.parquet.schema.ConversionPatterns.mapType;
-import static org.apache.parquet.schema.LogicalTypeAnnotation.enumType;
-import static org.apache.parquet.schema.LogicalTypeAnnotation.stringType;
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN;
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE;
@@ -325,7 +323,7 @@
 
   @Override
   public ConvertedField visit(EnumType enumType, State state) {
-    return visitPrimitiveType(BINARY, enumType(), state);
+    return visitPrimitiveType(BINARY, LogicalTypeAnnotation.enumType(), state);
   }
 
   @Override
@@ -350,17 +348,21 @@
 
   @Override
   public ConvertedField visit(I32Type i32Type, State state) {
-    return visitPrimitiveType(INT32, state);
+    return i32Type.hasLogicalTypeAnnotation() ? visitPrimitiveType(INT32, i32Type.getLogicalTypeAnnotation(), state) : visitPrimitiveType(INT32, state);
   }
 
   @Override
   public ConvertedField visit(I64Type i64Type, State state) {
-    return visitPrimitiveType(INT64, state);
+    return i64Type.hasLogicalTypeAnnotation() ? visitPrimitiveType(INT64, i64Type.getLogicalTypeAnnotation(), state) : visitPrimitiveType(INT64, state);
   }
 
   @Override
   public ConvertedField visit(StringType stringType, State state) {
-    return stringType.isBinary() ? visitPrimitiveType(BINARY, state) : visitPrimitiveType(BINARY, stringType(), state);
+    if (stringType.isBinary()) {
+      return stringType.hasLogicalTypeAnnotation() ? visitPrimitiveType(BINARY, stringType.getLogicalTypeAnnotation(), state) : visitPrimitiveType(BINARY, state);
+    } else {
+      return visitPrimitiveType(BINARY, LogicalTypeAnnotation.stringType(), state);
+    }
   }
 
   private static boolean isUnion(StructOrUnionType s) {
diff --git a/parquet-thrift/src/main/java/org/apache/parquet/thrift/struct/ThriftType.java b/parquet-thrift/src/main/java/org/apache/parquet/thrift/struct/ThriftType.java
index 8818e45..d6fb0ef 100644
--- a/parquet-thrift/src/main/java/org/apache/parquet/thrift/struct/ThriftType.java
+++ b/parquet-thrift/src/main/java/org/apache/parquet/thrift/struct/ThriftType.java
@@ -31,6 +31,8 @@
 import static org.apache.parquet.thrift.struct.ThriftTypeID.STRING;
 import static org.apache.parquet.thrift.struct.ThriftTypeID.STRUCT;
 
+import org.apache.parquet.schema.LogicalTypeAnnotation;
+
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -64,6 +66,19 @@
     @JsonSubTypes.Type(value=ThriftType.StructType.class, name="STRUCT")
 })
 public abstract class ThriftType {
+  private LogicalTypeAnnotation logicalTypeAnnotation;
+
+  public boolean hasLogicalTypeAnnotation() {
+    return this.logicalTypeAnnotation != null;
+  }
+
+  public LogicalTypeAnnotation getLogicalTypeAnnotation() {
+    return this.logicalTypeAnnotation;
+  }
+
+  public void setLogicalTypeAnnotation(LogicalTypeAnnotation logicalTypeAnnotation) {
+    this.logicalTypeAnnotation = logicalTypeAnnotation;
+  }
 
   @Override
   public boolean equals(Object o) {
@@ -624,6 +639,7 @@
     public I64Type() {
       super(I64);
     }
+
     @Override
     public <R, S> R accept(StateVisitor<R, S> visitor, S state) {
       return visitor.visit(this, state);
diff --git a/parquet-thrift/src/test/java/org/apache/parquet/thrift/TestThriftMetaData.java b/parquet-thrift/src/test/java/org/apache/parquet/thrift/TestThriftMetaData.java
index e7f42ce..64502a5 100644
--- a/parquet-thrift/src/test/java/org/apache/parquet/thrift/TestThriftMetaData.java
+++ b/parquet-thrift/src/test/java/org/apache/parquet/thrift/TestThriftMetaData.java
@@ -45,7 +45,8 @@
     assertEquals("ThriftMetaData(thriftClassName: non existent class!!!, descriptor: {\n" +
         "  \"id\" : \"STRUCT\",\n" +
         "  \"children\" : [ ],\n" +
-        "  \"structOrUnionType\" : \"STRUCT\"\n" +
+        "  \"structOrUnionType\" : \"STRUCT\",\n" +
+        "  \"logicalTypeAnnotation\" : null\n" +
         "})", tmd.toString());
 
     tmd = new ThriftMetaData("non existent class!!!", null);
diff --git a/parquet-thrift/src/test/java/org/apache/parquet/thrift/TestThriftRecordConverter.java b/parquet-thrift/src/test/java/org/apache/parquet/thrift/TestThriftRecordConverter.java
index 06c8cef..7c6d964 100644
--- a/parquet-thrift/src/test/java/org/apache/parquet/thrift/TestThriftRecordConverter.java
+++ b/parquet-thrift/src/test/java/org/apache/parquet/thrift/TestThriftRecordConverter.java
@@ -69,7 +69,8 @@
           "    \"values\" : [ {\n" +
           "      \"id\" : 77,\n" +
           "      \"name\" : \"hello\"\n" +
-          "    } ]\n" +
+          "    } ],\n" +
+          "    \"logicalTypeAnnotation\" : null\n" +
           "  }\n" +
           "}", e.getMessage());
     }
diff --git a/parquet-thrift/src/test/java/org/apache/parquet/thrift/TestThriftSchemaConvertVisitor.java b/parquet-thrift/src/test/java/org/apache/parquet/thrift/TestThriftSchemaConvertVisitor.java
new file mode 100644
index 0000000..d34ef42
--- /dev/null
+++ b/parquet-thrift/src/test/java/org/apache/parquet/thrift/TestThriftSchemaConvertVisitor.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.thrift;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.parquet.schema.LogicalTypeAnnotation;
+import org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+import org.apache.parquet.schema.Types;
+import org.apache.parquet.schema.Type;
+
+import org.apache.parquet.thrift.projection.FieldProjectionFilter;
+import org.apache.parquet.thrift.struct.ThriftField;
+import org.apache.parquet.thrift.struct.ThriftType;
+import org.apache.parquet.thrift.struct.ThriftType.StructType;
+import org.apache.parquet.thrift.struct.ThriftType.StructType.StructOrUnionType;
+
+import org.junit.Test;
+
+import static org.apache.parquet.schema.Type.Repetition;
+import static org.apache.parquet.thrift.struct.ThriftField.Requirement;
+import static org.junit.Assert.assertEquals;
+
+public class TestThriftSchemaConvertVisitor {
+
+  private MessageType buildOneFieldParquetMessage(Type expectedParquetField) {
+    return Types.buildMessage()
+      .addFields(expectedParquetField)
+      .named("ParquetSchema");
+  }
+
+  private StructType buildOneFieldThriftStructType(String fieldName, Short fieldId, ThriftType thriftType) {
+    ThriftField inputThriftField = new ThriftField(fieldName, fieldId, Requirement.REQUIRED, thriftType);
+    List<ThriftField> fields = new ArrayList<ThriftField>(1);
+    fields.add(inputThriftField);
+    return new StructType(fields, StructOrUnionType.STRUCT);
+  }
+
+  @Test
+  public void testConvertBasicI32Type() {
+    String fieldName = "i32Type";
+    Short fieldId = 0;
+
+    ThriftType i32Type = new ThriftType.I32Type();
+
+    StructType thriftStruct = buildOneFieldThriftStructType(fieldName, fieldId, i32Type);
+    MessageType actual = ThriftSchemaConvertVisitor.convert(thriftStruct, FieldProjectionFilter.ALL_COLUMNS);
+
+    Type expectedParquetField = Types.primitive(PrimitiveTypeName.INT32, Repetition.REQUIRED)
+      .named(fieldName)
+      .withId(fieldId);
+    MessageType expected = buildOneFieldParquetMessage(expectedParquetField);
+
+    assertEquals(expected, actual);
+  }
+
+  @Test
+  public void testConvertLogicalI32Type() {
+    LogicalTypeAnnotation timeLogicalType = LogicalTypeAnnotation.timeType(true, TimeUnit.MILLIS);
+    String fieldName = "timeI32Type";
+    Short fieldId = 0;
+
+    ThriftType timeI32Type = new ThriftType.I32Type();
+    timeI32Type.setLogicalTypeAnnotation(timeLogicalType);
+
+    StructType thriftStruct = buildOneFieldThriftStructType(fieldName, fieldId, timeI32Type);
+    MessageType actual = ThriftSchemaConvertVisitor.convert(thriftStruct, FieldProjectionFilter.ALL_COLUMNS);
+
+    Type expectedParquetField = Types.primitive(PrimitiveTypeName.INT32, Repetition.REQUIRED)
+      .as(timeLogicalType)
+      .named(fieldName)
+      .withId(fieldId);
+    MessageType expected = buildOneFieldParquetMessage(expectedParquetField);
+
+    assertEquals(expected, actual);
+  }
+
+  @Test
+  public void testConvertBasicI64Type() {
+    String fieldName = "i64Type";
+    Short fieldId = 0;
+
+    ThriftType i64Type = new ThriftType.I64Type();
+    
+    StructType thriftStruct = buildOneFieldThriftStructType(fieldName, fieldId, i64Type);
+    MessageType actual = ThriftSchemaConvertVisitor.convert(thriftStruct, FieldProjectionFilter.ALL_COLUMNS);
+
+    Type expectedParquetField = Types.primitive(PrimitiveTypeName.INT64, Repetition.REQUIRED)
+      .named(fieldName)
+      .withId(fieldId);
+    MessageType expected = buildOneFieldParquetMessage(expectedParquetField);
+
+    assertEquals(expected, actual);
+  }
+
+  @Test
+  public void testConvertLogicalI64Type() {
+    LogicalTypeAnnotation timestampLogicalType = LogicalTypeAnnotation.timestampType(true, TimeUnit.MILLIS);
+    String fieldName = "logicalI64Type";
+    Short fieldId = 0;
+
+    ThriftType timestampI64Type = new ThriftType.I64Type();
+    timestampI64Type.setLogicalTypeAnnotation(timestampLogicalType);
+    
+    StructType thriftStruct = buildOneFieldThriftStructType(fieldName, fieldId, timestampI64Type);
+    MessageType actual = ThriftSchemaConvertVisitor.convert(thriftStruct, FieldProjectionFilter.ALL_COLUMNS);
+
+    Type expectedParquetField = Types.primitive(PrimitiveTypeName.INT64, Repetition.REQUIRED)
+      .as(timestampLogicalType)
+      .named(fieldName)
+      .withId(fieldId);
+    MessageType expected = buildOneFieldParquetMessage(expectedParquetField);
+
+    assertEquals(expected, actual);
+  }
+
+  @Test
+  public void testConvertStringType() {
+    LogicalTypeAnnotation stringLogicalType = LogicalTypeAnnotation.stringType();
+    String fieldName = "stringType";
+    Short fieldId = 0;
+
+    ThriftType stringType = new ThriftType.StringType();
+    
+    StructType thriftStruct = buildOneFieldThriftStructType(fieldName, fieldId, stringType);
+    MessageType actual = ThriftSchemaConvertVisitor.convert(thriftStruct, FieldProjectionFilter.ALL_COLUMNS);
+
+    Type expectedParquetField = Types.primitive(PrimitiveTypeName.BINARY, Repetition.REQUIRED)
+      .as(stringLogicalType)
+      .named(fieldName)
+      .withId(fieldId);
+    MessageType expected = buildOneFieldParquetMessage(expectedParquetField);
+
+    assertEquals(expected, actual);
+
+  }
+
+  @Test
+  public void testConvertLogicalBinaryType() {
+    LogicalTypeAnnotation jsonLogicalType = LogicalTypeAnnotation.jsonType();
+    String fieldName = "logicalBinaryType";
+    Short fieldId = 0;
+
+    ThriftType.StringType jsonBinaryType = new ThriftType.StringType();
+    jsonBinaryType.setBinary(true);
+    jsonBinaryType.setLogicalTypeAnnotation(jsonLogicalType);
+    
+    StructType thriftStruct = buildOneFieldThriftStructType(fieldName, fieldId, jsonBinaryType);
+    MessageType actual = ThriftSchemaConvertVisitor.convert(thriftStruct, FieldProjectionFilter.ALL_COLUMNS);
+
+    Type expectedParquetField = Types.primitive(PrimitiveTypeName.BINARY, Repetition.REQUIRED)
+      .as(jsonLogicalType)
+      .named(fieldName)
+      .withId(fieldId);
+    MessageType expected = buildOneFieldParquetMessage(expectedParquetField);
+
+    assertEquals(expected, actual);
+  }
+}
diff --git a/parquet-thrift/src/test/java/org/apache/parquet/thrift/struct/TestThriftType.java b/parquet-thrift/src/test/java/org/apache/parquet/thrift/struct/TestThriftType.java
index 88e8bf1..c24cbc6 100644
--- a/parquet-thrift/src/test/java/org/apache/parquet/thrift/struct/TestThriftType.java
+++ b/parquet-thrift/src/test/java/org/apache/parquet/thrift/struct/TestThriftType.java
@@ -35,21 +35,24 @@
     assertEquals("{\n"
                 +"  \"id\" : \"STRUCT\",\n"
                 +"  \"children\" : [ ],\n"
-                +"  \"structOrUnionType\" : \"STRUCT\"\n"
+                +"  \"structOrUnionType\" : \"STRUCT\",\n"
+                +"  \"logicalTypeAnnotation\" : null\n"
                 +"}", st.toJSON());
 
     st = new StructType(new LinkedList<ThriftField>(), StructOrUnionType.UNION);
     assertEquals("{\n"
         +"  \"id\" : \"STRUCT\",\n"
         +"  \"children\" : [ ],\n"
-        +"  \"structOrUnionType\" : \"UNION\"\n"
+        +"  \"structOrUnionType\" : \"UNION\",\n"
+        +"  \"logicalTypeAnnotation\" : null\n"
         +"}", st.toJSON());
 
     st = new StructType(new LinkedList<ThriftField>(), StructOrUnionType.STRUCT);
     assertEquals("{\n"
         +"  \"id\" : \"STRUCT\",\n"
         +"  \"children\" : [ ],\n"
-        +"  \"structOrUnionType\" : \"STRUCT\"\n"
+        +"  \"structOrUnionType\" : \"STRUCT\",\n"
+        +"  \"logicalTypeAnnotation\" : null\n"
         +"}", st.toJSON());
   }