PARQUET-915: Support additional Arrow date/time types and metadata
I am working on ARROW-865, which exposes these types to Python users.
Closes #270
Author: Wes McKinney <wes.mckinney@twosigma.com>
Closes #311 from wesm/PARQUET-915 and squashes the following commits:
0a89639 [Wes McKinney] Add test for time64[ns]
6331d8c [Wes McKinney] Cast time32[second] to time32[millisecond]
37c1b42 [Wes McKinney] cpplint
5167a7a [Wes McKinney] Add unit test for date64->date32 cast
440b40f [Wes McKinney] Add unit test for date/time types that write without implicit casts
e626ebd [Wes McKinney] Use inline visitor in LevelBuilder
2ab7f12 [Wes McKinney] Plumbing and expansions for rest of Arrow date/time types
3aa64fa [Wes McKinney] Add conversion for TIMESTAMP_MICROS
diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc
index 0bdc14d..7b63514 100644
--- a/src/parquet/arrow/arrow-reader-writer-test.cc
+++ b/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -43,7 +43,9 @@
using arrow::PrimitiveArray;
using arrow::Status;
using arrow::Table;
+using arrow::TimeUnit;
+using ArrowId = ::arrow::Type;
using ParquetType = parquet::Type;
using parquet::schema::GroupNode;
using parquet::schema::NodePtr;
@@ -58,13 +60,98 @@
constexpr uint32_t kDefaultSeed = 0;
+LogicalType::type get_logical_type(const ::arrow::DataType& type) {
+ switch (type.id()) {
+ case ArrowId::UINT8:
+ return LogicalType::UINT_8;
+ case ArrowId::INT8:
+ return LogicalType::INT_8;
+ case ArrowId::UINT16:
+ return LogicalType::UINT_16;
+ case ArrowId::INT16:
+ return LogicalType::INT_16;
+ case ArrowId::UINT32:
+ return LogicalType::UINT_32;
+ case ArrowId::INT32:
+ return LogicalType::INT_32;
+ case ArrowId::UINT64:
+ return LogicalType::UINT_64;
+ case ArrowId::INT64:
+ return LogicalType::INT_64;
+ case ArrowId::STRING:
+ return LogicalType::UTF8;
+ case ArrowId::DATE32:
+ return LogicalType::DATE;
+ case ArrowId::DATE64:
+ return LogicalType::DATE;
+ case ArrowId::TIMESTAMP: {
+ const auto& ts_type = static_cast<const ::arrow::TimestampType&>(type);
+ switch (ts_type.unit()) {
+ case TimeUnit::MILLI:
+ return LogicalType::TIMESTAMP_MILLIS;
+ case TimeUnit::MICRO:
+ return LogicalType::TIMESTAMP_MICROS;
+ default:
+ DCHECK(false) << "Only MILLI and MICRO units supported for Arrow timestamps "
+ "with Parquet.";
+ }
+ }
+ case ArrowId::TIME32:
+ return LogicalType::TIME_MILLIS;
+ case ArrowId::TIME64:
+ return LogicalType::TIME_MICROS;
+ default:
+ break;
+ }
+ return LogicalType::NONE;
+}
+
+ParquetType::type get_physical_type(const ::arrow::DataType& type) {
+ switch (type.id()) {
+ case ArrowId::BOOL:
+ return ParquetType::BOOLEAN;
+ case ArrowId::UINT8:
+ case ArrowId::INT8:
+ case ArrowId::UINT16:
+ case ArrowId::INT16:
+ case ArrowId::UINT32:
+ case ArrowId::INT32:
+ return ParquetType::INT32;
+ case ArrowId::UINT64:
+ case ArrowId::INT64:
+ return ParquetType::INT64;
+ case ArrowId::FLOAT:
+ return ParquetType::FLOAT;
+ case ArrowId::DOUBLE:
+ return ParquetType::DOUBLE;
+ case ArrowId::BINARY:
+ return ParquetType::BYTE_ARRAY;
+ case ArrowId::STRING:
+ return ParquetType::BYTE_ARRAY;
+ case ArrowId::DATE32:
+ return ParquetType::INT32;
+ case ArrowId::DATE64:
+ // Convert to date32 internally
+ return ParquetType::INT32;
+ case ArrowId::TIME32:
+ return ParquetType::INT32;
+ case ArrowId::TIME64:
+ return ParquetType::INT64;
+ case ArrowId::TIMESTAMP:
+ return ParquetType::INT64;
+ default:
+ break;
+ }
+ DCHECK(false) << "cannot reach this code";
+ return ParquetType::INT32;
+}
+
template <typename TestType>
struct test_traits {};
template <>
struct test_traits<::arrow::BooleanType> {
static constexpr ParquetType::type parquet_enum = ParquetType::BOOLEAN;
- static constexpr LogicalType::type logical_enum = LogicalType::NONE;
static uint8_t const value;
};
@@ -73,7 +160,6 @@
template <>
struct test_traits<::arrow::UInt8Type> {
static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
- static constexpr LogicalType::type logical_enum = LogicalType::UINT_8;
static uint8_t const value;
};
@@ -82,7 +168,6 @@
template <>
struct test_traits<::arrow::Int8Type> {
static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
- static constexpr LogicalType::type logical_enum = LogicalType::INT_8;
static int8_t const value;
};
@@ -91,7 +176,6 @@
template <>
struct test_traits<::arrow::UInt16Type> {
static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
- static constexpr LogicalType::type logical_enum = LogicalType::UINT_16;
static uint16_t const value;
};
@@ -100,7 +184,6 @@
template <>
struct test_traits<::arrow::Int16Type> {
static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
- static constexpr LogicalType::type logical_enum = LogicalType::INT_16;
static int16_t const value;
};
@@ -109,7 +192,6 @@
template <>
struct test_traits<::arrow::UInt32Type> {
static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
- static constexpr LogicalType::type logical_enum = LogicalType::UINT_32;
static uint32_t const value;
};
@@ -118,7 +200,6 @@
template <>
struct test_traits<::arrow::Int32Type> {
static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
- static constexpr LogicalType::type logical_enum = LogicalType::NONE;
static int32_t const value;
};
@@ -127,7 +208,6 @@
template <>
struct test_traits<::arrow::UInt64Type> {
static constexpr ParquetType::type parquet_enum = ParquetType::INT64;
- static constexpr LogicalType::type logical_enum = LogicalType::UINT_64;
static uint64_t const value;
};
@@ -136,7 +216,6 @@
template <>
struct test_traits<::arrow::Int64Type> {
static constexpr ParquetType::type parquet_enum = ParquetType::INT64;
- static constexpr LogicalType::type logical_enum = LogicalType::NONE;
static int64_t const value;
};
@@ -145,25 +224,22 @@
template <>
struct test_traits<::arrow::TimestampType> {
static constexpr ParquetType::type parquet_enum = ParquetType::INT64;
- static constexpr LogicalType::type logical_enum = LogicalType::TIMESTAMP_MILLIS;
static int64_t const value;
};
const int64_t test_traits<::arrow::TimestampType>::value(14695634030000);
template <>
-struct test_traits<::arrow::Date64Type> {
+struct test_traits<::arrow::Date32Type> {
static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
- static constexpr LogicalType::type logical_enum = LogicalType::DATE;
- static int64_t const value;
+ static int32_t const value;
};
-const int64_t test_traits<::arrow::Date64Type>::value(14688000000000);
+const int32_t test_traits<::arrow::Date32Type>::value(170000);
template <>
struct test_traits<::arrow::FloatType> {
static constexpr ParquetType::type parquet_enum = ParquetType::FLOAT;
- static constexpr LogicalType::type logical_enum = LogicalType::NONE;
static float const value;
};
@@ -172,7 +248,6 @@
template <>
struct test_traits<::arrow::DoubleType> {
static constexpr ParquetType::type parquet_enum = ParquetType::DOUBLE;
- static constexpr LogicalType::type logical_enum = LogicalType::NONE;
static double const value;
};
@@ -181,14 +256,12 @@
template <>
struct test_traits<::arrow::StringType> {
static constexpr ParquetType::type parquet_enum = ParquetType::BYTE_ARRAY;
- static constexpr LogicalType::type logical_enum = LogicalType::UTF8;
static std::string const value;
};
template <>
struct test_traits<::arrow::BinaryType> {
static constexpr ParquetType::type parquet_enum = ParquetType::BYTE_ARRAY;
- static constexpr LogicalType::type logical_enum = LogicalType::NONE;
static std::string const value;
};
@@ -231,19 +304,20 @@
}
}
+static std::shared_ptr<GroupNode> MakeSimpleSchema(
+ const ::arrow::DataType& type, Repetition::type repetition) {
+ auto pnode = PrimitiveNode::Make(
+ "column1", repetition, get_physical_type(type), get_logical_type(type));
+ NodePtr node_ =
+ GroupNode::Make("schema", Repetition::REQUIRED, std::vector<NodePtr>({pnode}));
+ return std::static_pointer_cast<GroupNode>(node_);
+}
+
template <typename TestType>
class TestParquetIO : public ::testing::Test {
public:
virtual void SetUp() {}
- std::shared_ptr<GroupNode> MakeSchema(Repetition::type repetition) {
- auto pnode = PrimitiveNode::Make("column1", repetition,
- test_traits<TestType>::parquet_enum, test_traits<TestType>::logical_enum);
- NodePtr node_ =
- GroupNode::Make("schema", Repetition::REQUIRED, std::vector<NodePtr>({pnode}));
- return std::static_pointer_cast<GroupNode>(node_);
- }
-
std::unique_ptr<ParquetFileWriter> MakeWriter(
const std::shared_ptr<GroupNode>& schema) {
sink_ = std::make_shared<InMemoryOutputStream>();
@@ -348,8 +422,8 @@
typedef ::testing::Types<::arrow::BooleanType, ::arrow::UInt8Type, ::arrow::Int8Type,
::arrow::UInt16Type, ::arrow::Int16Type, ::arrow::Int32Type, ::arrow::UInt64Type,
- ::arrow::Int64Type, ::arrow::TimestampType, ::arrow::Date64Type, ::arrow::FloatType,
- ::arrow::DoubleType, ::arrow::StringType, ::arrow::BinaryType>
+ ::arrow::Int64Type, ::arrow::Date32Type, ::arrow::FloatType, ::arrow::DoubleType,
+ ::arrow::StringType, ::arrow::BinaryType>
TestTypes;
TYPED_TEST_CASE(TestParquetIO, TestTypes);
@@ -358,7 +432,8 @@
std::shared_ptr<Array> values;
ASSERT_OK(NonNullArray<TypeParam>(SMALL_SIZE, &values));
- std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::REQUIRED);
+ std::shared_ptr<GroupNode> schema =
+ MakeSimpleSchema(*values->type(), Repetition::REQUIRED);
this->WriteColumn(schema, values);
this->ReadAndCheckSingleColumnFile(values.get());
@@ -390,7 +465,8 @@
ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values));
- std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::OPTIONAL);
+ std::shared_ptr<GroupNode> schema =
+ MakeSimpleSchema(*values->type(), Repetition::OPTIONAL);
this->WriteColumn(schema, values);
this->ReadAndCheckSingleColumnFile(values.get());
@@ -399,7 +475,8 @@
TYPED_TEST(TestParquetIO, SingleColumnRequiredSliceWrite) {
std::shared_ptr<Array> values;
ASSERT_OK(NonNullArray<TypeParam>(2 * SMALL_SIZE, &values));
- std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::REQUIRED);
+ std::shared_ptr<GroupNode> schema =
+ MakeSimpleSchema(*values->type(), Repetition::REQUIRED);
std::shared_ptr<Array> sliced_values = values->Slice(SMALL_SIZE / 2, SMALL_SIZE);
this->WriteColumn(schema, sliced_values);
@@ -414,7 +491,8 @@
TYPED_TEST(TestParquetIO, SingleColumnOptionalSliceWrite) {
std::shared_ptr<Array> values;
ASSERT_OK(NullableArray<TypeParam>(2 * SMALL_SIZE, SMALL_SIZE, kDefaultSeed, &values));
- std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::OPTIONAL);
+ std::shared_ptr<GroupNode> schema =
+ MakeSimpleSchema(*values->type(), Repetition::OPTIONAL);
std::shared_ptr<Array> sliced_values = values->Slice(SMALL_SIZE / 2, SMALL_SIZE);
this->WriteColumn(schema, sliced_values);
@@ -470,7 +548,8 @@
ASSERT_OK(NonNullArray<TypeParam>(SMALL_SIZE, &values));
int64_t chunk_size = values->length() / 4;
- std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::REQUIRED);
+ std::shared_ptr<GroupNode> schema =
+ MakeSimpleSchema(*values->type(), Repetition::REQUIRED);
FileWriter writer(default_memory_pool(), this->MakeWriter(schema));
for (int i = 0; i < 4; i++) {
ASSERT_OK_NO_THROW(writer.NewRowGroup(chunk_size));
@@ -531,7 +610,8 @@
ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values));
- std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::OPTIONAL);
+ std::shared_ptr<GroupNode> schema =
+ MakeSimpleSchema(*values->type(), Repetition::OPTIONAL);
FileWriter writer(::arrow::default_memory_pool(), this->MakeWriter(schema));
for (int i = 0; i < 4; i++) {
ASSERT_OK_NO_THROW(writer.NewRowGroup(chunk_size));
@@ -601,7 +681,7 @@
writer->Close();
::arrow::TimestampBuilder builder(
- default_memory_pool(), ::arrow::timestamp(::arrow::TimeUnit::NANO));
+ default_memory_pool(), ::arrow::timestamp(TimeUnit::NANO));
builder.Append(val);
std::shared_ptr<Array> values;
ASSERT_OK(builder.Finish(&values));
@@ -715,7 +795,9 @@
void MakeTestFile(
std::vector<T>& values, int num_chunks, std::unique_ptr<FileReader>* reader) {
- std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::REQUIRED);
+ TestType dummy;
+
+ std::shared_ptr<GroupNode> schema = MakeSimpleSchema(dummy, Repetition::REQUIRED);
std::unique_ptr<ParquetFileWriter> file_writer = this->MakeWriter(schema);
size_t chunk_size = values.size() / num_chunks;
// Convert to Parquet's expected physical type
@@ -787,6 +869,89 @@
this->CheckSingleColumnRequiredTableRead(4);
}
+void MakeDateTimeTypesTable(std::shared_ptr<Table>* out) {
+ using ::arrow::ArrayFromVector;
+
+ std::vector<bool> is_valid = {true, true, true, false, true, true};
+
+ // These are only types that roundtrip without modification
+ auto f0 = field("f0", ::arrow::date32());
+ auto f1 = field("f1", ::arrow::timestamp(TimeUnit::MILLI));
+ auto f2 = field("f2", ::arrow::timestamp(TimeUnit::MICRO));
+ auto f3 = field("f3", ::arrow::time32(TimeUnit::MILLI));
+ auto f4 = field("f4", ::arrow::time64(TimeUnit::MICRO));
+ std::shared_ptr<::arrow::Schema> schema(new ::arrow::Schema({f0, f1, f2, f3, f4}));
+
+ std::vector<int32_t> t32_values = {
+ 1489269000, 1489270000, 1489271000, 1489272000, 1489272000, 1489273000};
+ std::vector<int64_t> t64_values = {1489269000000, 1489270000000, 1489271000000,
+ 1489272000000, 1489272000000, 1489273000000};
+
+ std::shared_ptr<Array> a0, a1, a2, a3, a4;
+ ArrayFromVector<::arrow::Date32Type, int32_t>(f0->type(), is_valid, t32_values, &a0);
+ ArrayFromVector<::arrow::TimestampType, int64_t>(f1->type(), is_valid, t64_values, &a1);
+ ArrayFromVector<::arrow::TimestampType, int64_t>(f2->type(), is_valid, t64_values, &a2);
+ ArrayFromVector<::arrow::Time32Type, int32_t>(f3->type(), is_valid, t32_values, &a3);
+ ArrayFromVector<::arrow::Time64Type, int64_t>(f4->type(), is_valid, t64_values, &a4);
+
+ std::vector<std::shared_ptr<::arrow::Column>> columns = {
+ std::make_shared<Column>("f0", a0), std::make_shared<Column>("f1", a1),
+ std::make_shared<Column>("f2", a2), std::make_shared<Column>("f3", a3),
+ std::make_shared<Column>("f4", a4)};
+ *out = std::make_shared<::arrow::Table>(schema, columns);
+}
+
+TEST(TestArrowReadWrite, DateTimeTypes) {
+ std::shared_ptr<Table> table;
+ MakeDateTimeTypesTable(&table);
+
+ std::shared_ptr<Table> result;
+ DoSimpleRoundtrip(table, 1, table->num_rows(), {}, &result);
+
+ ASSERT_TRUE(table->Equals(*result));
+}
+
+TEST(TestArrowReadWrite, ConvertedDateTimeTypes) {
+ using ::arrow::ArrayFromVector;
+
+ std::vector<bool> is_valid = {true, true, true, false, true, true};
+
+ auto f0 = field("f0", ::arrow::date64());
+ auto f1 = field("f1", ::arrow::time32(TimeUnit::SECOND));
+ std::shared_ptr<::arrow::Schema> schema(new ::arrow::Schema({f0, f1}));
+
+ std::vector<int64_t> a0_values = {1489190400000, 1489276800000, 1489363200000,
+ 1489449600000, 1489536000000, 1489622400000};
+ std::vector<int32_t> a1_values = {0, 1, 2, 3, 4, 5};
+
+ std::shared_ptr<Array> a0, a1, x0, x1;
+ ArrayFromVector<::arrow::Date64Type, int64_t>(f0->type(), is_valid, a0_values, &a0);
+ ArrayFromVector<::arrow::Time32Type, int32_t>(f1->type(), is_valid, a1_values, &a1);
+
+ std::vector<std::shared_ptr<::arrow::Column>> columns = {
+ std::make_shared<Column>("f0", a0), std::make_shared<Column>("f1", a1)};
+ auto table = std::make_shared<::arrow::Table>(schema, columns);
+
+ // Expected schema and values
+ auto e0 = field("f0", ::arrow::date32());
+ auto e1 = field("f1", ::arrow::time32(TimeUnit::MILLI));
+ std::shared_ptr<::arrow::Schema> ex_schema(new ::arrow::Schema({e0, e1}));
+
+ std::vector<int32_t> x0_values = {17236, 17237, 17238, 17239, 17240, 17241};
+ std::vector<int32_t> x1_values = {0, 1000, 2000, 3000, 4000, 5000};
+ ArrayFromVector<::arrow::Date32Type, int32_t>(e0->type(), is_valid, x0_values, &x0);
+ ArrayFromVector<::arrow::Time32Type, int32_t>(e1->type(), is_valid, x1_values, &x1);
+
+ std::vector<std::shared_ptr<::arrow::Column>> ex_columns = {
+ std::make_shared<Column>("f0", x0), std::make_shared<Column>("f1", x1)};
+ auto ex_table = std::make_shared<::arrow::Table>(ex_schema, ex_columns);
+
+ std::shared_ptr<Table> result;
+ DoSimpleRoundtrip(table, 1, table->num_rows(), {}, &result);
+
+ ASSERT_TRUE(result->Equals(*ex_table));
+}
+
void MakeDoubleTable(
int num_columns, int num_rows, int nchunks, std::shared_ptr<Table>* out) {
std::shared_ptr<::arrow::Column> column;
diff --git a/src/parquet/arrow/arrow-schema-test.cc b/src/parquet/arrow/arrow-schema-test.cc
index 0f6b455..2042566 100644
--- a/src/parquet/arrow/arrow-schema-test.cc
+++ b/src/parquet/arrow/arrow-schema-test.cc
@@ -26,6 +26,7 @@
#include "arrow/test-util.h"
using arrow::Field;
+using arrow::TimeUnit;
using ParquetType = parquet::Type;
using parquet::LogicalType;
@@ -45,11 +46,9 @@
const auto FLOAT = ::arrow::float32();
const auto DOUBLE = ::arrow::float64();
const auto UTF8 = ::arrow::utf8();
-const auto TIMESTAMP_MS = ::arrow::timestamp(::arrow::TimeUnit::MILLI);
-const auto TIMESTAMP_NS = ::arrow::timestamp(::arrow::TimeUnit::NANO);
-
-// TODO: This requires parquet-cpp implementing the MICROS enum value
-// const auto TIMESTAMP_US = std::make_shared<TimestampType>(TimestampType::Unit::MICRO);
+const auto TIMESTAMP_MS = ::arrow::timestamp(TimeUnit::MILLI);
+const auto TIMESTAMP_US = ::arrow::timestamp(TimeUnit::MICRO);
+const auto TIMESTAMP_NS = ::arrow::timestamp(TimeUnit::NANO);
const auto BINARY = ::arrow::binary();
const auto DECIMAL_8_4 = std::make_shared<::arrow::DecimalType>(8, 4);
@@ -62,8 +61,8 @@
for (int i = 0; i < expected_schema->num_fields(); ++i) {
auto lhs = result_schema_->field(i);
auto rhs = expected_schema->field(i);
- EXPECT_TRUE(lhs->Equals(rhs))
- << i << " " << lhs->ToString() << " != " << rhs->ToString();
+ EXPECT_TRUE(lhs->Equals(rhs)) << i << " " << lhs->ToString()
+ << " != " << rhs->ToString();
}
}
@@ -105,9 +104,23 @@
ParquetType::INT64, LogicalType::TIMESTAMP_MILLIS));
arrow_fields.push_back(std::make_shared<Field>("timestamp", TIMESTAMP_MS, false));
+ parquet_fields.push_back(PrimitiveNode::Make("timestamp[us]", Repetition::REQUIRED,
+ ParquetType::INT64, LogicalType::TIMESTAMP_MICROS));
+ arrow_fields.push_back(std::make_shared<Field>("timestamp[us]", TIMESTAMP_US, false));
+
parquet_fields.push_back(PrimitiveNode::Make(
"date", Repetition::REQUIRED, ParquetType::INT32, LogicalType::DATE));
- arrow_fields.push_back(std::make_shared<Field>("date", ::arrow::date64(), false));
+ arrow_fields.push_back(std::make_shared<Field>("date", ::arrow::date32(), false));
+
+ parquet_fields.push_back(PrimitiveNode::Make(
+ "time32", Repetition::REQUIRED, ParquetType::INT32, LogicalType::TIME_MILLIS));
+ arrow_fields.push_back(std::make_shared<Field>(
+ "time32", ::arrow::time32(TimeUnit::MILLI), false));
+
+ parquet_fields.push_back(PrimitiveNode::Make(
+ "time64", Repetition::REQUIRED, ParquetType::INT64, LogicalType::TIME_MICROS));
+ arrow_fields.push_back(std::make_shared<Field>(
+ "time64", ::arrow::time64(TimeUnit::MICRO), false));
parquet_fields.push_back(
PrimitiveNode::Make("timestamp96", Repetition::REQUIRED, ParquetType::INT96));
@@ -568,7 +581,11 @@
parquet_fields.push_back(PrimitiveNode::Make(
"date", Repetition::REQUIRED, ParquetType::INT32, LogicalType::DATE));
- arrow_fields.push_back(std::make_shared<Field>("date", ::arrow::date64(), false));
+ arrow_fields.push_back(std::make_shared<Field>("date", ::arrow::date32(), false));
+
+ parquet_fields.push_back(PrimitiveNode::Make(
+ "date64", Repetition::REQUIRED, ParquetType::INT32, LogicalType::DATE));
+ arrow_fields.push_back(std::make_shared<Field>("date64", ::arrow::date64(), false));
parquet_fields.push_back(PrimitiveNode::Make("timestamp", Repetition::REQUIRED,
ParquetType::INT64, LogicalType::TIMESTAMP_MILLIS));
@@ -644,6 +661,16 @@
CheckFlatSchema(parquet_fields);
}
+TEST_F(TestConvertArrowSchema, UnsupportedTypes) {
+ std::vector<std::shared_ptr<Field>> unsupported_fields = {
+ ::arrow::field("f0", ::arrow::time64(TimeUnit::NANO))
+ };
+
+ for (const auto& field : unsupported_fields) {
+ ASSERT_RAISES(NotImplemented, ConvertSchema({field}));
+ }
+}
+
TEST_F(TestConvertArrowSchema, ParquetFlatDecimals) {
std::vector<NodePtr> parquet_fields;
std::vector<std::shared_ptr<Field>> arrow_fields;
@@ -655,8 +682,5 @@
CheckFlatSchema(parquet_fields);
}
-TEST(TestNodeConversion, DateAndTime) {}
-
} // namespace arrow
-
} // namespace parquet
diff --git a/src/parquet/arrow/reader.cc b/src/parquet/arrow/reader.cc
index 38d5583..852649a 100644
--- a/src/parquet/arrow/reader.cc
+++ b/src/parquet/arrow/reader.cc
@@ -502,6 +502,10 @@
NONNULLABLE_BATCH_FAST_PATH(::arrow::Int64Type, Int64Type, int64_t)
NONNULLABLE_BATCH_FAST_PATH(::arrow::FloatType, FloatType, float)
NONNULLABLE_BATCH_FAST_PATH(::arrow::DoubleType, DoubleType, double)
+NONNULLABLE_BATCH_FAST_PATH(::arrow::Date32Type, Int32Type, int32_t)
+NONNULLABLE_BATCH_FAST_PATH(::arrow::TimestampType, Int64Type, int64_t)
+NONNULLABLE_BATCH_FAST_PATH(::arrow::Time32Type, Int32Type, int32_t)
+NONNULLABLE_BATCH_FAST_PATH(::arrow::Time64Type, Int64Type, int64_t)
template <>
Status ColumnReader::Impl::ReadNonNullableBatch<::arrow::TimestampType, Int96Type>(
@@ -607,6 +611,10 @@
NULLABLE_BATCH_FAST_PATH(::arrow::Int64Type, Int64Type, int64_t)
NULLABLE_BATCH_FAST_PATH(::arrow::FloatType, FloatType, float)
NULLABLE_BATCH_FAST_PATH(::arrow::DoubleType, DoubleType, double)
+NULLABLE_BATCH_FAST_PATH(::arrow::Date32Type, Int32Type, int32_t)
+NULLABLE_BATCH_FAST_PATH(::arrow::TimestampType, Int64Type, int64_t)
+NULLABLE_BATCH_FAST_PATH(::arrow::Time32Type, Int32Type, int32_t)
+NULLABLE_BATCH_FAST_PATH(::arrow::Time64Type, Int64Type, int64_t)
template <>
Status ColumnReader::Impl::ReadNullableBatch<::arrow::TimestampType, Int96Type>(
@@ -1036,13 +1044,14 @@
TYPED_BATCH_CASE(INT16, ::arrow::Int16Type, Int32Type)
TYPED_BATCH_CASE(UINT32, ::arrow::UInt32Type, Int32Type)
TYPED_BATCH_CASE(INT32, ::arrow::Int32Type, Int32Type)
- TYPED_BATCH_CASE(DATE64, ::arrow::Date64Type, Int32Type)
TYPED_BATCH_CASE(UINT64, ::arrow::UInt64Type, Int64Type)
TYPED_BATCH_CASE(INT64, ::arrow::Int64Type, Int64Type)
TYPED_BATCH_CASE(FLOAT, ::arrow::FloatType, FloatType)
TYPED_BATCH_CASE(DOUBLE, ::arrow::DoubleType, DoubleType)
TYPED_BATCH_CASE(STRING, ::arrow::StringType, ByteArrayType)
TYPED_BATCH_CASE(BINARY, ::arrow::BinaryType, ByteArrayType)
+ TYPED_BATCH_CASE(DATE32, ::arrow::Date32Type, Int32Type)
+ TYPED_BATCH_CASE(DATE64, ::arrow::Date64Type, Int32Type)
case ::arrow::Type::TIMESTAMP: {
::arrow::TimestampType* timestamp_type =
static_cast<::arrow::TimestampType*>(field_->type().get());
@@ -1050,6 +1059,9 @@
case ::arrow::TimeUnit::MILLI:
return TypedReadBatch<::arrow::TimestampType, Int64Type>(batch_size, out);
break;
+ case ::arrow::TimeUnit::MICRO:
+ return TypedReadBatch<::arrow::TimestampType, Int64Type>(batch_size, out);
+ break;
case ::arrow::TimeUnit::NANO:
return TypedReadBatch<::arrow::TimestampType, Int96Type>(batch_size, out);
break;
@@ -1058,6 +1070,8 @@
}
break;
}
+ TYPED_BATCH_CASE(TIME32, ::arrow::Time32Type, Int32Type)
+ TYPED_BATCH_CASE(TIME64, ::arrow::Time64Type, Int64Type)
default:
std::stringstream ss;
ss << "No support for reading columns of type " << field_->type()->ToString();
diff --git a/src/parquet/arrow/schema.cc b/src/parquet/arrow/schema.cc
index 25713a7..31895ce 100644
--- a/src/parquet/arrow/schema.cc
+++ b/src/parquet/arrow/schema.cc
@@ -45,6 +45,7 @@
namespace arrow {
const auto TIMESTAMP_MS = ::arrow::timestamp(::arrow::TimeUnit::MILLI);
+const auto TIMESTAMP_US = ::arrow::timestamp(::arrow::TimeUnit::MICRO);
const auto TIMESTAMP_NS = ::arrow::timestamp(::arrow::TimeUnit::NANO);
TypePtr MakeDecimalType(const PrimitiveNode* node) {
@@ -105,18 +106,18 @@
case LogicalType::INT_16:
*out = ::arrow::int16();
break;
+ case LogicalType::INT_32:
+ *out = ::arrow::int32();
+ break;
case LogicalType::UINT_32:
*out = ::arrow::uint32();
break;
case LogicalType::DATE:
- *out = ::arrow::date64();
+ *out = ::arrow::date32();
break;
case LogicalType::TIME_MILLIS:
*out = ::arrow::time32(::arrow::TimeUnit::MILLI);
break;
- case LogicalType::TIME_MICROS:
- *out = ::arrow::time64(::arrow::TimeUnit::MICRO);
- break;
case LogicalType::DECIMAL:
*out = MakeDecimalType(node);
break;
@@ -135,6 +136,9 @@
case LogicalType::NONE:
*out = ::arrow::int64();
break;
+ case LogicalType::INT_64:
+ *out = ::arrow::int64();
+ break;
case LogicalType::UINT_64:
*out = ::arrow::uint64();
break;
@@ -144,6 +148,12 @@
case LogicalType::TIMESTAMP_MILLIS:
*out = TIMESTAMP_MS;
break;
+ case LogicalType::TIMESTAMP_MICROS:
+ *out = TIMESTAMP_US;
+ break;
+ case LogicalType::TIME_MICROS:
+ *out = ::arrow::time64(::arrow::TimeUnit::MICRO);
+ break;
default:
std::stringstream ss;
ss << "Unhandled logical type " << LogicalTypeToString(node->logical_type())
@@ -455,21 +465,30 @@
break;
case ArrowType::TIMESTAMP: {
auto timestamp_type = static_cast<::arrow::TimestampType*>(field->type().get());
- if (timestamp_type->unit() != ::arrow::TimestampType::Unit::MILLI) {
- return Status::NotImplemented(
- "Other timestamp units than millisecond are not yet support with parquet.");
- }
+ auto unit = timestamp_type->unit();
type = ParquetType::INT64;
- logical_type = LogicalType::TIMESTAMP_MILLIS;
+ if (unit == ::arrow::TimeUnit::MILLI) {
+ logical_type = LogicalType::TIMESTAMP_MILLIS;
+ } else if (unit == ::arrow::TimeUnit::MICRO) {
+ logical_type = LogicalType::TIMESTAMP_MICROS;
+ } else {
+ return Status::NotImplemented(
+ "Only MILLI and MICRO units supported for Arrow timestamps with Parquet.");
+ }
} break;
case ArrowType::TIME32:
- type = ParquetType::INT64;
+ type = ParquetType::INT32;
logical_type = LogicalType::TIME_MILLIS;
break;
- case ArrowType::TIME64:
+ case ArrowType::TIME64: {
+ auto time_type = static_cast<::arrow::Time64Type*>(field->type().get());
+ if (time_type->unit() == ::arrow::TimeUnit::NANO) {
+ return Status::NotImplemented(
+ "Nanosecond time not supported in Parquet.");
+ }
type = ParquetType::INT64;
logical_type = LogicalType::TIME_MICROS;
- break;
+ } break;
case ArrowType::STRUCT: {
auto struct_type = std::static_pointer_cast<::arrow::StructType>(field->type());
return StructToNode(struct_type, field->name(), field->nullable(), properties, out);
diff --git a/src/parquet/arrow/test-util.h b/src/parquet/arrow/test-util.h
index bff952b..8bcd314 100644
--- a/src/parquet/arrow/test-util.h
+++ b/src/parquet/arrow/test-util.h
@@ -20,6 +20,7 @@
#include "arrow/api.h"
#include "arrow/test-util.h"
+#include "arrow/type_traits.h"
namespace parquet {
namespace arrow {
diff --git a/src/parquet/arrow/writer.cc b/src/parquet/arrow/writer.cc
index 90cd135..6ac33b1 100644
--- a/src/parquet/arrow/writer.cc
+++ b/src/parquet/arrow/writer.cc
@@ -26,6 +26,7 @@
#include "parquet/arrow/schema.h"
#include "arrow/api.h"
+#include "arrow/visitor_inline.h"
using arrow::Array;
using arrow::BinaryArray;
@@ -39,6 +40,7 @@
using arrow::ListArray;
using arrow::Status;
using arrow::Table;
+using arrow::TimeUnit;
using parquet::ParquetFileWriter;
using parquet::ParquetVersion;
@@ -49,44 +51,34 @@
namespace BitUtil = ::arrow::BitUtil;
-class LevelBuilder : public ::arrow::ArrayVisitor {
+class LevelBuilder {
public:
explicit LevelBuilder(MemoryPool* pool)
: def_levels_(pool, ::arrow::int16()), rep_levels_(pool, ::arrow::int16()) {
def_levels_buffer_ = std::make_shared<PoolBuffer>(pool);
}
-#define PRIMITIVE_VISIT(ArrowTypePrefix) \
- Status Visit(const ::arrow::ArrowTypePrefix##Array& array) override { \
- array_offsets_.push_back(array.offset()); \
- valid_bitmaps_.push_back(array.null_bitmap_data()); \
- null_counts_.push_back(array.null_count()); \
- values_type_ = array.type_id(); \
- values_array_ = &array; \
- return Status::OK(); \
+ Status VisitInline(const Array& array);
+
+ Status Visit(const ::arrow::PrimitiveArray& array) {
+ array_offsets_.push_back(array.offset());
+ valid_bitmaps_.push_back(array.null_bitmap_data());
+ null_counts_.push_back(array.null_count());
+ values_type_ = array.type_id();
+ values_array_ = &array;
+ return Status::OK();
}
- PRIMITIVE_VISIT(Boolean)
- PRIMITIVE_VISIT(Int8)
- PRIMITIVE_VISIT(Int16)
- PRIMITIVE_VISIT(Int32)
- PRIMITIVE_VISIT(Int64)
- PRIMITIVE_VISIT(UInt8)
- PRIMITIVE_VISIT(UInt16)
- PRIMITIVE_VISIT(UInt32)
- PRIMITIVE_VISIT(UInt64)
- PRIMITIVE_VISIT(HalfFloat)
- PRIMITIVE_VISIT(Float)
- PRIMITIVE_VISIT(Double)
- PRIMITIVE_VISIT(String)
- PRIMITIVE_VISIT(Binary)
- PRIMITIVE_VISIT(Date64)
- PRIMITIVE_VISIT(Time32)
- PRIMITIVE_VISIT(Time64)
- PRIMITIVE_VISIT(Timestamp)
- PRIMITIVE_VISIT(Interval)
+ Status Visit(const ::arrow::BinaryArray& array) {
+ array_offsets_.push_back(array.offset());
+ valid_bitmaps_.push_back(array.null_bitmap_data());
+ null_counts_.push_back(array.null_count());
+ values_type_ = array.type_id();
+ values_array_ = &array;
+ return Status::OK();
+ }
- Status Visit(const ListArray& array) override {
+ Status Visit(const ListArray& array) {
array_offsets_.push_back(array.offset());
valid_bitmaps_.push_back(array.null_bitmap_data());
null_counts_.push_back(array.null_count());
@@ -95,20 +87,21 @@
min_offset_idx_ = array.value_offset(min_offset_idx_);
max_offset_idx_ = array.value_offset(max_offset_idx_);
- return array.values()->Accept(this);
+ return VisitInline(*array.values());
}
-#define NOT_IMPLEMENTED_VIST(ArrowTypePrefix) \
- Status Visit(const ::arrow::ArrowTypePrefix##Array& array) override { \
- return Status::NotImplemented( \
- "Level generation for ArrowTypePrefix not supported yet"); \
- };
+#define NOT_IMPLEMENTED_VISIT(ArrowTypePrefix) \
+ Status Visit(const ::arrow::ArrowTypePrefix##Array& array) { \
+ return Status::NotImplemented( \
+ "Level generation for ArrowTypePrefix not supported yet"); \
+ }
- NOT_IMPLEMENTED_VIST(Null)
- NOT_IMPLEMENTED_VIST(Struct)
- NOT_IMPLEMENTED_VIST(Union)
- NOT_IMPLEMENTED_VIST(Decimal)
- NOT_IMPLEMENTED_VIST(Dictionary)
+ NOT_IMPLEMENTED_VISIT(Null)
+ NOT_IMPLEMENTED_VISIT(Struct)
+ NOT_IMPLEMENTED_VISIT(Union)
+ NOT_IMPLEMENTED_VISIT(Decimal)
+ NOT_IMPLEMENTED_VISIT(Dictionary)
+ NOT_IMPLEMENTED_VISIT(Interval)
Status GenerateLevels(const Array& array, const std::shared_ptr<Field>& field,
int64_t* values_offset, ::arrow::Type::type* values_type, int64_t* num_values,
@@ -117,7 +110,7 @@
// Work downwards to extract bitmaps and offsets
min_offset_idx_ = 0;
max_offset_idx_ = array.length();
- RETURN_NOT_OK(array.Accept(this));
+ RETURN_NOT_OK(VisitInline(array));
*num_values = max_offset_idx_ - min_offset_idx_;
*values_offset = min_offset_idx_;
*values_type = values_type_;
@@ -247,6 +240,10 @@
const Array* values_array_;
};
+Status LevelBuilder::VisitInline(const Array& array) {
+ return VisitArrayInline(array, this);
+}
+
class FileWriter::Impl {
public:
Impl(MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer);
@@ -257,14 +254,15 @@
int64_t num_levels, const int16_t* def_levels, const int16_t* rep_levels);
template <typename ParquetType, typename ArrowType>
- Status WriteNonNullableBatch(TypedColumnWriter<ParquetType>* writer, int64_t num_values,
- int64_t num_levels, const int16_t* def_levels, const int16_t* rep_levels,
+ Status WriteNonNullableBatch(TypedColumnWriter<ParquetType>* writer,
+ const ArrowType& type, int64_t num_values, int64_t num_levels,
+ const int16_t* def_levels, const int16_t* rep_levels,
const typename ArrowType::c_type* data_ptr);
template <typename ParquetType, typename ArrowType>
- Status WriteNullableBatch(TypedColumnWriter<ParquetType>* writer, int64_t num_values,
- int64_t num_levels, const int16_t* def_levels, const int16_t* rep_levels,
- const uint8_t* valid_bits, int64_t valid_bits_offset,
+ Status WriteNullableBatch(TypedColumnWriter<ParquetType>* writer, const ArrowType& type,
+ int64_t num_values, int64_t num_levels, const int16_t* def_levels,
+ const int16_t* rep_levels, const uint8_t* valid_bits, int64_t valid_bits_offset,
const typename ArrowType::c_type* data_ptr);
Status WriteColumnChunk(const Array& data);
@@ -307,13 +305,14 @@
if (writer->descr()->schema_node()->is_required() || (data->null_count() == 0)) {
// no nulls, just dump the data
- RETURN_NOT_OK((WriteNonNullableBatch<ParquetType, ArrowType>(writer, array->length(),
- num_levels, def_levels, rep_levels, data_ptr + data->offset())));
+ RETURN_NOT_OK((WriteNonNullableBatch<ParquetType, ArrowType>(writer,
+ static_cast<const ArrowType&>(*array->type()), array->length(), num_levels,
+ def_levels, rep_levels, data_ptr + data->offset())));
} else {
const uint8_t* valid_bits = data->null_bitmap_data();
- RETURN_NOT_OK((WriteNullableBatch<ParquetType, ArrowType>(writer, data->length(),
- num_levels, def_levels, rep_levels, valid_bits, data->offset(),
- data_ptr + data->offset())));
+ RETURN_NOT_OK((WriteNullableBatch<ParquetType, ArrowType>(writer,
+ static_cast<const ArrowType&>(*array->type()), data->length(), num_levels,
+ def_levels, rep_levels, valid_bits, data->offset(), data_ptr + data->offset())));
}
PARQUET_CATCH_NOT_OK(writer->Close());
return Status::OK();
@@ -321,8 +320,9 @@
template <typename ParquetType, typename ArrowType>
Status FileWriter::Impl::WriteNonNullableBatch(TypedColumnWriter<ParquetType>* writer,
- int64_t num_values, int64_t num_levels, const int16_t* def_levels,
- const int16_t* rep_levels, const typename ArrowType::c_type* data_ptr) {
+ const ArrowType& type, int64_t num_values, int64_t num_levels,
+ const int16_t* def_levels, const int16_t* rep_levels,
+ const typename ArrowType::c_type* data_ptr) {
using ParquetCType = typename ParquetType::c_type;
RETURN_NOT_OK(data_buffer_.Resize(num_values * sizeof(ParquetCType)));
auto buffer_ptr = reinterpret_cast<ParquetCType*>(data_buffer_.mutable_data());
@@ -334,8 +334,9 @@
template <>
Status FileWriter::Impl::WriteNonNullableBatch<Int32Type, ::arrow::Date64Type>(
- TypedColumnWriter<Int32Type>* writer, int64_t num_values, int64_t num_levels,
- const int16_t* def_levels, const int16_t* rep_levels, const int64_t* data_ptr) {
+ TypedColumnWriter<Int32Type>* writer, const ::arrow::Date64Type& type,
+ int64_t num_values, int64_t num_levels, const int16_t* def_levels,
+ const int16_t* rep_levels, const int64_t* data_ptr) {
RETURN_NOT_OK(data_buffer_.Resize(num_values * sizeof(int32_t)));
auto buffer_ptr = reinterpret_cast<int32_t*>(data_buffer_.mutable_data());
for (int i = 0; i < num_values; i++) {
@@ -346,27 +347,49 @@
return Status::OK();
}
-#define NONNULLABLE_BATCH_FAST_PATH(ParquetType, ArrowType, CType) \
- template <> \
- Status FileWriter::Impl::WriteNonNullableBatch<ParquetType, ArrowType>( \
- TypedColumnWriter<ParquetType> * writer, int64_t num_values, int64_t num_levels, \
- const int16_t* def_levels, const int16_t* rep_levels, const CType* data_ptr) { \
- PARQUET_CATCH_NOT_OK( \
- writer->WriteBatch(num_levels, def_levels, rep_levels, data_ptr)); \
- return Status::OK(); \
+template <>
+Status FileWriter::Impl::WriteNonNullableBatch<Int32Type, ::arrow::Time32Type>(
+ TypedColumnWriter<Int32Type>* writer, const ::arrow::Time32Type& type,
+ int64_t num_values, int64_t num_levels, const int16_t* def_levels,
+ const int16_t* rep_levels, const int32_t* data_ptr) {
+ RETURN_NOT_OK(data_buffer_.Resize(num_values * sizeof(int32_t)));
+ auto buffer_ptr = reinterpret_cast<int32_t*>(data_buffer_.mutable_data());
+ if (type.unit() == TimeUnit::SECOND) {
+ for (int i = 0; i < num_values; i++) {
+ buffer_ptr[i] = data_ptr[i] * 1000;
+ }
+ } else {
+ std::copy(data_ptr, data_ptr + num_values, buffer_ptr);
+ }
+ PARQUET_CATCH_NOT_OK(
+ writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr));
+ return Status::OK();
+}
+
+#define NONNULLABLE_BATCH_FAST_PATH(ParquetType, ArrowType, CType) \
+ template <> \
+ Status FileWriter::Impl::WriteNonNullableBatch<ParquetType, ArrowType>( \
+ TypedColumnWriter<ParquetType> * writer, const ArrowType& type, \
+ int64_t num_values, int64_t num_levels, const int16_t* def_levels, \
+ const int16_t* rep_levels, const CType* data_ptr) { \
+ PARQUET_CATCH_NOT_OK( \
+ writer->WriteBatch(num_levels, def_levels, rep_levels, data_ptr)); \
+ return Status::OK(); \
}
NONNULLABLE_BATCH_FAST_PATH(Int32Type, ::arrow::Int32Type, int32_t)
+NONNULLABLE_BATCH_FAST_PATH(Int32Type, ::arrow::Date32Type, int32_t)
NONNULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::Int64Type, int64_t)
NONNULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::TimestampType, int64_t)
+NONNULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::Time64Type, int64_t)
NONNULLABLE_BATCH_FAST_PATH(FloatType, ::arrow::FloatType, float)
NONNULLABLE_BATCH_FAST_PATH(DoubleType, ::arrow::DoubleType, double)
template <typename ParquetType, typename ArrowType>
Status FileWriter::Impl::WriteNullableBatch(TypedColumnWriter<ParquetType>* writer,
- int64_t num_values, int64_t num_levels, const int16_t* def_levels,
- const int16_t* rep_levels, const uint8_t* valid_bits, int64_t valid_bits_offset,
- const typename ArrowType::c_type* data_ptr) {
+ const ArrowType& type, int64_t num_values, int64_t num_levels,
+ const int16_t* def_levels, const int16_t* rep_levels, const uint8_t* valid_bits,
+ int64_t valid_bits_offset, const typename ArrowType::c_type* data_ptr) {
using ParquetCType = typename ParquetType::c_type;
RETURN_NOT_OK(data_buffer_.Resize(num_values * sizeof(ParquetCType)));
@@ -386,9 +409,10 @@
template <>
Status FileWriter::Impl::WriteNullableBatch<Int32Type, ::arrow::Date64Type>(
- TypedColumnWriter<Int32Type>* writer, int64_t num_values, int64_t num_levels,
- const int16_t* def_levels, const int16_t* rep_levels, const uint8_t* valid_bits,
- int64_t valid_bits_offset, const int64_t* data_ptr) {
+ TypedColumnWriter<Int32Type>* writer, const ::arrow::Date64Type& type,
+ int64_t num_values, int64_t num_levels, const int16_t* def_levels,
+ const int16_t* rep_levels, const uint8_t* valid_bits, int64_t valid_bits_offset,
+ const int64_t* data_ptr) {
RETURN_NOT_OK(data_buffer_.Resize(num_values * sizeof(int32_t)));
auto buffer_ptr = reinterpret_cast<int32_t*>(data_buffer_.mutable_data());
INIT_BITSET(valid_bits, valid_bits_offset);
@@ -405,20 +429,54 @@
return Status::OK();
}
+template <>
+Status FileWriter::Impl::WriteNullableBatch<Int32Type, ::arrow::Time32Type>(
+ TypedColumnWriter<Int32Type>* writer, const ::arrow::Time32Type& type,
+ int64_t num_values, int64_t num_levels, const int16_t* def_levels,
+ const int16_t* rep_levels, const uint8_t* valid_bits, int64_t valid_bits_offset,
+ const int32_t* data_ptr) {
+ RETURN_NOT_OK(data_buffer_.Resize(num_values * sizeof(int32_t)));
+ auto buffer_ptr = reinterpret_cast<int32_t*>(data_buffer_.mutable_data());
+ INIT_BITSET(valid_bits, valid_bits_offset);
+
+ if (type.unit() == TimeUnit::SECOND) {
+ for (int i = 0; i < num_values; i++) {
+ if (bitset_valid_bits & (1 << bit_offset_valid_bits)) {
+ buffer_ptr[i] = data_ptr[i] * 1000;
+ }
+ READ_NEXT_BITSET(valid_bits);
+ }
+ } else {
+ for (int i = 0; i < num_values; i++) {
+ if (bitset_valid_bits & (1 << bit_offset_valid_bits)) {
+ buffer_ptr[i] = data_ptr[i];
+ }
+ READ_NEXT_BITSET(valid_bits);
+ }
+ }
+ PARQUET_CATCH_NOT_OK(writer->WriteBatchSpaced(
+ num_levels, def_levels, rep_levels, valid_bits, valid_bits_offset, buffer_ptr));
+
+ return Status::OK();
+}
+
#define NULLABLE_BATCH_FAST_PATH(ParquetType, ArrowType, CType) \
template <> \
Status FileWriter::Impl::WriteNullableBatch<ParquetType, ArrowType>( \
- TypedColumnWriter<ParquetType> * writer, int64_t num_values, int64_t num_levels, \
- const int16_t* def_levels, const int16_t* rep_levels, const uint8_t* valid_bits, \
- int64_t valid_bits_offset, const CType* data_ptr) { \
+ TypedColumnWriter<ParquetType> * writer, const ArrowType& type, \
+ int64_t num_values, int64_t num_levels, const int16_t* def_levels, \
+ const int16_t* rep_levels, const uint8_t* valid_bits, int64_t valid_bits_offset, \
+ const CType* data_ptr) { \
PARQUET_CATCH_NOT_OK(writer->WriteBatchSpaced( \
num_levels, def_levels, rep_levels, valid_bits, valid_bits_offset, data_ptr)); \
return Status::OK(); \
}
NULLABLE_BATCH_FAST_PATH(Int32Type, ::arrow::Int32Type, int32_t)
+NULLABLE_BATCH_FAST_PATH(Int32Type, ::arrow::Date32Type, int32_t)
NULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::Int64Type, int64_t)
NULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::TimestampType, int64_t)
+NULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::Time64Type, int64_t)
NULLABLE_BATCH_FAST_PATH(FloatType, ::arrow::FloatType, float)
NULLABLE_BATCH_FAST_PATH(DoubleType, ::arrow::DoubleType, double)
@@ -553,14 +611,17 @@
WRITE_BATCH_CASE(INT16, Int16Type, Int32Type)
WRITE_BATCH_CASE(UINT16, UInt16Type, Int32Type)
WRITE_BATCH_CASE(INT32, Int32Type, Int32Type)
- WRITE_BATCH_CASE(DATE64, Date64Type, Int32Type)
WRITE_BATCH_CASE(INT64, Int64Type, Int64Type)
- WRITE_BATCH_CASE(TIMESTAMP, TimestampType, Int64Type)
WRITE_BATCH_CASE(UINT64, UInt64Type, Int64Type)
WRITE_BATCH_CASE(FLOAT, FloatType, FloatType)
WRITE_BATCH_CASE(DOUBLE, DoubleType, DoubleType)
WRITE_BATCH_CASE(BINARY, BinaryType, ByteArrayType)
WRITE_BATCH_CASE(STRING, BinaryType, ByteArrayType)
+ WRITE_BATCH_CASE(DATE32, Date32Type, Int32Type)
+ WRITE_BATCH_CASE(DATE64, Date64Type, Int32Type)
+ WRITE_BATCH_CASE(TIMESTAMP, TimestampType, Int64Type)
+ WRITE_BATCH_CASE(TIME32, Time32Type, Int32Type)
+ WRITE_BATCH_CASE(TIME64, Time64Type, Int64Type)
default:
std::stringstream ss;
ss << "Data type not supported as list value: " << values_array->type()->ToString();