PARQUET-915: Support additional Arrow date/time types and metadata

I am working on ARROW-865 which exposes these to Python users

Closes #270

Author: Wes McKinney <wes.mckinney@twosigma.com>

Closes #311 from wesm/PARQUET-915 and squashes the following commits:

0a89639 [Wes McKinney] Add test for time64[ns]
6331d8c [Wes McKinney] Cast time32[second] to time32[millisecond]
37c1b42 [Wes McKinney] cpplint
5167a7a [Wes McKinney] Add unit test for date64->date32 cast
440b40f [Wes McKinney] Add unit test for date/time types that write without implicit casts
e626ebd [Wes McKinney] Use inline visitor in LevelBuilder
2ab7f12 [Wes McKinney] Plumbing and expansions for rest of Arrow date/time types
3aa64fa [Wes McKinney] Add conversion for TIMESTAMP_MICROS
diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc
index 0bdc14d..7b63514 100644
--- a/src/parquet/arrow/arrow-reader-writer-test.cc
+++ b/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -43,7 +43,9 @@
 using arrow::PrimitiveArray;
 using arrow::Status;
 using arrow::Table;
+using arrow::TimeUnit;
 
+using ArrowId = ::arrow::Type;
 using ParquetType = parquet::Type;
 using parquet::schema::GroupNode;
 using parquet::schema::NodePtr;
@@ -58,13 +60,98 @@
 
 constexpr uint32_t kDefaultSeed = 0;
 
+LogicalType::type get_logical_type(const ::arrow::DataType& type) {
+  switch (type.id()) {
+    case ArrowId::UINT8:
+      return LogicalType::UINT_8;
+    case ArrowId::INT8:
+      return LogicalType::INT_8;
+    case ArrowId::UINT16:
+      return LogicalType::UINT_16;
+    case ArrowId::INT16:
+      return LogicalType::INT_16;
+    case ArrowId::UINT32:
+      return LogicalType::UINT_32;
+    case ArrowId::INT32:
+      return LogicalType::INT_32;
+    case ArrowId::UINT64:
+      return LogicalType::UINT_64;
+    case ArrowId::INT64:
+      return LogicalType::INT_64;
+    case ArrowId::STRING:
+      return LogicalType::UTF8;
+    case ArrowId::DATE32:
+      return LogicalType::DATE;
+    case ArrowId::DATE64:
+      return LogicalType::DATE;
+    case ArrowId::TIMESTAMP: {
+      const auto& ts_type = static_cast<const ::arrow::TimestampType&>(type);
+      switch (ts_type.unit()) {
+        case TimeUnit::MILLI:
+          return LogicalType::TIMESTAMP_MILLIS;
+        case TimeUnit::MICRO:
+          return LogicalType::TIMESTAMP_MICROS;
+        default:
+          DCHECK(false) << "Only MILLI and MICRO units supported for Arrow timestamps "
+                           "with Parquet.";
+      }
+    }
+    case ArrowId::TIME32:
+      return LogicalType::TIME_MILLIS;
+    case ArrowId::TIME64:
+      return LogicalType::TIME_MICROS;
+    default:
+      break;
+  }
+  return LogicalType::NONE;
+}
+
+ParquetType::type get_physical_type(const ::arrow::DataType& type) {
+  switch (type.id()) {
+    case ArrowId::BOOL:
+      return ParquetType::BOOLEAN;
+    case ArrowId::UINT8:
+    case ArrowId::INT8:
+    case ArrowId::UINT16:
+    case ArrowId::INT16:
+    case ArrowId::UINT32:
+    case ArrowId::INT32:
+      return ParquetType::INT32;
+    case ArrowId::UINT64:
+    case ArrowId::INT64:
+      return ParquetType::INT64;
+    case ArrowId::FLOAT:
+      return ParquetType::FLOAT;
+    case ArrowId::DOUBLE:
+      return ParquetType::DOUBLE;
+    case ArrowId::BINARY:
+      return ParquetType::BYTE_ARRAY;
+    case ArrowId::STRING:
+      return ParquetType::BYTE_ARRAY;
+    case ArrowId::DATE32:
+      return ParquetType::INT32;
+    case ArrowId::DATE64:
+      // Convert to date32 internally
+      return ParquetType::INT32;
+    case ArrowId::TIME32:
+      return ParquetType::INT32;
+    case ArrowId::TIME64:
+      return ParquetType::INT64;
+    case ArrowId::TIMESTAMP:
+      return ParquetType::INT64;
+    default:
+      break;
+  }
+  DCHECK(false) << "cannot reach this code";
+  return ParquetType::INT32;
+}
+
 template <typename TestType>
 struct test_traits {};
 
 template <>
 struct test_traits<::arrow::BooleanType> {
   static constexpr ParquetType::type parquet_enum = ParquetType::BOOLEAN;
-  static constexpr LogicalType::type logical_enum = LogicalType::NONE;
   static uint8_t const value;
 };
 
@@ -73,7 +160,6 @@
 template <>
 struct test_traits<::arrow::UInt8Type> {
   static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
-  static constexpr LogicalType::type logical_enum = LogicalType::UINT_8;
   static uint8_t const value;
 };
 
@@ -82,7 +168,6 @@
 template <>
 struct test_traits<::arrow::Int8Type> {
   static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
-  static constexpr LogicalType::type logical_enum = LogicalType::INT_8;
   static int8_t const value;
 };
 
@@ -91,7 +176,6 @@
 template <>
 struct test_traits<::arrow::UInt16Type> {
   static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
-  static constexpr LogicalType::type logical_enum = LogicalType::UINT_16;
   static uint16_t const value;
 };
 
@@ -100,7 +184,6 @@
 template <>
 struct test_traits<::arrow::Int16Type> {
   static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
-  static constexpr LogicalType::type logical_enum = LogicalType::INT_16;
   static int16_t const value;
 };
 
@@ -109,7 +192,6 @@
 template <>
 struct test_traits<::arrow::UInt32Type> {
   static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
-  static constexpr LogicalType::type logical_enum = LogicalType::UINT_32;
   static uint32_t const value;
 };
 
@@ -118,7 +200,6 @@
 template <>
 struct test_traits<::arrow::Int32Type> {
   static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
-  static constexpr LogicalType::type logical_enum = LogicalType::NONE;
   static int32_t const value;
 };
 
@@ -127,7 +208,6 @@
 template <>
 struct test_traits<::arrow::UInt64Type> {
   static constexpr ParquetType::type parquet_enum = ParquetType::INT64;
-  static constexpr LogicalType::type logical_enum = LogicalType::UINT_64;
   static uint64_t const value;
 };
 
@@ -136,7 +216,6 @@
 template <>
 struct test_traits<::arrow::Int64Type> {
   static constexpr ParquetType::type parquet_enum = ParquetType::INT64;
-  static constexpr LogicalType::type logical_enum = LogicalType::NONE;
   static int64_t const value;
 };
 
@@ -145,25 +224,22 @@
 template <>
 struct test_traits<::arrow::TimestampType> {
   static constexpr ParquetType::type parquet_enum = ParquetType::INT64;
-  static constexpr LogicalType::type logical_enum = LogicalType::TIMESTAMP_MILLIS;
   static int64_t const value;
 };
 
 const int64_t test_traits<::arrow::TimestampType>::value(14695634030000);
 
 template <>
-struct test_traits<::arrow::Date64Type> {
+struct test_traits<::arrow::Date32Type> {
   static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
-  static constexpr LogicalType::type logical_enum = LogicalType::DATE;
-  static int64_t const value;
+  static int32_t const value;
 };
 
-const int64_t test_traits<::arrow::Date64Type>::value(14688000000000);
+const int32_t test_traits<::arrow::Date32Type>::value(170000);
 
 template <>
 struct test_traits<::arrow::FloatType> {
   static constexpr ParquetType::type parquet_enum = ParquetType::FLOAT;
-  static constexpr LogicalType::type logical_enum = LogicalType::NONE;
   static float const value;
 };
 
@@ -172,7 +248,6 @@
 template <>
 struct test_traits<::arrow::DoubleType> {
   static constexpr ParquetType::type parquet_enum = ParquetType::DOUBLE;
-  static constexpr LogicalType::type logical_enum = LogicalType::NONE;
   static double const value;
 };
 
@@ -181,14 +256,12 @@
 template <>
 struct test_traits<::arrow::StringType> {
   static constexpr ParquetType::type parquet_enum = ParquetType::BYTE_ARRAY;
-  static constexpr LogicalType::type logical_enum = LogicalType::UTF8;
   static std::string const value;
 };
 
 template <>
 struct test_traits<::arrow::BinaryType> {
   static constexpr ParquetType::type parquet_enum = ParquetType::BYTE_ARRAY;
-  static constexpr LogicalType::type logical_enum = LogicalType::NONE;
   static std::string const value;
 };
 
@@ -231,19 +304,20 @@
   }
 }
 
+static std::shared_ptr<GroupNode> MakeSimpleSchema(
+    const ::arrow::DataType& type, Repetition::type repetition) {
+  auto pnode = PrimitiveNode::Make(
+      "column1", repetition, get_physical_type(type), get_logical_type(type));
+  NodePtr node_ =
+      GroupNode::Make("schema", Repetition::REQUIRED, std::vector<NodePtr>({pnode}));
+  return std::static_pointer_cast<GroupNode>(node_);
+}
+
 template <typename TestType>
 class TestParquetIO : public ::testing::Test {
  public:
   virtual void SetUp() {}
 
-  std::shared_ptr<GroupNode> MakeSchema(Repetition::type repetition) {
-    auto pnode = PrimitiveNode::Make("column1", repetition,
-        test_traits<TestType>::parquet_enum, test_traits<TestType>::logical_enum);
-    NodePtr node_ =
-        GroupNode::Make("schema", Repetition::REQUIRED, std::vector<NodePtr>({pnode}));
-    return std::static_pointer_cast<GroupNode>(node_);
-  }
-
   std::unique_ptr<ParquetFileWriter> MakeWriter(
       const std::shared_ptr<GroupNode>& schema) {
     sink_ = std::make_shared<InMemoryOutputStream>();
@@ -348,8 +422,8 @@
 
 typedef ::testing::Types<::arrow::BooleanType, ::arrow::UInt8Type, ::arrow::Int8Type,
     ::arrow::UInt16Type, ::arrow::Int16Type, ::arrow::Int32Type, ::arrow::UInt64Type,
-    ::arrow::Int64Type, ::arrow::TimestampType, ::arrow::Date64Type, ::arrow::FloatType,
-    ::arrow::DoubleType, ::arrow::StringType, ::arrow::BinaryType>
+    ::arrow::Int64Type, ::arrow::Date32Type, ::arrow::FloatType, ::arrow::DoubleType,
+    ::arrow::StringType, ::arrow::BinaryType>
     TestTypes;
 
 TYPED_TEST_CASE(TestParquetIO, TestTypes);
@@ -358,7 +432,8 @@
   std::shared_ptr<Array> values;
   ASSERT_OK(NonNullArray<TypeParam>(SMALL_SIZE, &values));
 
-  std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::REQUIRED);
+  std::shared_ptr<GroupNode> schema =
+      MakeSimpleSchema(*values->type(), Repetition::REQUIRED);
   this->WriteColumn(schema, values);
 
   this->ReadAndCheckSingleColumnFile(values.get());
@@ -390,7 +465,8 @@
 
   ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values));
 
-  std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::OPTIONAL);
+  std::shared_ptr<GroupNode> schema =
+      MakeSimpleSchema(*values->type(), Repetition::OPTIONAL);
   this->WriteColumn(schema, values);
 
   this->ReadAndCheckSingleColumnFile(values.get());
@@ -399,7 +475,8 @@
 TYPED_TEST(TestParquetIO, SingleColumnRequiredSliceWrite) {
   std::shared_ptr<Array> values;
   ASSERT_OK(NonNullArray<TypeParam>(2 * SMALL_SIZE, &values));
-  std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::REQUIRED);
+  std::shared_ptr<GroupNode> schema =
+      MakeSimpleSchema(*values->type(), Repetition::REQUIRED);
 
   std::shared_ptr<Array> sliced_values = values->Slice(SMALL_SIZE / 2, SMALL_SIZE);
   this->WriteColumn(schema, sliced_values);
@@ -414,7 +491,8 @@
 TYPED_TEST(TestParquetIO, SingleColumnOptionalSliceWrite) {
   std::shared_ptr<Array> values;
   ASSERT_OK(NullableArray<TypeParam>(2 * SMALL_SIZE, SMALL_SIZE, kDefaultSeed, &values));
-  std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::OPTIONAL);
+  std::shared_ptr<GroupNode> schema =
+      MakeSimpleSchema(*values->type(), Repetition::OPTIONAL);
 
   std::shared_ptr<Array> sliced_values = values->Slice(SMALL_SIZE / 2, SMALL_SIZE);
   this->WriteColumn(schema, sliced_values);
@@ -470,7 +548,8 @@
   ASSERT_OK(NonNullArray<TypeParam>(SMALL_SIZE, &values));
   int64_t chunk_size = values->length() / 4;
 
-  std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::REQUIRED);
+  std::shared_ptr<GroupNode> schema =
+      MakeSimpleSchema(*values->type(), Repetition::REQUIRED);
   FileWriter writer(default_memory_pool(), this->MakeWriter(schema));
   for (int i = 0; i < 4; i++) {
     ASSERT_OK_NO_THROW(writer.NewRowGroup(chunk_size));
@@ -531,7 +610,8 @@
 
   ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values));
 
-  std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::OPTIONAL);
+  std::shared_ptr<GroupNode> schema =
+      MakeSimpleSchema(*values->type(), Repetition::OPTIONAL);
   FileWriter writer(::arrow::default_memory_pool(), this->MakeWriter(schema));
   for (int i = 0; i < 4; i++) {
     ASSERT_OK_NO_THROW(writer.NewRowGroup(chunk_size));
@@ -601,7 +681,7 @@
   writer->Close();
 
   ::arrow::TimestampBuilder builder(
-      default_memory_pool(), ::arrow::timestamp(::arrow::TimeUnit::NANO));
+      default_memory_pool(), ::arrow::timestamp(TimeUnit::NANO));
   builder.Append(val);
   std::shared_ptr<Array> values;
   ASSERT_OK(builder.Finish(&values));
@@ -715,7 +795,9 @@
 
   void MakeTestFile(
       std::vector<T>& values, int num_chunks, std::unique_ptr<FileReader>* reader) {
-    std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::REQUIRED);
+    TestType dummy;
+
+    std::shared_ptr<GroupNode> schema = MakeSimpleSchema(dummy, Repetition::REQUIRED);
     std::unique_ptr<ParquetFileWriter> file_writer = this->MakeWriter(schema);
     size_t chunk_size = values.size() / num_chunks;
     // Convert to Parquet's expected physical type
@@ -787,6 +869,89 @@
   this->CheckSingleColumnRequiredTableRead(4);
 }
 
+void MakeDateTimeTypesTable(std::shared_ptr<Table>* out) {
+  using ::arrow::ArrayFromVector;
+
+  std::vector<bool> is_valid = {true, true, true, false, true, true};
+
+  // These are only types that roundtrip without modification
+  auto f0 = field("f0", ::arrow::date32());
+  auto f1 = field("f1", ::arrow::timestamp(TimeUnit::MILLI));
+  auto f2 = field("f2", ::arrow::timestamp(TimeUnit::MICRO));
+  auto f3 = field("f3", ::arrow::time32(TimeUnit::MILLI));
+  auto f4 = field("f4", ::arrow::time64(TimeUnit::MICRO));
+  std::shared_ptr<::arrow::Schema> schema(new ::arrow::Schema({f0, f1, f2, f3, f4}));
+
+  std::vector<int32_t> t32_values = {
+      1489269000, 1489270000, 1489271000, 1489272000, 1489272000, 1489273000};
+  std::vector<int64_t> t64_values = {1489269000000, 1489270000000, 1489271000000,
+      1489272000000, 1489272000000, 1489273000000};
+
+  std::shared_ptr<Array> a0, a1, a2, a3, a4;
+  ArrayFromVector<::arrow::Date32Type, int32_t>(f0->type(), is_valid, t32_values, &a0);
+  ArrayFromVector<::arrow::TimestampType, int64_t>(f1->type(), is_valid, t64_values, &a1);
+  ArrayFromVector<::arrow::TimestampType, int64_t>(f2->type(), is_valid, t64_values, &a2);
+  ArrayFromVector<::arrow::Time32Type, int32_t>(f3->type(), is_valid, t32_values, &a3);
+  ArrayFromVector<::arrow::Time64Type, int64_t>(f4->type(), is_valid, t64_values, &a4);
+
+  std::vector<std::shared_ptr<::arrow::Column>> columns = {
+      std::make_shared<Column>("f0", a0), std::make_shared<Column>("f1", a1),
+      std::make_shared<Column>("f2", a2), std::make_shared<Column>("f3", a3),
+      std::make_shared<Column>("f4", a4)};
+  *out = std::make_shared<::arrow::Table>(schema, columns);
+}
+
+TEST(TestArrowReadWrite, DateTimeTypes) {
+  std::shared_ptr<Table> table;
+  MakeDateTimeTypesTable(&table);
+
+  std::shared_ptr<Table> result;
+  DoSimpleRoundtrip(table, 1, table->num_rows(), {}, &result);
+
+  ASSERT_TRUE(table->Equals(*result));
+}
+
+TEST(TestArrowReadWrite, ConvertedDateTimeTypes) {
+  using ::arrow::ArrayFromVector;
+
+  std::vector<bool> is_valid = {true, true, true, false, true, true};
+
+  auto f0 = field("f0", ::arrow::date64());
+  auto f1 = field("f1", ::arrow::time32(TimeUnit::SECOND));
+  std::shared_ptr<::arrow::Schema> schema(new ::arrow::Schema({f0, f1}));
+
+  std::vector<int64_t> a0_values = {1489190400000, 1489276800000, 1489363200000,
+      1489449600000, 1489536000000, 1489622400000};
+  std::vector<int32_t> a1_values = {0, 1, 2, 3, 4, 5};
+
+  std::shared_ptr<Array> a0, a1, x0, x1;
+  ArrayFromVector<::arrow::Date64Type, int64_t>(f0->type(), is_valid, a0_values, &a0);
+  ArrayFromVector<::arrow::Time32Type, int32_t>(f1->type(), is_valid, a1_values, &a1);
+
+  std::vector<std::shared_ptr<::arrow::Column>> columns = {
+      std::make_shared<Column>("f0", a0), std::make_shared<Column>("f1", a1)};
+  auto table = std::make_shared<::arrow::Table>(schema, columns);
+
+  // Expected schema and values
+  auto e0 = field("f0", ::arrow::date32());
+  auto e1 = field("f1", ::arrow::time32(TimeUnit::MILLI));
+  std::shared_ptr<::arrow::Schema> ex_schema(new ::arrow::Schema({e0, e1}));
+
+  std::vector<int32_t> x0_values = {17236, 17237, 17238, 17239, 17240, 17241};
+  std::vector<int32_t> x1_values = {0, 1000, 2000, 3000, 4000, 5000};
+  ArrayFromVector<::arrow::Date32Type, int32_t>(e0->type(), is_valid, x0_values, &x0);
+  ArrayFromVector<::arrow::Time32Type, int32_t>(e1->type(), is_valid, x1_values, &x1);
+
+  std::vector<std::shared_ptr<::arrow::Column>> ex_columns = {
+      std::make_shared<Column>("f0", x0), std::make_shared<Column>("f1", x1)};
+  auto ex_table = std::make_shared<::arrow::Table>(ex_schema, ex_columns);
+
+  std::shared_ptr<Table> result;
+  DoSimpleRoundtrip(table, 1, table->num_rows(), {}, &result);
+
+  ASSERT_TRUE(result->Equals(*ex_table));
+}
+
 void MakeDoubleTable(
     int num_columns, int num_rows, int nchunks, std::shared_ptr<Table>* out) {
   std::shared_ptr<::arrow::Column> column;
diff --git a/src/parquet/arrow/arrow-schema-test.cc b/src/parquet/arrow/arrow-schema-test.cc
index 0f6b455..2042566 100644
--- a/src/parquet/arrow/arrow-schema-test.cc
+++ b/src/parquet/arrow/arrow-schema-test.cc
@@ -26,6 +26,7 @@
 #include "arrow/test-util.h"
 
 using arrow::Field;
+using arrow::TimeUnit;
 
 using ParquetType = parquet::Type;
 using parquet::LogicalType;
@@ -45,11 +46,9 @@
 const auto FLOAT = ::arrow::float32();
 const auto DOUBLE = ::arrow::float64();
 const auto UTF8 = ::arrow::utf8();
-const auto TIMESTAMP_MS = ::arrow::timestamp(::arrow::TimeUnit::MILLI);
-const auto TIMESTAMP_NS = ::arrow::timestamp(::arrow::TimeUnit::NANO);
-
-// TODO: This requires parquet-cpp implementing the MICROS enum value
-// const auto TIMESTAMP_US = std::make_shared<TimestampType>(TimestampType::Unit::MICRO);
+const auto TIMESTAMP_MS = ::arrow::timestamp(TimeUnit::MILLI);
+const auto TIMESTAMP_US = ::arrow::timestamp(TimeUnit::MICRO);
+const auto TIMESTAMP_NS = ::arrow::timestamp(TimeUnit::NANO);
 const auto BINARY = ::arrow::binary();
 const auto DECIMAL_8_4 = std::make_shared<::arrow::DecimalType>(8, 4);
 
@@ -62,8 +61,8 @@
     for (int i = 0; i < expected_schema->num_fields(); ++i) {
       auto lhs = result_schema_->field(i);
       auto rhs = expected_schema->field(i);
-      EXPECT_TRUE(lhs->Equals(rhs))
-          << i << " " << lhs->ToString() << " != " << rhs->ToString();
+      EXPECT_TRUE(lhs->Equals(rhs)) << i << " " << lhs->ToString()
+                                    << " != " << rhs->ToString();
     }
   }
 
@@ -105,9 +104,23 @@
       ParquetType::INT64, LogicalType::TIMESTAMP_MILLIS));
   arrow_fields.push_back(std::make_shared<Field>("timestamp", TIMESTAMP_MS, false));
 
+  parquet_fields.push_back(PrimitiveNode::Make("timestamp[us]", Repetition::REQUIRED,
+      ParquetType::INT64, LogicalType::TIMESTAMP_MICROS));
+  arrow_fields.push_back(std::make_shared<Field>("timestamp[us]", TIMESTAMP_US, false));
+
   parquet_fields.push_back(PrimitiveNode::Make(
       "date", Repetition::REQUIRED, ParquetType::INT32, LogicalType::DATE));
-  arrow_fields.push_back(std::make_shared<Field>("date", ::arrow::date64(), false));
+  arrow_fields.push_back(std::make_shared<Field>("date", ::arrow::date32(), false));
+
+  parquet_fields.push_back(PrimitiveNode::Make(
+      "time32", Repetition::REQUIRED, ParquetType::INT32, LogicalType::TIME_MILLIS));
+  arrow_fields.push_back(std::make_shared<Field>(
+      "time32", ::arrow::time32(TimeUnit::MILLI), false));
+
+  parquet_fields.push_back(PrimitiveNode::Make(
+      "time64", Repetition::REQUIRED, ParquetType::INT64, LogicalType::TIME_MICROS));
+  arrow_fields.push_back(std::make_shared<Field>(
+      "time64", ::arrow::time64(TimeUnit::MICRO), false));
 
   parquet_fields.push_back(
       PrimitiveNode::Make("timestamp96", Repetition::REQUIRED, ParquetType::INT96));
@@ -568,7 +581,11 @@
 
   parquet_fields.push_back(PrimitiveNode::Make(
       "date", Repetition::REQUIRED, ParquetType::INT32, LogicalType::DATE));
-  arrow_fields.push_back(std::make_shared<Field>("date", ::arrow::date64(), false));
+  arrow_fields.push_back(std::make_shared<Field>("date", ::arrow::date32(), false));
+
+  parquet_fields.push_back(PrimitiveNode::Make(
+      "date64", Repetition::REQUIRED, ParquetType::INT32, LogicalType::DATE));
+  arrow_fields.push_back(std::make_shared<Field>("date64", ::arrow::date64(), false));
 
   parquet_fields.push_back(PrimitiveNode::Make("timestamp", Repetition::REQUIRED,
       ParquetType::INT64, LogicalType::TIMESTAMP_MILLIS));
@@ -644,6 +661,16 @@
   CheckFlatSchema(parquet_fields);
 }
 
+TEST_F(TestConvertArrowSchema, UnsupportedTypes) {
+  std::vector<std::shared_ptr<Field>> unsupported_fields = {
+    ::arrow::field("f0", ::arrow::time64(TimeUnit::NANO))
+  };
+
+  for (const auto& field : unsupported_fields) {
+    ASSERT_RAISES(NotImplemented, ConvertSchema({field}));
+  }
+}
+
 TEST_F(TestConvertArrowSchema, ParquetFlatDecimals) {
   std::vector<NodePtr> parquet_fields;
   std::vector<std::shared_ptr<Field>> arrow_fields;
@@ -655,8 +682,5 @@
   CheckFlatSchema(parquet_fields);
 }
 
-TEST(TestNodeConversion, DateAndTime) {}
-
 }  // namespace arrow
-
 }  // namespace parquet
diff --git a/src/parquet/arrow/reader.cc b/src/parquet/arrow/reader.cc
index 38d5583..852649a 100644
--- a/src/parquet/arrow/reader.cc
+++ b/src/parquet/arrow/reader.cc
@@ -502,6 +502,10 @@
 NONNULLABLE_BATCH_FAST_PATH(::arrow::Int64Type, Int64Type, int64_t)
 NONNULLABLE_BATCH_FAST_PATH(::arrow::FloatType, FloatType, float)
 NONNULLABLE_BATCH_FAST_PATH(::arrow::DoubleType, DoubleType, double)
+NONNULLABLE_BATCH_FAST_PATH(::arrow::Date32Type, Int32Type, int32_t)
+NONNULLABLE_BATCH_FAST_PATH(::arrow::TimestampType, Int64Type, int64_t)
+NONNULLABLE_BATCH_FAST_PATH(::arrow::Time32Type, Int32Type, int32_t)
+NONNULLABLE_BATCH_FAST_PATH(::arrow::Time64Type, Int64Type, int64_t)
 
 template <>
 Status ColumnReader::Impl::ReadNonNullableBatch<::arrow::TimestampType, Int96Type>(
@@ -607,6 +611,10 @@
 NULLABLE_BATCH_FAST_PATH(::arrow::Int64Type, Int64Type, int64_t)
 NULLABLE_BATCH_FAST_PATH(::arrow::FloatType, FloatType, float)
 NULLABLE_BATCH_FAST_PATH(::arrow::DoubleType, DoubleType, double)
+NULLABLE_BATCH_FAST_PATH(::arrow::Date32Type, Int32Type, int32_t)
+NULLABLE_BATCH_FAST_PATH(::arrow::TimestampType, Int64Type, int64_t)
+NULLABLE_BATCH_FAST_PATH(::arrow::Time32Type, Int32Type, int32_t)
+NULLABLE_BATCH_FAST_PATH(::arrow::Time64Type, Int64Type, int64_t)
 
 template <>
 Status ColumnReader::Impl::ReadNullableBatch<::arrow::TimestampType, Int96Type>(
@@ -1036,13 +1044,14 @@
     TYPED_BATCH_CASE(INT16, ::arrow::Int16Type, Int32Type)
     TYPED_BATCH_CASE(UINT32, ::arrow::UInt32Type, Int32Type)
     TYPED_BATCH_CASE(INT32, ::arrow::Int32Type, Int32Type)
-    TYPED_BATCH_CASE(DATE64, ::arrow::Date64Type, Int32Type)
     TYPED_BATCH_CASE(UINT64, ::arrow::UInt64Type, Int64Type)
     TYPED_BATCH_CASE(INT64, ::arrow::Int64Type, Int64Type)
     TYPED_BATCH_CASE(FLOAT, ::arrow::FloatType, FloatType)
     TYPED_BATCH_CASE(DOUBLE, ::arrow::DoubleType, DoubleType)
     TYPED_BATCH_CASE(STRING, ::arrow::StringType, ByteArrayType)
     TYPED_BATCH_CASE(BINARY, ::arrow::BinaryType, ByteArrayType)
+    TYPED_BATCH_CASE(DATE32, ::arrow::Date32Type, Int32Type)
+    TYPED_BATCH_CASE(DATE64, ::arrow::Date64Type, Int32Type)
     case ::arrow::Type::TIMESTAMP: {
       ::arrow::TimestampType* timestamp_type =
           static_cast<::arrow::TimestampType*>(field_->type().get());
@@ -1050,6 +1059,9 @@
         case ::arrow::TimeUnit::MILLI:
           return TypedReadBatch<::arrow::TimestampType, Int64Type>(batch_size, out);
           break;
+        case ::arrow::TimeUnit::MICRO:
+          return TypedReadBatch<::arrow::TimestampType, Int64Type>(batch_size, out);
+          break;
         case ::arrow::TimeUnit::NANO:
           return TypedReadBatch<::arrow::TimestampType, Int96Type>(batch_size, out);
           break;
@@ -1058,6 +1070,8 @@
       }
       break;
     }
+      TYPED_BATCH_CASE(TIME32, ::arrow::Time32Type, Int32Type)
+      TYPED_BATCH_CASE(TIME64, ::arrow::Time64Type, Int64Type)
     default:
       std::stringstream ss;
       ss << "No support for reading columns of type " << field_->type()->ToString();
diff --git a/src/parquet/arrow/schema.cc b/src/parquet/arrow/schema.cc
index 25713a7..31895ce 100644
--- a/src/parquet/arrow/schema.cc
+++ b/src/parquet/arrow/schema.cc
@@ -45,6 +45,7 @@
 namespace arrow {
 
 const auto TIMESTAMP_MS = ::arrow::timestamp(::arrow::TimeUnit::MILLI);
+const auto TIMESTAMP_US = ::arrow::timestamp(::arrow::TimeUnit::MICRO);
 const auto TIMESTAMP_NS = ::arrow::timestamp(::arrow::TimeUnit::NANO);
 
 TypePtr MakeDecimalType(const PrimitiveNode* node) {
@@ -105,18 +106,18 @@
     case LogicalType::INT_16:
       *out = ::arrow::int16();
       break;
+    case LogicalType::INT_32:
+      *out = ::arrow::int32();
+      break;
     case LogicalType::UINT_32:
       *out = ::arrow::uint32();
       break;
     case LogicalType::DATE:
-      *out = ::arrow::date64();
+      *out = ::arrow::date32();
       break;
     case LogicalType::TIME_MILLIS:
       *out = ::arrow::time32(::arrow::TimeUnit::MILLI);
       break;
-    case LogicalType::TIME_MICROS:
-      *out = ::arrow::time64(::arrow::TimeUnit::MICRO);
-      break;
     case LogicalType::DECIMAL:
       *out = MakeDecimalType(node);
       break;
@@ -135,6 +136,9 @@
     case LogicalType::NONE:
       *out = ::arrow::int64();
       break;
+    case LogicalType::INT_64:
+      *out = ::arrow::int64();
+      break;
     case LogicalType::UINT_64:
       *out = ::arrow::uint64();
       break;
@@ -144,6 +148,12 @@
     case LogicalType::TIMESTAMP_MILLIS:
       *out = TIMESTAMP_MS;
       break;
+    case LogicalType::TIMESTAMP_MICROS:
+      *out = TIMESTAMP_US;
+      break;
+    case LogicalType::TIME_MICROS:
+      *out = ::arrow::time64(::arrow::TimeUnit::MICRO);
+      break;
     default:
       std::stringstream ss;
       ss << "Unhandled logical type " << LogicalTypeToString(node->logical_type())
@@ -455,21 +465,30 @@
       break;
     case ArrowType::TIMESTAMP: {
       auto timestamp_type = static_cast<::arrow::TimestampType*>(field->type().get());
-      if (timestamp_type->unit() != ::arrow::TimestampType::Unit::MILLI) {
-        return Status::NotImplemented(
-            "Other timestamp units than millisecond are not yet support with parquet.");
-      }
+      auto unit = timestamp_type->unit();
       type = ParquetType::INT64;
-      logical_type = LogicalType::TIMESTAMP_MILLIS;
+      if (unit == ::arrow::TimeUnit::MILLI) {
+        logical_type = LogicalType::TIMESTAMP_MILLIS;
+      } else if (unit == ::arrow::TimeUnit::MICRO) {
+        logical_type = LogicalType::TIMESTAMP_MICROS;
+      } else {
+        return Status::NotImplemented(
+            "Only MILLI and MICRO units supported for Arrow timestamps with Parquet.");
+      }
     } break;
     case ArrowType::TIME32:
-      type = ParquetType::INT64;
+      type = ParquetType::INT32;
       logical_type = LogicalType::TIME_MILLIS;
       break;
-    case ArrowType::TIME64:
+    case ArrowType::TIME64: {
+      auto time_type = static_cast<::arrow::Time64Type*>(field->type().get());
+      if (time_type->unit() == ::arrow::TimeUnit::NANO) {
+        return Status::NotImplemented(
+            "Nanosecond time not supported in Parquet.");
+      }
       type = ParquetType::INT64;
       logical_type = LogicalType::TIME_MICROS;
-      break;
+    } break;
     case ArrowType::STRUCT: {
       auto struct_type = std::static_pointer_cast<::arrow::StructType>(field->type());
       return StructToNode(struct_type, field->name(), field->nullable(), properties, out);
diff --git a/src/parquet/arrow/test-util.h b/src/parquet/arrow/test-util.h
index bff952b..8bcd314 100644
--- a/src/parquet/arrow/test-util.h
+++ b/src/parquet/arrow/test-util.h
@@ -20,6 +20,7 @@
 
 #include "arrow/api.h"
 #include "arrow/test-util.h"
+#include "arrow/type_traits.h"
 
 namespace parquet {
 namespace arrow {
diff --git a/src/parquet/arrow/writer.cc b/src/parquet/arrow/writer.cc
index 90cd135..6ac33b1 100644
--- a/src/parquet/arrow/writer.cc
+++ b/src/parquet/arrow/writer.cc
@@ -26,6 +26,7 @@
 #include "parquet/arrow/schema.h"
 
 #include "arrow/api.h"
+#include "arrow/visitor_inline.h"
 
 using arrow::Array;
 using arrow::BinaryArray;
@@ -39,6 +40,7 @@
 using arrow::ListArray;
 using arrow::Status;
 using arrow::Table;
+using arrow::TimeUnit;
 
 using parquet::ParquetFileWriter;
 using parquet::ParquetVersion;
@@ -49,44 +51,34 @@
 
 namespace BitUtil = ::arrow::BitUtil;
 
-class LevelBuilder : public ::arrow::ArrayVisitor {
+class LevelBuilder {
  public:
   explicit LevelBuilder(MemoryPool* pool)
       : def_levels_(pool, ::arrow::int16()), rep_levels_(pool, ::arrow::int16()) {
     def_levels_buffer_ = std::make_shared<PoolBuffer>(pool);
   }
 
-#define PRIMITIVE_VISIT(ArrowTypePrefix)                                \
-  Status Visit(const ::arrow::ArrowTypePrefix##Array& array) override { \
-    array_offsets_.push_back(array.offset());                           \
-    valid_bitmaps_.push_back(array.null_bitmap_data());                 \
-    null_counts_.push_back(array.null_count());                         \
-    values_type_ = array.type_id();                                     \
-    values_array_ = &array;                                             \
-    return Status::OK();                                                \
+  Status VisitInline(const Array& array);
+
+  Status Visit(const ::arrow::PrimitiveArray& array) {
+    array_offsets_.push_back(array.offset());
+    valid_bitmaps_.push_back(array.null_bitmap_data());
+    null_counts_.push_back(array.null_count());
+    values_type_ = array.type_id();
+    values_array_ = &array;
+    return Status::OK();
   }
 
-  PRIMITIVE_VISIT(Boolean)
-  PRIMITIVE_VISIT(Int8)
-  PRIMITIVE_VISIT(Int16)
-  PRIMITIVE_VISIT(Int32)
-  PRIMITIVE_VISIT(Int64)
-  PRIMITIVE_VISIT(UInt8)
-  PRIMITIVE_VISIT(UInt16)
-  PRIMITIVE_VISIT(UInt32)
-  PRIMITIVE_VISIT(UInt64)
-  PRIMITIVE_VISIT(HalfFloat)
-  PRIMITIVE_VISIT(Float)
-  PRIMITIVE_VISIT(Double)
-  PRIMITIVE_VISIT(String)
-  PRIMITIVE_VISIT(Binary)
-  PRIMITIVE_VISIT(Date64)
-  PRIMITIVE_VISIT(Time32)
-  PRIMITIVE_VISIT(Time64)
-  PRIMITIVE_VISIT(Timestamp)
-  PRIMITIVE_VISIT(Interval)
+  Status Visit(const ::arrow::BinaryArray& array) {
+    array_offsets_.push_back(array.offset());
+    valid_bitmaps_.push_back(array.null_bitmap_data());
+    null_counts_.push_back(array.null_count());
+    values_type_ = array.type_id();
+    values_array_ = &array;
+    return Status::OK();
+  }
 
-  Status Visit(const ListArray& array) override {
+  Status Visit(const ListArray& array) {
     array_offsets_.push_back(array.offset());
     valid_bitmaps_.push_back(array.null_bitmap_data());
     null_counts_.push_back(array.null_count());
@@ -95,20 +87,21 @@
     min_offset_idx_ = array.value_offset(min_offset_idx_);
     max_offset_idx_ = array.value_offset(max_offset_idx_);
 
-    return array.values()->Accept(this);
+    return VisitInline(*array.values());
   }
 
-#define NOT_IMPLEMENTED_VIST(ArrowTypePrefix)                           \
-  Status Visit(const ::arrow::ArrowTypePrefix##Array& array) override { \
-    return Status::NotImplemented(                                      \
-        "Level generation for ArrowTypePrefix not supported yet");      \
-  };
+#define NOT_IMPLEMENTED_VISIT(ArrowTypePrefix)                     \
+  Status Visit(const ::arrow::ArrowTypePrefix##Array& array) {     \
+    return Status::NotImplemented(                                 \
+        "Level generation for ArrowTypePrefix not supported yet"); \
+  }
 
-  NOT_IMPLEMENTED_VIST(Null)
-  NOT_IMPLEMENTED_VIST(Struct)
-  NOT_IMPLEMENTED_VIST(Union)
-  NOT_IMPLEMENTED_VIST(Decimal)
-  NOT_IMPLEMENTED_VIST(Dictionary)
+  NOT_IMPLEMENTED_VISIT(Null)
+  NOT_IMPLEMENTED_VISIT(Struct)
+  NOT_IMPLEMENTED_VISIT(Union)
+  NOT_IMPLEMENTED_VISIT(Decimal)
+  NOT_IMPLEMENTED_VISIT(Dictionary)
+  NOT_IMPLEMENTED_VISIT(Interval)
 
   Status GenerateLevels(const Array& array, const std::shared_ptr<Field>& field,
       int64_t* values_offset, ::arrow::Type::type* values_type, int64_t* num_values,
@@ -117,7 +110,7 @@
     // Work downwards to extract bitmaps and offsets
     min_offset_idx_ = 0;
     max_offset_idx_ = array.length();
-    RETURN_NOT_OK(array.Accept(this));
+    RETURN_NOT_OK(VisitInline(array));
     *num_values = max_offset_idx_ - min_offset_idx_;
     *values_offset = min_offset_idx_;
     *values_type = values_type_;
@@ -247,6 +240,10 @@
   const Array* values_array_;
 };
 
+Status LevelBuilder::VisitInline(const Array& array) {
+  return VisitArrayInline(array, this);
+}
+
 class FileWriter::Impl {
  public:
   Impl(MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer);
@@ -257,14 +254,15 @@
       int64_t num_levels, const int16_t* def_levels, const int16_t* rep_levels);
 
   template <typename ParquetType, typename ArrowType>
-  Status WriteNonNullableBatch(TypedColumnWriter<ParquetType>* writer, int64_t num_values,
-      int64_t num_levels, const int16_t* def_levels, const int16_t* rep_levels,
+  Status WriteNonNullableBatch(TypedColumnWriter<ParquetType>* writer,
+      const ArrowType& type, int64_t num_values, int64_t num_levels,
+      const int16_t* def_levels, const int16_t* rep_levels,
       const typename ArrowType::c_type* data_ptr);
 
   template <typename ParquetType, typename ArrowType>
-  Status WriteNullableBatch(TypedColumnWriter<ParquetType>* writer, int64_t num_values,
-      int64_t num_levels, const int16_t* def_levels, const int16_t* rep_levels,
-      const uint8_t* valid_bits, int64_t valid_bits_offset,
+  Status WriteNullableBatch(TypedColumnWriter<ParquetType>* writer, const ArrowType& type,
+      int64_t num_values, int64_t num_levels, const int16_t* def_levels,
+      const int16_t* rep_levels, const uint8_t* valid_bits, int64_t valid_bits_offset,
       const typename ArrowType::c_type* data_ptr);
 
   Status WriteColumnChunk(const Array& data);
@@ -307,13 +305,14 @@
 
   if (writer->descr()->schema_node()->is_required() || (data->null_count() == 0)) {
     // no nulls, just dump the data
-    RETURN_NOT_OK((WriteNonNullableBatch<ParquetType, ArrowType>(writer, array->length(),
-        num_levels, def_levels, rep_levels, data_ptr + data->offset())));
+    RETURN_NOT_OK((WriteNonNullableBatch<ParquetType, ArrowType>(writer,
+        static_cast<const ArrowType&>(*array->type()), array->length(), num_levels,
+        def_levels, rep_levels, data_ptr + data->offset())));
   } else {
     const uint8_t* valid_bits = data->null_bitmap_data();
-    RETURN_NOT_OK((WriteNullableBatch<ParquetType, ArrowType>(writer, data->length(),
-        num_levels, def_levels, rep_levels, valid_bits, data->offset(),
-        data_ptr + data->offset())));
+    RETURN_NOT_OK((WriteNullableBatch<ParquetType, ArrowType>(writer,
+        static_cast<const ArrowType&>(*array->type()), data->length(), num_levels,
+        def_levels, rep_levels, valid_bits, data->offset(), data_ptr + data->offset())));
   }
   PARQUET_CATCH_NOT_OK(writer->Close());
   return Status::OK();
@@ -321,8 +320,9 @@
 
 template <typename ParquetType, typename ArrowType>
 Status FileWriter::Impl::WriteNonNullableBatch(TypedColumnWriter<ParquetType>* writer,
-    int64_t num_values, int64_t num_levels, const int16_t* def_levels,
-    const int16_t* rep_levels, const typename ArrowType::c_type* data_ptr) {
+    const ArrowType& type, int64_t num_values, int64_t num_levels,
+    const int16_t* def_levels, const int16_t* rep_levels,
+    const typename ArrowType::c_type* data_ptr) {
   using ParquetCType = typename ParquetType::c_type;
   RETURN_NOT_OK(data_buffer_.Resize(num_values * sizeof(ParquetCType)));
   auto buffer_ptr = reinterpret_cast<ParquetCType*>(data_buffer_.mutable_data());
@@ -334,8 +334,9 @@
 
 template <>
 Status FileWriter::Impl::WriteNonNullableBatch<Int32Type, ::arrow::Date64Type>(
-    TypedColumnWriter<Int32Type>* writer, int64_t num_values, int64_t num_levels,
-    const int16_t* def_levels, const int16_t* rep_levels, const int64_t* data_ptr) {
+    TypedColumnWriter<Int32Type>* writer, const ::arrow::Date64Type& type,
+    int64_t num_values, int64_t num_levels, const int16_t* def_levels,
+    const int16_t* rep_levels, const int64_t* data_ptr) {
   RETURN_NOT_OK(data_buffer_.Resize(num_values * sizeof(int32_t)));
   auto buffer_ptr = reinterpret_cast<int32_t*>(data_buffer_.mutable_data());
   for (int i = 0; i < num_values; i++) {
@@ -346,27 +347,49 @@
   return Status::OK();
 }
 
-#define NONNULLABLE_BATCH_FAST_PATH(ParquetType, ArrowType, CType)                     \
-  template <>                                                                          \
-  Status FileWriter::Impl::WriteNonNullableBatch<ParquetType, ArrowType>(              \
-      TypedColumnWriter<ParquetType> * writer, int64_t num_values, int64_t num_levels, \
-      const int16_t* def_levels, const int16_t* rep_levels, const CType* data_ptr) {   \
-    PARQUET_CATCH_NOT_OK(                                                              \
-        writer->WriteBatch(num_levels, def_levels, rep_levels, data_ptr));             \
-    return Status::OK();                                                               \
+template <>
+Status FileWriter::Impl::WriteNonNullableBatch<Int32Type, ::arrow::Time32Type>(
+    TypedColumnWriter<Int32Type>* writer, const ::arrow::Time32Type& type,
+    int64_t num_values, int64_t num_levels, const int16_t* def_levels,
+    const int16_t* rep_levels, const int32_t* data_ptr) {
+  RETURN_NOT_OK(data_buffer_.Resize(num_values * sizeof(int32_t)));
+  auto buffer_ptr = reinterpret_cast<int32_t*>(data_buffer_.mutable_data());
+  if (type.unit() == TimeUnit::SECOND) {
+    for (int i = 0; i < num_values; i++) {
+      buffer_ptr[i] = data_ptr[i] * 1000;
+    }
+  } else {
+    std::copy(data_ptr, data_ptr + num_values, buffer_ptr);
+  }
+  PARQUET_CATCH_NOT_OK(
+      writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr));
+  return Status::OK();
+}
+
+#define NONNULLABLE_BATCH_FAST_PATH(ParquetType, ArrowType, CType)         \
+  template <>                                                              \
+  Status FileWriter::Impl::WriteNonNullableBatch<ParquetType, ArrowType>(  \
+      TypedColumnWriter<ParquetType> * writer, const ArrowType& type,      \
+      int64_t num_values, int64_t num_levels, const int16_t* def_levels,   \
+      const int16_t* rep_levels, const CType* data_ptr) {                  \
+    PARQUET_CATCH_NOT_OK(                                                  \
+        writer->WriteBatch(num_levels, def_levels, rep_levels, data_ptr)); \
+    return Status::OK();                                                   \
   }
 
 NONNULLABLE_BATCH_FAST_PATH(Int32Type, ::arrow::Int32Type, int32_t)
+NONNULLABLE_BATCH_FAST_PATH(Int32Type, ::arrow::Date32Type, int32_t)
 NONNULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::Int64Type, int64_t)
 NONNULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::TimestampType, int64_t)
+NONNULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::Time64Type, int64_t)
 NONNULLABLE_BATCH_FAST_PATH(FloatType, ::arrow::FloatType, float)
 NONNULLABLE_BATCH_FAST_PATH(DoubleType, ::arrow::DoubleType, double)
 
 template <typename ParquetType, typename ArrowType>
 Status FileWriter::Impl::WriteNullableBatch(TypedColumnWriter<ParquetType>* writer,
-    int64_t num_values, int64_t num_levels, const int16_t* def_levels,
-    const int16_t* rep_levels, const uint8_t* valid_bits, int64_t valid_bits_offset,
-    const typename ArrowType::c_type* data_ptr) {
+    const ArrowType& type, int64_t num_values, int64_t num_levels,
+    const int16_t* def_levels, const int16_t* rep_levels, const uint8_t* valid_bits,
+    int64_t valid_bits_offset, const typename ArrowType::c_type* data_ptr) {
   using ParquetCType = typename ParquetType::c_type;
 
   RETURN_NOT_OK(data_buffer_.Resize(num_values * sizeof(ParquetCType)));
@@ -386,9 +409,10 @@
 
 template <>
 Status FileWriter::Impl::WriteNullableBatch<Int32Type, ::arrow::Date64Type>(
-    TypedColumnWriter<Int32Type>* writer, int64_t num_values, int64_t num_levels,
-    const int16_t* def_levels, const int16_t* rep_levels, const uint8_t* valid_bits,
-    int64_t valid_bits_offset, const int64_t* data_ptr) {
+    TypedColumnWriter<Int32Type>* writer, const ::arrow::Date64Type& type,
+    int64_t num_values, int64_t num_levels, const int16_t* def_levels,
+    const int16_t* rep_levels, const uint8_t* valid_bits, int64_t valid_bits_offset,
+    const int64_t* data_ptr) {
   RETURN_NOT_OK(data_buffer_.Resize(num_values * sizeof(int32_t)));
   auto buffer_ptr = reinterpret_cast<int32_t*>(data_buffer_.mutable_data());
   INIT_BITSET(valid_bits, valid_bits_offset);
@@ -405,20 +429,54 @@
   return Status::OK();
 }
 
+template <>
+Status FileWriter::Impl::WriteNullableBatch<Int32Type, ::arrow::Time32Type>(
+    TypedColumnWriter<Int32Type>* writer, const ::arrow::Time32Type& type,
+    int64_t num_values, int64_t num_levels, const int16_t* def_levels,
+    const int16_t* rep_levels, const uint8_t* valid_bits, int64_t valid_bits_offset,
+    const int32_t* data_ptr) {
+  RETURN_NOT_OK(data_buffer_.Resize(num_values * sizeof(int32_t)));
+  auto buffer_ptr = reinterpret_cast<int32_t*>(data_buffer_.mutable_data());
+  INIT_BITSET(valid_bits, valid_bits_offset);
+
+  if (type.unit() == TimeUnit::SECOND) {
+    for (int i = 0; i < num_values; i++) {
+      if (bitset_valid_bits & (1 << bit_offset_valid_bits)) {
+        buffer_ptr[i] = data_ptr[i] * 1000;
+      }
+      READ_NEXT_BITSET(valid_bits);
+    }
+  } else {
+    for (int i = 0; i < num_values; i++) {
+      if (bitset_valid_bits & (1 << bit_offset_valid_bits)) {
+        buffer_ptr[i] = data_ptr[i];
+      }
+      READ_NEXT_BITSET(valid_bits);
+    }
+  }
+  PARQUET_CATCH_NOT_OK(writer->WriteBatchSpaced(
+      num_levels, def_levels, rep_levels, valid_bits, valid_bits_offset, buffer_ptr));
+
+  return Status::OK();
+}
+
 #define NULLABLE_BATCH_FAST_PATH(ParquetType, ArrowType, CType)                        \
   template <>                                                                          \
   Status FileWriter::Impl::WriteNullableBatch<ParquetType, ArrowType>(                 \
-      TypedColumnWriter<ParquetType> * writer, int64_t num_values, int64_t num_levels, \
-      const int16_t* def_levels, const int16_t* rep_levels, const uint8_t* valid_bits, \
-      int64_t valid_bits_offset, const CType* data_ptr) {                              \
+      TypedColumnWriter<ParquetType> * writer, const ArrowType& type,                  \
+      int64_t num_values, int64_t num_levels, const int16_t* def_levels,               \
+      const int16_t* rep_levels, const uint8_t* valid_bits, int64_t valid_bits_offset, \
+      const CType* data_ptr) {                                                         \
     PARQUET_CATCH_NOT_OK(writer->WriteBatchSpaced(                                     \
         num_levels, def_levels, rep_levels, valid_bits, valid_bits_offset, data_ptr)); \
     return Status::OK();                                                               \
   }
 
 NULLABLE_BATCH_FAST_PATH(Int32Type, ::arrow::Int32Type, int32_t)
+NULLABLE_BATCH_FAST_PATH(Int32Type, ::arrow::Date32Type, int32_t)
 NULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::Int64Type, int64_t)
 NULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::TimestampType, int64_t)
+NULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::Time64Type, int64_t)
 NULLABLE_BATCH_FAST_PATH(FloatType, ::arrow::FloatType, float)
 NULLABLE_BATCH_FAST_PATH(DoubleType, ::arrow::DoubleType, double)
 
@@ -553,14 +611,17 @@
       WRITE_BATCH_CASE(INT16, Int16Type, Int32Type)
       WRITE_BATCH_CASE(UINT16, UInt16Type, Int32Type)
       WRITE_BATCH_CASE(INT32, Int32Type, Int32Type)
-      WRITE_BATCH_CASE(DATE64, Date64Type, Int32Type)
       WRITE_BATCH_CASE(INT64, Int64Type, Int64Type)
-      WRITE_BATCH_CASE(TIMESTAMP, TimestampType, Int64Type)
       WRITE_BATCH_CASE(UINT64, UInt64Type, Int64Type)
       WRITE_BATCH_CASE(FLOAT, FloatType, FloatType)
       WRITE_BATCH_CASE(DOUBLE, DoubleType, DoubleType)
       WRITE_BATCH_CASE(BINARY, BinaryType, ByteArrayType)
       WRITE_BATCH_CASE(STRING, BinaryType, ByteArrayType)
+      WRITE_BATCH_CASE(DATE32, Date32Type, Int32Type)
+      WRITE_BATCH_CASE(DATE64, Date64Type, Int32Type)
+      WRITE_BATCH_CASE(TIMESTAMP, TimestampType, Int64Type)
+      WRITE_BATCH_CASE(TIME32, Time32Type, Int32Type)
+      WRITE_BATCH_CASE(TIME64, Time64Type, Int64Type)
     default:
       std::stringstream ss;
       ss << "Data type not supported as list value: " << values_array->type()->ToString();