PARQUET-1092: Support writing chunked arrow::Table columns
I did quite a bit of refactoring to make this simpler. There is some additional work we could do to make the write-path code cleaner, but we should probably hold off on that until we implement complete nested-data write and read support.
I will follow up shortly with some functional tests for ARROW-232 to make sure this works end-to-end on the pyarrow side.
Author: Wes McKinney <wes.mckinney@twosigma.com>
Closes #426 from wesm/PARQUET-1092 and squashes the following commits:
7aa3fab [Wes McKinney] Fix compiler warnings
a72ad12 [Wes McKinney] Test and fix bugs with chunked writes. Ensure that primitive types use right transfer functor on read path
c5006cb [Wes McKinney] Fix and expand date64->date32 conversion unit test
61e364c [Wes McKinney] Compiling again with test failure
c7e6696 [Wes McKinney] More refactoring, chunked writes
9f35818 [Wes McKinney] More refactoring
3e297cc [Wes McKinney] Start refactoring, totally broken
diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc
index 02f3751..db12fb4 100644
--- a/src/parquet/arrow/arrow-reader-writer-test.cc
+++ b/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -24,7 +24,10 @@
#include "gtest/gtest.h"
#include <arrow/compute/api.h>
+#include <cstdint>
+#include <functional>
#include <sstream>
+#include <vector>
#include "parquet/api/reader.h"
#include "parquet/api/writer.h"
@@ -38,6 +41,7 @@
#include "arrow/api.h"
#include "arrow/test-util.h"
+#include "arrow/type_traits.h"
#include "arrow/util/decimal.h"
using arrow::Array;
@@ -45,6 +49,7 @@
using arrow::Buffer;
using arrow::ChunkedArray;
using arrow::Column;
+using arrow::DataType;
using arrow::ListArray;
using arrow::PoolBuffer;
using arrow::PrimitiveArray;
@@ -77,7 +82,7 @@
static constexpr uint32_t kDefaultSeed = 0;
-LogicalType::type get_logical_type(const ::arrow::DataType& type) {
+LogicalType::type get_logical_type(const ::DataType& type) {
switch (type.id()) {
case ArrowId::UINT8:
return LogicalType::UINT_8;
@@ -130,7 +135,7 @@
return LogicalType::NONE;
}
-ParquetType::type get_physical_type(const ::arrow::DataType& type) {
+ParquetType::type get_physical_type(const ::DataType& type) {
switch (type.id()) {
case ArrowId::BOOL:
return ParquetType::BOOLEAN;
@@ -325,74 +330,6 @@
*out = sink->GetBuffer();
}
-void DoSimpleRoundtrip(const std::shared_ptr<Table>& table, int num_threads,
- int64_t row_group_size, const std::vector<int>& column_subset,
- std::shared_ptr<Table>* out,
- const std::shared_ptr<ArrowWriterProperties>& arrow_properties =
- default_arrow_writer_properties()) {
- std::shared_ptr<Buffer> buffer;
- WriteTableToBuffer(table, num_threads, row_group_size, arrow_properties, &buffer);
-
- std::unique_ptr<FileReader> reader;
- ASSERT_OK_NO_THROW(OpenFile(std::make_shared<BufferReader>(buffer),
- ::arrow::default_memory_pool(),
- ::parquet::default_reader_properties(), nullptr, &reader));
-
- reader->set_num_threads(num_threads);
-
- if (column_subset.size() > 0) {
- ASSERT_OK_NO_THROW(reader->ReadTable(column_subset, out));
- } else {
- // Read everything
- ASSERT_OK_NO_THROW(reader->ReadTable(out));
- }
-}
-
-static std::shared_ptr<GroupNode> MakeSimpleSchema(const ::arrow::DataType& type,
- Repetition::type repetition) {
- int32_t byte_width = -1;
- int32_t precision = -1;
- int32_t scale = -1;
-
- switch (type.id()) {
- case ::arrow::Type::DICTIONARY: {
- const auto& dict_type = static_cast<const ::arrow::DictionaryType&>(type);
- const ::arrow::DataType& values_type = *dict_type.dictionary()->type();
- switch (values_type.id()) {
- case ::arrow::Type::FIXED_SIZE_BINARY:
- byte_width =
- static_cast<const ::arrow::FixedSizeBinaryType&>(values_type).byte_width();
- break;
- case ::arrow::Type::DECIMAL: {
- const auto& decimal_type =
- static_cast<const ::arrow::Decimal128Type&>(values_type);
- precision = decimal_type.precision();
- scale = decimal_type.scale();
- byte_width = DecimalSize(precision);
- } break;
- default:
- break;
- }
- } break;
- case ::arrow::Type::FIXED_SIZE_BINARY:
- byte_width = static_cast<const ::arrow::FixedSizeBinaryType&>(type).byte_width();
- break;
- case ::arrow::Type::DECIMAL: {
- const auto& decimal_type = static_cast<const ::arrow::Decimal128Type&>(type);
- precision = decimal_type.precision();
- scale = decimal_type.scale();
- byte_width = DecimalSize(precision);
- } break;
- default:
- break;
- }
- auto pnode = PrimitiveNode::Make("column1", repetition, get_physical_type(type),
- get_logical_type(type), byte_width, precision, scale);
- NodePtr node_ =
- GroupNode::Make("schema", Repetition::REQUIRED, std::vector<NodePtr>({pnode}));
- return std::static_pointer_cast<GroupNode>(node_);
-}
-
namespace internal {
void AssertArraysEqual(const Array& expected, const Array& actual) {
@@ -427,13 +364,113 @@
}
}
-void AssertTablesEqual(const Table& expected, const Table& actual) {
+void PrintColumn(const Column& col, std::stringstream* ss) {
+ const ChunkedArray& carr = *col.data();
+ for (int i = 0; i < carr.num_chunks(); ++i) {
+ auto c1 = carr.chunk(i);
+ *ss << "Chunk " << i << std::endl;
+ EXPECT_OK(::arrow::PrettyPrint(*c1, 0, ss));
+ *ss << std::endl;
+ }
+}
+
+void AssertTablesEqual(const Table& expected, const Table& actual,
+ bool same_chunk_layout = true) {
ASSERT_EQ(expected.num_columns(), actual.num_columns());
- for (int i = 0; i < actual.num_columns(); ++i) {
- AssertChunkedEqual(*expected.column(i)->data(), *actual.column(i)->data());
+ if (same_chunk_layout) {
+ for (int i = 0; i < actual.num_columns(); ++i) {
+ AssertChunkedEqual(*expected.column(i)->data(), *actual.column(i)->data());
+ }
+ } else {
+ std::stringstream ss;
+ if (!actual.Equals(expected)) {
+ for (int i = 0; i < expected.num_columns(); ++i) {
+ ss << "Actual column " << i << std::endl;
+ PrintColumn(*actual.column(i), &ss);
+
+ ss << "Expected column " << i << std::endl;
+ PrintColumn(*expected.column(i), &ss);
+ }
+ FAIL() << ss.str();
+ }
}
- ASSERT_TRUE(actual.Equals(expected));
+}
+
+void DoSimpleRoundtrip(const std::shared_ptr<Table>& table, int num_threads,
+ int64_t row_group_size, const std::vector<int>& column_subset,
+ std::shared_ptr<Table>* out,
+ const std::shared_ptr<ArrowWriterProperties>& arrow_properties =
+ default_arrow_writer_properties()) {
+ std::shared_ptr<Buffer> buffer;
+ WriteTableToBuffer(table, num_threads, row_group_size, arrow_properties, &buffer);
+
+ std::unique_ptr<FileReader> reader;
+ ASSERT_OK_NO_THROW(OpenFile(std::make_shared<BufferReader>(buffer),
+ ::arrow::default_memory_pool(),
+ ::parquet::default_reader_properties(), nullptr, &reader));
+
+ reader->set_num_threads(num_threads);
+
+ if (column_subset.size() > 0) {
+ ASSERT_OK_NO_THROW(reader->ReadTable(column_subset, out));
+ } else {
+ // Read everything
+ ASSERT_OK_NO_THROW(reader->ReadTable(out));
+ }
+}
+
+void CheckSimpleRoundtrip(const std::shared_ptr<Table>& table, int64_t row_group_size,
+ const std::shared_ptr<ArrowWriterProperties>& arrow_properties =
+ default_arrow_writer_properties()) {
+ std::shared_ptr<Table> result;
+ DoSimpleRoundtrip(table, 1, row_group_size, {}, &result, arrow_properties);
+ AssertTablesEqual(*table, *result, false);
+}
+
+static std::shared_ptr<GroupNode> MakeSimpleSchema(const ::DataType& type,
+ Repetition::type repetition) {
+ int32_t byte_width = -1;
+ int32_t precision = -1;
+ int32_t scale = -1;
+
+ switch (type.id()) {
+ case ::arrow::Type::DICTIONARY: {
+ const auto& dict_type = static_cast<const ::arrow::DictionaryType&>(type);
+ const ::DataType& values_type = *dict_type.dictionary()->type();
+ switch (values_type.id()) {
+ case ::arrow::Type::FIXED_SIZE_BINARY:
+ byte_width =
+ static_cast<const ::arrow::FixedSizeBinaryType&>(values_type).byte_width();
+ break;
+ case ::arrow::Type::DECIMAL: {
+ const auto& decimal_type =
+ static_cast<const ::arrow::Decimal128Type&>(values_type);
+ precision = decimal_type.precision();
+ scale = decimal_type.scale();
+ byte_width = DecimalSize(precision);
+ } break;
+ default:
+ break;
+ }
+ } break;
+ case ::arrow::Type::FIXED_SIZE_BINARY:
+ byte_width = static_cast<const ::arrow::FixedSizeBinaryType&>(type).byte_width();
+ break;
+ case ::arrow::Type::DECIMAL: {
+ const auto& decimal_type = static_cast<const ::arrow::Decimal128Type&>(type);
+ precision = decimal_type.precision();
+ scale = decimal_type.scale();
+ byte_width = DecimalSize(precision);
+ } break;
+ default:
+ break;
+ }
+ auto pnode = PrimitiveNode::Make("column1", repetition, get_physical_type(type),
+ get_logical_type(type), byte_width, precision, scale);
+ NodePtr node_ =
+ GroupNode::Make("schema", Repetition::REQUIRED, std::vector<NodePtr>({pnode}));
+ return std::static_pointer_cast<GroupNode>(node_);
}
template <typename TestType>
@@ -527,10 +564,7 @@
}
void CheckRoundTrip(const std::shared_ptr<Table>& table) {
- std::shared_ptr<Table> result;
- DoSimpleRoundtrip(table, 1, table->num_rows(), {}, &result);
-
- AssertTablesEqual(*table, *result);
+ CheckSimpleRoundtrip(table, table->num_rows());
}
template <typename ArrayType>
@@ -1315,32 +1349,48 @@
auto f0 = field("f0", ::arrow::date64());
auto f1 = field("f1", ::arrow::time32(TimeUnit::SECOND));
- std::shared_ptr<::arrow::Schema> schema(new ::arrow::Schema({f0, f1}));
+ auto f2 = field("f2", ::arrow::date64());
+ auto f3 = field("f3", ::arrow::time32(TimeUnit::SECOND));
+
+ auto schema = ::arrow::schema({f0, f1, f2, f3});
std::vector<int64_t> a0_values = {1489190400000, 1489276800000, 1489363200000,
1489449600000, 1489536000000, 1489622400000};
std::vector<int32_t> a1_values = {0, 1, 2, 3, 4, 5};
- std::shared_ptr<Array> a0, a1, x0, x1;
+ std::shared_ptr<Array> a0, a1, a0_nonnull, a1_nonnull, x0, x1, x0_nonnull, x1_nonnull;
+
ArrayFromVector<::arrow::Date64Type, int64_t>(f0->type(), is_valid, a0_values, &a0);
+ ArrayFromVector<::arrow::Date64Type, int64_t>(f0->type(), a0_values, &a0_nonnull);
+
ArrayFromVector<::arrow::Time32Type, int32_t>(f1->type(), is_valid, a1_values, &a1);
+ ArrayFromVector<::arrow::Time32Type, int32_t>(f1->type(), a1_values, &a1_nonnull);
std::vector<std::shared_ptr<::arrow::Column>> columns = {
- std::make_shared<Column>("f0", a0), std::make_shared<Column>("f1", a1)};
+ std::make_shared<Column>("f0", a0), std::make_shared<Column>("f1", a1),
+ std::make_shared<Column>("f2", a0_nonnull),
+ std::make_shared<Column>("f3", a1_nonnull)};
auto table = Table::Make(schema, columns);
// Expected schema and values
auto e0 = field("f0", ::arrow::date32());
auto e1 = field("f1", ::arrow::time32(TimeUnit::MILLI));
- std::shared_ptr<::arrow::Schema> ex_schema(new ::arrow::Schema({e0, e1}));
+ auto e2 = field("f2", ::arrow::date32());
+ auto e3 = field("f3", ::arrow::time32(TimeUnit::MILLI));
+ auto ex_schema = ::arrow::schema({e0, e1, e2, e3});
std::vector<int32_t> x0_values = {17236, 17237, 17238, 17239, 17240, 17241};
std::vector<int32_t> x1_values = {0, 1000, 2000, 3000, 4000, 5000};
ArrayFromVector<::arrow::Date32Type, int32_t>(e0->type(), is_valid, x0_values, &x0);
+ ArrayFromVector<::arrow::Date32Type, int32_t>(e0->type(), x0_values, &x0_nonnull);
+
ArrayFromVector<::arrow::Time32Type, int32_t>(e1->type(), is_valid, x1_values, &x1);
+ ArrayFromVector<::arrow::Time32Type, int32_t>(e1->type(), x1_values, &x1_nonnull);
std::vector<std::shared_ptr<::arrow::Column>> ex_columns = {
- std::make_shared<Column>("f0", x0), std::make_shared<Column>("f1", x1)};
+ std::make_shared<Column>("f0", x0), std::make_shared<Column>("f1", x1),
+ std::make_shared<Column>("f2", x0_nonnull),
+ std::make_shared<Column>("f3", x1_nonnull)};
auto ex_table = Table::Make(ex_schema, ex_columns);
std::shared_ptr<Table> result;
@@ -1375,6 +1425,43 @@
*out = Table::Make(schema, columns);
}
+void MakeListArray(int num_rows, std::shared_ptr<::DataType>* out_type,
+ std::shared_ptr<Array>* out_array) {
+ ::arrow::Int32Builder offset_builder;
+
+ std::vector<int32_t> length_draws;
+ randint(num_rows, 0, 100, &length_draws);
+
+ std::vector<int32_t> offset_values;
+
+ // Make sure some of them are length 0
+ int32_t total_elements = 0;
+ for (size_t i = 0; i < length_draws.size(); ++i) {
+ if (length_draws[i] < 10) {
+ length_draws[i] = 0;
+ }
+ offset_values.push_back(total_elements);
+ total_elements += length_draws[i];
+ }
+ offset_values.push_back(total_elements);
+
+ std::vector<int8_t> value_draws;
+ randint(total_elements, 0, 100, &value_draws);
+
+ std::vector<bool> is_valid;
+ random_is_valid(total_elements, 0.1, &is_valid);
+
+ std::shared_ptr<Array> values, offsets;
+ ::arrow::ArrayFromVector<::arrow::Int8Type, int8_t>(::arrow::int8(), is_valid,
+ value_draws, &values);
+ ::arrow::ArrayFromVector<::arrow::Int32Type, int32_t>(offset_values, &offsets);
+
+ ASSERT_OK(::arrow::ListArray::FromArrays(*offsets, *values, default_memory_pool(),
+ out_array));
+
+ *out_type = ::arrow::list(::arrow::int8());
+}
+
TEST(TestArrowReadWrite, MultithreadedRead) {
const int num_columns = 20;
const int num_rows = 1000;
@@ -1464,51 +1551,16 @@
AssertTablesEqual(*expected, *result);
}
-void MakeListTable(int num_rows, std::shared_ptr<Table>* out) {
- ::arrow::Int32Builder offset_builder;
-
- std::vector<int32_t> length_draws;
- randint(num_rows, 0, 100, &length_draws);
-
- std::vector<int32_t> offset_values;
-
- // Make sure some of them are length 0
- int32_t total_elements = 0;
- for (size_t i = 0; i < length_draws.size(); ++i) {
- if (length_draws[i] < 10) {
- length_draws[i] = 0;
- }
- offset_values.push_back(total_elements);
- total_elements += length_draws[i];
- }
- offset_values.push_back(total_elements);
-
- std::vector<int8_t> value_draws;
- randint(total_elements, 0, 100, &value_draws);
-
- std::vector<bool> is_valid;
- random_is_valid(total_elements, 0.1, &is_valid);
-
- std::shared_ptr<Array> values, offsets;
- ::arrow::ArrayFromVector<::arrow::Int8Type, int8_t>(::arrow::int8(), is_valid,
- value_draws, &values);
- ::arrow::ArrayFromVector<::arrow::Int32Type, int32_t>(offset_values, &offsets);
-
- std::shared_ptr<Array> list_array;
- ASSERT_OK(::arrow::ListArray::FromArrays(*offsets, *values, default_memory_pool(),
- &list_array));
-
- auto f1 = ::arrow::field("a", ::arrow::list(::arrow::int8()));
- auto schema = ::arrow::schema({f1});
- std::vector<std::shared_ptr<Array>> arrays = {list_array};
- *out = Table::Make(schema, arrays);
-}
-
TEST(TestArrowReadWrite, ListLargeRecords) {
const int num_rows = 50;
- std::shared_ptr<Table> table;
- MakeListTable(num_rows, &table);
+ std::shared_ptr<Array> list_array;
+ std::shared_ptr<::DataType> list_type;
+
+ MakeListArray(num_rows, &list_type, &list_array);
+
+ auto schema = ::arrow::schema({::arrow::field("a", list_type)});
+ std::shared_ptr<Table> table = Table::Make(schema, {list_array});
std::shared_ptr<Buffer> buffer;
WriteTableToBuffer(table, 1, 100, default_arrow_writer_properties(), &buffer);
@@ -1549,6 +1601,74 @@
ASSERT_TRUE(table->Equals(*chunked_table));
}
+typedef std::function<void(int, std::shared_ptr<::DataType>*, std::shared_ptr<Array>*)>
+ ArrayFactory;
+
+template <typename ArrowType>
+struct GenerateArrayFunctor {
+ explicit GenerateArrayFunctor(double pct_null = 0.1) : pct_null(pct_null) {}
+
+ void operator()(int length, std::shared_ptr<::DataType>* type,
+ std::shared_ptr<Array>* array) {
+ using T = typename ArrowType::c_type;
+
+ // TODO(wesm): generate things other than integers
+ std::vector<T> draws;
+ randint(length, 0, 100, &draws);
+
+ std::vector<bool> is_valid;
+ random_is_valid(length, this->pct_null, &is_valid);
+
+ *type = ::arrow::TypeTraits<ArrowType>::type_singleton();
+ ::arrow::ArrayFromVector<ArrowType, T>(*type, is_valid, draws, array);
+ }
+
+ double pct_null;
+};
+
+typedef std::function<void(int, std::shared_ptr<::DataType>*, std::shared_ptr<Array>*)>
+ ArrayFactory;
+
+auto GenerateInt32 = [](int length, std::shared_ptr<::DataType>* type,
+ std::shared_ptr<Array>* array) {
+ GenerateArrayFunctor<::arrow::Int32Type> func;
+ func(length, type, array);
+};
+
+auto GenerateList = [](int length, std::shared_ptr<::DataType>* type,
+ std::shared_ptr<Array>* array) {
+ MakeListArray(length, type, array);
+};
+
+TEST(TestArrowReadWrite, TableWithChunkedColumns) {
+ std::vector<ArrayFactory> functions = {GenerateInt32, GenerateList};
+
+ std::vector<int> chunk_sizes = {2, 4, 10, 2};
+ const int64_t total_length = 18;
+
+ for (const auto& datagen_func : functions) {
+ ::arrow::ArrayVector arrays;
+ std::shared_ptr<Array> arr;
+ std::shared_ptr<::DataType> type;
+ datagen_func(total_length, &type, &arr);
+
+ int64_t offset = 0;
+ for (int chunk_size : chunk_sizes) {
+ arrays.push_back(arr->Slice(offset, chunk_size));
+ offset += chunk_size;
+ }
+
+ auto field = ::arrow::field("fname", type);
+ auto schema = ::arrow::schema({field});
+ auto col = std::make_shared<::arrow::Column>(field, arrays);
+ auto table = Table::Make(schema, {col});
+
+ CheckSimpleRoundtrip(table, 2);
+ CheckSimpleRoundtrip(table, 3);
+ CheckSimpleRoundtrip(table, 10);
+ }
+}
+
TEST(TestArrowWrite, CheckChunkSize) {
const int num_columns = 2;
const int num_rows = 128;
@@ -1943,13 +2063,13 @@
class TestArrowReaderAdHocSpark
: public ::testing::TestWithParam<
- std::tuple<std::string, std::shared_ptr<::arrow::DataType>>> {};
+ std::tuple<std::string, std::shared_ptr<::DataType>>> {};
TEST_P(TestArrowReaderAdHocSpark, ReadDecimals) {
std::string path(std::getenv("PARQUET_TEST_DATA"));
std::string filename;
- std::shared_ptr<::arrow::DataType> decimal_type;
+ std::shared_ptr<::DataType> decimal_type;
std::tie(filename, decimal_type) = GetParam();
path += "/" + filename;
diff --git a/src/parquet/arrow/arrow-schema-test.cc b/src/parquet/arrow/arrow-schema-test.cc
index 771b996..b33eda1 100644
--- a/src/parquet/arrow/arrow-schema-test.cc
+++ b/src/parquet/arrow/arrow-schema-test.cc
@@ -62,8 +62,8 @@
for (int i = 0; i < expected_schema->num_fields(); ++i) {
auto lhs = result_schema_->field(i);
auto rhs = expected_schema->field(i);
- EXPECT_TRUE(lhs->Equals(rhs))
- << i << " " << lhs->ToString() << " != " << rhs->ToString();
+ EXPECT_TRUE(lhs->Equals(rhs)) << i << " " << lhs->ToString()
+ << " != " << rhs->ToString();
}
}
diff --git a/src/parquet/arrow/reader.cc b/src/parquet/arrow/reader.cc
index 53065a6..9318305 100644
--- a/src/parquet/arrow/reader.cc
+++ b/src/parquet/arrow/reader.cc
@@ -717,8 +717,7 @@
template <typename ArrowType, typename ParquetType>
using supports_fast_path =
- typename std::enable_if<supports_fast_path_impl<ArrowType, ParquetType>::value,
- ParquetType>::type;
+ typename std::enable_if<supports_fast_path_impl<ArrowType, ParquetType>::value>::type;
template <typename ArrowType, typename ParquetType, typename Enable = void>
struct TransferFunctor {
@@ -728,6 +727,10 @@
Status operator()(RecordReader* reader, MemoryPool* pool,
const std::shared_ptr<::arrow::DataType>& type,
std::shared_ptr<Array>* out) {
+ static_assert(!std::is_same<ArrowType, ::arrow::Int32Type>::value,
+ "The fast path transfer functor should be used "
+ "for primitive values");
+
int64_t length = reader->values_written();
std::shared_ptr<Buffer> data;
RETURN_NOT_OK(::arrow::AllocateBuffer(pool, length * sizeof(ArrowCType), &data));
diff --git a/src/parquet/arrow/writer.cc b/src/parquet/arrow/writer.cc
index d7001aa..85d5bd3 100644
--- a/src/parquet/arrow/writer.cc
+++ b/src/parquet/arrow/writer.cc
@@ -31,6 +31,7 @@
using arrow::Array;
using arrow::BinaryArray;
+using arrow::ChunkedArray;
using arrow::FixedSizeBinaryArray;
using arrow::Decimal128Array;
using arrow::BooleanArray;
@@ -65,12 +66,12 @@
return default_writer_properties;
}
+namespace {
+
class LevelBuilder {
public:
explicit LevelBuilder(MemoryPool* pool)
- : def_levels_(::arrow::int16(), pool), rep_levels_(::arrow::int16(), pool) {
- def_levels_buffer_ = std::make_shared<PoolBuffer>(pool);
- }
+ : def_levels_(::arrow::int16(), pool), rep_levels_(::arrow::int16(), pool) {}
Status VisitInline(const Array& array);
@@ -80,7 +81,6 @@
array_offsets_.push_back(static_cast<int32_t>(array.offset()));
valid_bitmaps_.push_back(array.null_bitmap_data());
null_counts_.push_back(array.null_count());
- values_type_ = array.type_id();
values_array_ = std::make_shared<T>(array.data());
return Status::OK();
}
@@ -106,13 +106,12 @@
NOT_IMPLEMENTED_VISIT(Struct)
NOT_IMPLEMENTED_VISIT(Union)
NOT_IMPLEMENTED_VISIT(Dictionary)
- NOT_IMPLEMENTED_VISIT(Interval)
Status GenerateLevels(const Array& array, const std::shared_ptr<Field>& field,
- int64_t* values_offset, ::arrow::Type::type* values_type,
- int64_t* num_values, int64_t* num_levels,
- std::shared_ptr<Buffer>* def_levels,
- std::shared_ptr<Buffer>* rep_levels,
+ int64_t* values_offset, int64_t* num_values, int64_t* num_levels,
+ const std::shared_ptr<PoolBuffer>& def_levels_scratch,
+ std::shared_ptr<Buffer>* def_levels_out,
+ std::shared_ptr<Buffer>* rep_levels_out,
std::shared_ptr<Array>* values_array) {
// Work downwards to extract bitmaps and offsets
min_offset_idx_ = 0;
@@ -120,7 +119,6 @@
RETURN_NOT_OK(VisitInline(array));
*num_values = max_offset_idx_ - min_offset_idx_;
*values_offset = min_offset_idx_;
- *values_type = values_type_;
*values_array = values_array_;
// Walk downwards to extract nullability
@@ -139,11 +137,12 @@
// Generate the levels.
if (nullable_.size() == 1) {
// We have a PrimitiveArray
- *rep_levels = nullptr;
+ *rep_levels_out = nullptr;
if (nullable_[0]) {
- RETURN_NOT_OK(def_levels_buffer_->Resize(array.length() * sizeof(int16_t)));
+ RETURN_NOT_OK(
+ def_levels_scratch->Resize(array.length() * sizeof(int16_t), false));
auto def_levels_ptr =
- reinterpret_cast<int16_t*>(def_levels_buffer_->mutable_data());
+ reinterpret_cast<int16_t*>(def_levels_scratch->mutable_data());
if (array.null_count() == 0) {
std::fill(def_levels_ptr, def_levels_ptr + array.length(), 1);
} else if (array.null_count() == array.length()) {
@@ -152,17 +151,14 @@
::arrow::internal::BitmapReader valid_bits_reader(
array.null_bitmap_data(), array.offset(), array.length());
for (int i = 0; i < array.length(); i++) {
- if (valid_bits_reader.IsSet()) {
- def_levels_ptr[i] = 1;
- } else {
- def_levels_ptr[i] = 0;
- }
+ def_levels_ptr[i] = valid_bits_reader.IsSet() ? 1 : 0;
valid_bits_reader.Next();
}
}
- *def_levels = def_levels_buffer_;
+
+ *def_levels_out = def_levels_scratch;
} else {
- *def_levels = nullptr;
+ *def_levels_out = nullptr;
}
*num_levels = array.length();
} else {
@@ -170,12 +166,13 @@
RETURN_NOT_OK(HandleListEntries(0, 0, 0, array.length()));
std::shared_ptr<Array> def_levels_array;
- RETURN_NOT_OK(def_levels_.Finish(&def_levels_array));
- *def_levels = static_cast<PrimitiveArray*>(def_levels_array.get())->values();
-
std::shared_ptr<Array> rep_levels_array;
+
+ RETURN_NOT_OK(def_levels_.Finish(&def_levels_array));
RETURN_NOT_OK(rep_levels_.Finish(&rep_levels_array));
- *rep_levels = static_cast<PrimitiveArray*>(rep_levels_array.get())->values();
+
+ *def_levels_out = static_cast<PrimitiveArray*>(def_levels_array.get())->values();
+ *rep_levels_out = static_cast<PrimitiveArray*>(rep_levels_array.get())->values();
*num_levels = rep_levels_array->length();
}
@@ -242,7 +239,6 @@
private:
Int16Builder def_levels_;
- std::shared_ptr<PoolBuffer> def_levels_buffer_;
Int16Builder rep_levels_;
std::vector<int64_t> null_counts_;
@@ -253,7 +249,6 @@
int64_t min_offset_idx_;
int64_t max_offset_idx_;
- ::arrow::Type::type values_type_;
std::shared_ptr<Array> values_array_;
};
@@ -261,164 +256,221 @@
return VisitArrayInline(array, this);
}
-class FileWriter::Impl {
+struct ColumnWriterContext {
+ ColumnWriterContext(MemoryPool* memory_pool, ArrowWriterProperties* properties)
+ : memory_pool(memory_pool), properties(properties) {
+ this->data_buffer = std::make_shared<PoolBuffer>(memory_pool);
+ this->def_levels_buffer = std::make_shared<PoolBuffer>(memory_pool);
+ }
+
+ template <typename T>
+ Status GetScratchData(const int64_t num_values, T** out) {
+ RETURN_NOT_OK(this->data_buffer->Resize(num_values * sizeof(T), false));
+ *out = reinterpret_cast<T*>(this->data_buffer->mutable_data());
+ return Status::OK();
+ }
+
+ MemoryPool* memory_pool;
+ ArrowWriterProperties* properties;
+
+ // Buffer used for storing the data of an array converted to the physical type
+ // as expected by parquet-cpp.
+ std::shared_ptr<PoolBuffer> data_buffer;
+
+ // We use the shared ownership of this buffer
+ std::shared_ptr<PoolBuffer> def_levels_buffer;
+};
+
+Status GetLeafType(const ::arrow::DataType& type, ::arrow::Type::type* leaf_type) {
+ if (type.id() == ::arrow::Type::LIST || type.id() == ::arrow::Type::STRUCT) {
+ if (type.num_children() != 1) {
+ return Status::Invalid("Nested column branch had multiple children");
+ }
+ return GetLeafType(*type.child(0)->type(), leaf_type);
+ } else {
+ *leaf_type = type.id();
+ return Status::OK();
+ }
+}
+
+class ArrowColumnWriter {
public:
- Impl(MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer,
- const std::shared_ptr<ArrowWriterProperties>& arrow_properties);
+ ArrowColumnWriter(ColumnWriterContext* ctx, ColumnWriter* column_writer,
+ const std::shared_ptr<Field>& field)
+ : ctx_(ctx), writer_(column_writer), field_(field) {}
- Status NewRowGroup(int64_t chunk_size);
+ Status Write(const Array& data);
+ Status Write(const ChunkedArray& data, int64_t offset, const int64_t size) {
+ int64_t absolute_position = 0;
+ int chunk_index = 0;
+ int64_t chunk_offset = 0;
+ while (chunk_index < data.num_chunks() && absolute_position < offset) {
+ const int64_t chunk_length = data.chunk(chunk_index)->length();
+ if (absolute_position + chunk_length > offset) {
+ // Relative offset into the chunk to reach the desired start offset for
+ // writing
+ chunk_offset = offset - absolute_position;
+ break;
+ } else {
+ ++chunk_index;
+ absolute_position += chunk_length;
+ }
+ }
+
+ if (absolute_position >= data.length()) {
+ return Status::Invalid("Cannot write data at offset past end of chunked array");
+ }
+
+ int64_t values_written = 0;
+ while (values_written < size) {
+ const Array& chunk = *data.chunk(chunk_index);
+ const int64_t available_values = chunk.length() - chunk_offset;
+ const int64_t chunk_write_size = std::min(size - values_written, available_values);
+
+ // The chunk offset here will be 0 except for possibly the first chunk
+ // because of the advancing logic above
+ std::shared_ptr<Array> array_to_write = chunk.Slice(chunk_offset, chunk_write_size);
+ RETURN_NOT_OK(Write(*array_to_write));
+
+ if (chunk_write_size == available_values) {
+ chunk_offset = 0;
+ ++chunk_index;
+ }
+ values_written += chunk_write_size;
+ }
+
+ return Status::OK();
+ }
+
+ Status Close() {
+ PARQUET_CATCH_NOT_OK(writer_->Close());
+ return Status::OK();
+ }
+
+ private:
template <typename ParquetType, typename ArrowType>
- Status TypedWriteBatch(ColumnWriter* column_writer, const std::shared_ptr<Array>& data,
- int64_t num_levels, const int16_t* def_levels,
+ Status TypedWriteBatch(const Array& data, int64_t num_levels, const int16_t* def_levels,
const int16_t* rep_levels);
- Status WriteTimestamps(ColumnWriter* column_writer, const std::shared_ptr<Array>& data,
- int64_t num_levels, const int16_t* def_levels,
+ Status WriteTimestamps(const Array& data, int64_t num_levels, const int16_t* def_levels,
const int16_t* rep_levels);
- Status WriteTimestampsCoerce(ColumnWriter* column_writer,
- const std::shared_ptr<Array>& data, int64_t num_levels,
+ Status WriteTimestampsCoerce(const Array& data, int64_t num_levels,
const int16_t* def_levels, const int16_t* rep_levels);
template <typename ParquetType, typename ArrowType>
- Status WriteNonNullableBatch(TypedColumnWriter<ParquetType>* column_writer,
- const ArrowType& type, int64_t num_values,
+ Status WriteNonNullableBatch(const ArrowType& type, int64_t num_values,
int64_t num_levels, const int16_t* def_levels,
const int16_t* rep_levels,
- const typename ArrowType::c_type* data_ptr);
+ const typename ArrowType::c_type* values);
template <typename ParquetType, typename ArrowType>
- Status WriteNullableBatch(TypedColumnWriter<ParquetType>* column_writer,
- const ArrowType& type, int64_t num_values, int64_t num_levels,
+ Status WriteNullableBatch(const ArrowType& type, int64_t num_values, int64_t num_levels,
const int16_t* def_levels, const int16_t* rep_levels,
const uint8_t* valid_bits, int64_t valid_bits_offset,
- const typename ArrowType::c_type* data_ptr);
+ const typename ArrowType::c_type* values);
- Status WriteColumnChunk(const Array& data);
- Status Close();
+ template <typename ParquetType>
+ Status WriteBatch(int64_t num_levels, const int16_t* def_levels,
+ const int16_t* rep_levels,
+ const typename ParquetType::c_type* values) {
+ auto typed_writer = static_cast<TypedColumnWriter<ParquetType>*>(writer_);
+ PARQUET_CATCH_NOT_OK(
+ typed_writer->WriteBatch(num_levels, def_levels, rep_levels, values));
+ return Status::OK();
+ }
- const WriterProperties& properties() const { return *writer_->properties(); }
+ template <typename ParquetType>
+ Status WriteBatchSpaced(int64_t num_levels, const int16_t* def_levels,
+ const int16_t* rep_levels, const uint8_t* valid_bits,
+ int64_t valid_bits_offset,
+ const typename ParquetType::c_type* values) {
+ auto typed_writer = static_cast<TypedColumnWriter<ParquetType>*>(writer_);
+ PARQUET_CATCH_NOT_OK(typed_writer->WriteBatchSpaced(
+ num_levels, def_levels, rep_levels, valid_bits, valid_bits_offset, values));
+ return Status::OK();
+ }
- virtual ~Impl() {}
-
- private:
- friend class FileWriter;
-
- MemoryPool* pool_;
- // Buffer used for storing the data of an array converted to the physical type
- // as expected by parquet-cpp.
- PoolBuffer data_buffer_;
- std::unique_ptr<ParquetFileWriter> writer_;
- RowGroupWriter* row_group_writer_;
- std::shared_ptr<ArrowWriterProperties> arrow_properties_;
- bool closed_;
+ ColumnWriterContext* ctx_;
+ ColumnWriter* writer_;
+ std::shared_ptr<Field> field_;
};
-FileWriter::Impl::Impl(MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer,
- const std::shared_ptr<ArrowWriterProperties>& arrow_properties)
- : pool_(pool),
- data_buffer_(pool),
- writer_(std::move(writer)),
- row_group_writer_(nullptr),
- arrow_properties_(arrow_properties),
- closed_(false) {}
-
-Status FileWriter::Impl::NewRowGroup(int64_t chunk_size) {
- if (row_group_writer_ != nullptr) {
- PARQUET_CATCH_NOT_OK(row_group_writer_->Close());
- }
- PARQUET_CATCH_NOT_OK(row_group_writer_ = writer_->AppendRowGroup());
- return Status::OK();
-}
-
-// ----------------------------------------------------------------------
-// Column type specialization
-
template <typename ParquetType, typename ArrowType>
-Status FileWriter::Impl::TypedWriteBatch(ColumnWriter* column_writer,
- const std::shared_ptr<Array>& array,
- int64_t num_levels, const int16_t* def_levels,
- const int16_t* rep_levels) {
+Status ArrowColumnWriter::TypedWriteBatch(const Array& array, int64_t num_levels,
+ const int16_t* def_levels,
+ const int16_t* rep_levels) {
using ArrowCType = typename ArrowType::c_type;
- const auto& data = static_cast<const PrimitiveArray&>(*array);
- auto data_ptr =
+ const auto& data = static_cast<const PrimitiveArray&>(array);
+ auto values =
reinterpret_cast<const ArrowCType*>(data.values()->data()) + data.offset();
- auto writer = reinterpret_cast<TypedColumnWriter<ParquetType>*>(column_writer);
- if (writer->descr()->schema_node()->is_required() || (data.null_count() == 0)) {
+ if (writer_->descr()->schema_node()->is_required() || (data.null_count() == 0)) {
// no nulls, just dump the data
RETURN_NOT_OK((WriteNonNullableBatch<ParquetType, ArrowType>(
- writer, static_cast<const ArrowType&>(*array->type()), array->length(),
- num_levels, def_levels, rep_levels, data_ptr)));
+ static_cast<const ArrowType&>(*array.type()), array.length(), num_levels,
+ def_levels, rep_levels, values)));
} else {
const uint8_t* valid_bits = data.null_bitmap_data();
RETURN_NOT_OK((WriteNullableBatch<ParquetType, ArrowType>(
- writer, static_cast<const ArrowType&>(*array->type()), data.length(), num_levels,
- def_levels, rep_levels, valid_bits, data.offset(), data_ptr)));
+ static_cast<const ArrowType&>(*array.type()), data.length(), num_levels,
+ def_levels, rep_levels, valid_bits, data.offset(), values)));
}
- PARQUET_CATCH_NOT_OK(writer->Close());
return Status::OK();
}
template <typename ParquetType, typename ArrowType>
-Status FileWriter::Impl::WriteNonNullableBatch(
- TypedColumnWriter<ParquetType>* writer, const ArrowType& type, int64_t num_values,
- int64_t num_levels, const int16_t* def_levels, const int16_t* rep_levels,
- const typename ArrowType::c_type* data_ptr) {
+Status ArrowColumnWriter::WriteNonNullableBatch(
+ const ArrowType& type, int64_t num_values, int64_t num_levels,
+ const int16_t* def_levels, const int16_t* rep_levels,
+ const typename ArrowType::c_type* values) {
using ParquetCType = typename ParquetType::c_type;
- RETURN_NOT_OK(data_buffer_.Resize(num_values * sizeof(ParquetCType)));
- auto buffer_ptr = reinterpret_cast<ParquetCType*>(data_buffer_.mutable_data());
- std::copy(data_ptr, data_ptr + num_values, buffer_ptr);
- PARQUET_CATCH_NOT_OK(
- writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr));
- return Status::OK();
+ ParquetCType* buffer;
+ RETURN_NOT_OK(ctx_->GetScratchData<ParquetCType>(num_values, &buffer));
+
+ std::copy(values, values + num_values, buffer);
+
+ return WriteBatch<ParquetType>(num_levels, def_levels, rep_levels, buffer);
}
template <>
-Status FileWriter::Impl::WriteNonNullableBatch<Int32Type, ::arrow::Date64Type>(
- TypedColumnWriter<Int32Type>* writer, const ::arrow::Date64Type& type,
- int64_t num_values, int64_t num_levels, const int16_t* def_levels,
- const int16_t* rep_levels, const int64_t* data_ptr) {
- RETURN_NOT_OK(data_buffer_.Resize(num_values * sizeof(int32_t)));
- auto buffer_ptr = reinterpret_cast<int32_t*>(data_buffer_.mutable_data());
+Status ArrowColumnWriter::WriteNonNullableBatch<Int32Type, ::arrow::Date64Type>(
+ const ::arrow::Date64Type& type, int64_t num_values, int64_t num_levels,
+ const int16_t* def_levels, const int16_t* rep_levels, const int64_t* values) {
+ int32_t* buffer;
+ RETURN_NOT_OK(ctx_->GetScratchData<int32_t>(num_levels, &buffer));
+
for (int i = 0; i < num_values; i++) {
- buffer_ptr[i] = static_cast<int32_t>(data_ptr[i] / 86400000);
+ buffer[i] = static_cast<int32_t>(values[i] / 86400000);
}
- PARQUET_CATCH_NOT_OK(
- writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr));
- return Status::OK();
+
+ return WriteBatch<Int32Type>(num_levels, def_levels, rep_levels, buffer);
}
template <>
-Status FileWriter::Impl::WriteNonNullableBatch<Int32Type, ::arrow::Time32Type>(
- TypedColumnWriter<Int32Type>* writer, const ::arrow::Time32Type& type,
- int64_t num_values, int64_t num_levels, const int16_t* def_levels,
- const int16_t* rep_levels, const int32_t* data_ptr) {
- RETURN_NOT_OK(data_buffer_.Resize(num_values * sizeof(int32_t)));
- auto buffer_ptr = reinterpret_cast<int32_t*>(data_buffer_.mutable_data());
+Status ArrowColumnWriter::WriteNonNullableBatch<Int32Type, ::arrow::Time32Type>(
+ const ::arrow::Time32Type& type, int64_t num_values, int64_t num_levels,
+ const int16_t* def_levels, const int16_t* rep_levels, const int32_t* values) {
+ int32_t* buffer;
+ RETURN_NOT_OK(ctx_->GetScratchData<int32_t>(num_levels, &buffer));
if (type.unit() == TimeUnit::SECOND) {
for (int i = 0; i < num_values; i++) {
- buffer_ptr[i] = data_ptr[i] * 1000;
+ buffer[i] = values[i] * 1000;
}
} else {
- std::copy(data_ptr, data_ptr + num_values, buffer_ptr);
+ std::copy(values, values + num_values, buffer);
}
- PARQUET_CATCH_NOT_OK(
- writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr));
- return Status::OK();
+ return WriteBatch<Int32Type>(num_levels, def_levels, rep_levels, buffer);
}
-#define NONNULLABLE_BATCH_FAST_PATH(ParquetType, ArrowType, CType) \
- template <> \
- Status FileWriter::Impl::WriteNonNullableBatch<ParquetType, ArrowType>( \
- TypedColumnWriter<ParquetType> * writer, const ArrowType& type, \
- int64_t num_values, int64_t num_levels, const int16_t* def_levels, \
- const int16_t* rep_levels, const CType* data_ptr) { \
- PARQUET_CATCH_NOT_OK( \
- writer->WriteBatch(num_levels, def_levels, rep_levels, data_ptr)); \
- return Status::OK(); \
+#define NONNULLABLE_BATCH_FAST_PATH(ParquetType, ArrowType, CType) \
+ template <> \
+ Status ArrowColumnWriter::WriteNonNullableBatch<ParquetType, ArrowType>( \
+ const ArrowType& type, int64_t num_values, int64_t num_levels, \
+ const int16_t* def_levels, const int16_t* rep_levels, const CType* buffer) { \
+ return WriteBatch<ParquetType>(num_levels, def_levels, rep_levels, buffer); \
}
NONNULLABLE_BATCH_FAST_PATH(Int32Type, ::arrow::Int32Type, int32_t)
@@ -429,96 +481,68 @@
NONNULLABLE_BATCH_FAST_PATH(DoubleType, ::arrow::DoubleType, double)
template <typename ParquetType, typename ArrowType>
-Status FileWriter::Impl::WriteNullableBatch(TypedColumnWriter<ParquetType>* writer,
- const ArrowType& type, int64_t num_values,
- int64_t num_levels, const int16_t* def_levels,
- const int16_t* rep_levels,
- const uint8_t* valid_bits,
- int64_t valid_bits_offset,
- const typename ArrowType::c_type* data_ptr) {
+Status ArrowColumnWriter::WriteNullableBatch(
+ const ArrowType& type, int64_t num_values, int64_t num_levels,
+ const int16_t* def_levels, const int16_t* rep_levels, const uint8_t* valid_bits,
+ int64_t valid_bits_offset, const typename ArrowType::c_type* values) {
using ParquetCType = typename ParquetType::c_type;
- RETURN_NOT_OK(data_buffer_.Resize(num_values * sizeof(ParquetCType)));
- auto buffer_ptr = reinterpret_cast<ParquetCType*>(data_buffer_.mutable_data());
- ::arrow::internal::BitmapReader valid_bits_reader(valid_bits, valid_bits_offset,
- num_values);
+ ParquetCType* buffer;
+ RETURN_NOT_OK(ctx_->GetScratchData<ParquetCType>(num_levels, &buffer));
for (int i = 0; i < num_values; i++) {
- if (valid_bits_reader.IsSet()) {
- buffer_ptr[i] = static_cast<ParquetCType>(data_ptr[i]);
- }
- valid_bits_reader.Next();
+ buffer[i] = static_cast<ParquetCType>(values[i]);
}
- PARQUET_CATCH_NOT_OK(writer->WriteBatchSpaced(
- num_levels, def_levels, rep_levels, valid_bits, valid_bits_offset, buffer_ptr));
- return Status::OK();
+ return WriteBatchSpaced<ParquetType>(num_levels, def_levels, rep_levels, valid_bits,
+ valid_bits_offset, buffer);
}
template <>
-Status FileWriter::Impl::WriteNullableBatch<Int32Type, ::arrow::Date64Type>(
- TypedColumnWriter<Int32Type>* writer, const ::arrow::Date64Type& type,
- int64_t num_values, int64_t num_levels, const int16_t* def_levels,
- const int16_t* rep_levels, const uint8_t* valid_bits, int64_t valid_bits_offset,
- const int64_t* data_ptr) {
- RETURN_NOT_OK(data_buffer_.Resize(num_values * sizeof(int32_t)));
- auto buffer_ptr = reinterpret_cast<int32_t*>(data_buffer_.mutable_data());
- ::arrow::internal::BitmapReader valid_bits_reader(valid_bits, valid_bits_offset,
- num_values);
- for (int i = 0; i < num_values; i++) {
- if (valid_bits_reader.IsSet()) {
- // Convert from milliseconds into days since the epoch
- buffer_ptr[i] = static_cast<int32_t>(data_ptr[i] / 86400000);
- }
- valid_bits_reader.Next();
- }
- PARQUET_CATCH_NOT_OK(writer->WriteBatchSpaced(
- num_levels, def_levels, rep_levels, valid_bits, valid_bits_offset, buffer_ptr));
+Status ArrowColumnWriter::WriteNullableBatch<Int32Type, ::arrow::Date64Type>(
+ const ::arrow::Date64Type& type, int64_t num_values, int64_t num_levels,
+ const int16_t* def_levels, const int16_t* rep_levels, const uint8_t* valid_bits,
+ int64_t valid_bits_offset, const int64_t* values) {
+ int32_t* buffer;
+ RETURN_NOT_OK(ctx_->GetScratchData<int32_t>(num_values, &buffer));
- return Status::OK();
+ for (int i = 0; i < num_values; i++) {
+ // Convert from milliseconds into days since the epoch
+ buffer[i] = static_cast<int32_t>(values[i] / 86400000);
+ }
+
+ return WriteBatchSpaced<Int32Type>(num_levels, def_levels, rep_levels, valid_bits,
+ valid_bits_offset, buffer);
}
template <>
-Status FileWriter::Impl::WriteNullableBatch<Int32Type, ::arrow::Time32Type>(
- TypedColumnWriter<Int32Type>* writer, const ::arrow::Time32Type& type,
- int64_t num_values, int64_t num_levels, const int16_t* def_levels,
- const int16_t* rep_levels, const uint8_t* valid_bits, int64_t valid_bits_offset,
- const int32_t* data_ptr) {
- RETURN_NOT_OK(data_buffer_.Resize(num_values * sizeof(int32_t)));
- auto buffer_ptr = reinterpret_cast<int32_t*>(data_buffer_.mutable_data());
- ::arrow::internal::BitmapReader valid_bits_reader(valid_bits, valid_bits_offset,
- num_values);
+Status ArrowColumnWriter::WriteNullableBatch<Int32Type, ::arrow::Time32Type>(
+ const ::arrow::Time32Type& type, int64_t num_values, int64_t num_levels,
+ const int16_t* def_levels, const int16_t* rep_levels, const uint8_t* valid_bits,
+ int64_t valid_bits_offset, const int32_t* values) {
+ int32_t* buffer;
+ RETURN_NOT_OK(ctx_->GetScratchData<int32_t>(num_values, &buffer));
if (type.unit() == TimeUnit::SECOND) {
for (int i = 0; i < num_values; i++) {
- if (valid_bits_reader.IsSet()) {
- buffer_ptr[i] = data_ptr[i] * 1000;
- }
- valid_bits_reader.Next();
+ buffer[i] = values[i] * 1000;
}
} else {
for (int i = 0; i < num_values; i++) {
- if (valid_bits_reader.IsSet()) {
- buffer_ptr[i] = data_ptr[i];
- }
- valid_bits_reader.Next();
+ buffer[i] = values[i];
}
}
- PARQUET_CATCH_NOT_OK(writer->WriteBatchSpaced(
- num_levels, def_levels, rep_levels, valid_bits, valid_bits_offset, buffer_ptr));
-
- return Status::OK();
+ return WriteBatchSpaced<Int32Type>(num_levels, def_levels, rep_levels, valid_bits,
+ valid_bits_offset, buffer);
}
-#define NULLABLE_BATCH_FAST_PATH(ParquetType, ArrowType, CType) \
- template <> \
- Status FileWriter::Impl::WriteNullableBatch<ParquetType, ArrowType>( \
- TypedColumnWriter<ParquetType> * writer, const ArrowType& type, \
- int64_t num_values, int64_t num_levels, const int16_t* def_levels, \
- const int16_t* rep_levels, const uint8_t* valid_bits, int64_t valid_bits_offset, \
- const CType* data_ptr) { \
- PARQUET_CATCH_NOT_OK(writer->WriteBatchSpaced( \
- num_levels, def_levels, rep_levels, valid_bits, valid_bits_offset, data_ptr)); \
- return Status::OK(); \
+#define NULLABLE_BATCH_FAST_PATH(ParquetType, ArrowType, CType) \
+ template <> \
+ Status ArrowColumnWriter::WriteNullableBatch<ParquetType, ArrowType>( \
+ const ArrowType& type, int64_t num_values, int64_t num_levels, \
+ const int16_t* def_levels, const int16_t* rep_levels, const uint8_t* valid_bits, \
+ int64_t valid_bits_offset, const CType* values) { \
+ return WriteBatchSpaced<ParquetType>(num_levels, def_levels, rep_levels, valid_bits, \
+ valid_bits_offset, values); \
}
NULLABLE_BATCH_FAST_PATH(Int32Type, ::arrow::Int32Type, int32_t)
@@ -527,122 +551,99 @@
NULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::Time64Type, int64_t)
NULLABLE_BATCH_FAST_PATH(FloatType, ::arrow::FloatType, float)
NULLABLE_BATCH_FAST_PATH(DoubleType, ::arrow::DoubleType, double)
-
-// ----------------------------------------------------------------------
-// Write timestamps
-
NULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::TimestampType, int64_t)
NONNULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::TimestampType, int64_t)
template <>
-Status FileWriter::Impl::WriteNullableBatch<Int96Type, ::arrow::TimestampType>(
- TypedColumnWriter<Int96Type>* writer, const ::arrow::TimestampType& type,
- int64_t num_values, int64_t num_levels, const int16_t* def_levels,
- const int16_t* rep_levels, const uint8_t* valid_bits, int64_t valid_bits_offset,
- const int64_t* data_ptr) {
- RETURN_NOT_OK(data_buffer_.Resize(num_values * sizeof(Int96)));
- auto buffer_ptr = reinterpret_cast<Int96*>(data_buffer_.mutable_data());
- ::arrow::internal::BitmapReader valid_bits_reader(valid_bits, valid_bits_offset,
- num_values);
+Status ArrowColumnWriter::WriteNullableBatch<Int96Type, ::arrow::TimestampType>(
+ const ::arrow::TimestampType& type, int64_t num_values, int64_t num_levels,
+ const int16_t* def_levels, const int16_t* rep_levels, const uint8_t* valid_bits,
+ int64_t valid_bits_offset, const int64_t* values) {
+ Int96* buffer;
+ RETURN_NOT_OK(ctx_->GetScratchData<Int96>(num_values, &buffer));
if (type.unit() == TimeUnit::NANO) {
for (int i = 0; i < num_values; i++) {
- if (valid_bits_reader.IsSet()) {
- internal::NanosecondsToImpalaTimestamp(data_ptr[i], buffer_ptr + i);
- }
- valid_bits_reader.Next();
+ internal::NanosecondsToImpalaTimestamp(values[i], &buffer[i]);
}
} else {
return Status::NotImplemented("Only NANO timestamps are supported for Int96 writing");
}
- PARQUET_CATCH_NOT_OK(writer->WriteBatchSpaced(
- num_levels, def_levels, rep_levels, valid_bits, valid_bits_offset, buffer_ptr));
-
- return Status::OK();
+ return WriteBatchSpaced<Int96Type>(num_levels, def_levels, rep_levels, valid_bits,
+ valid_bits_offset, buffer);
}
template <>
-Status FileWriter::Impl::WriteNonNullableBatch<Int96Type, ::arrow::TimestampType>(
- TypedColumnWriter<Int96Type>* writer, const ::arrow::TimestampType& type,
- int64_t num_values, int64_t num_levels, const int16_t* def_levels,
- const int16_t* rep_levels, const int64_t* data_ptr) {
- RETURN_NOT_OK(data_buffer_.Resize(num_values * sizeof(Int96)));
- auto buffer_ptr = reinterpret_cast<Int96*>(data_buffer_.mutable_data());
+Status ArrowColumnWriter::WriteNonNullableBatch<Int96Type, ::arrow::TimestampType>(
+ const ::arrow::TimestampType& type, int64_t num_values, int64_t num_levels,
+ const int16_t* def_levels, const int16_t* rep_levels, const int64_t* values) {
+ Int96* buffer;
+ RETURN_NOT_OK(ctx_->GetScratchData<Int96>(num_values, &buffer));
if (type.unit() == TimeUnit::NANO) {
for (int i = 0; i < num_values; i++) {
- internal::NanosecondsToImpalaTimestamp(data_ptr[i], buffer_ptr + i);
+ internal::NanosecondsToImpalaTimestamp(values[i], buffer + i);
}
} else {
return Status::NotImplemented("Only NANO timestamps are supported for Int96 writing");
}
- PARQUET_CATCH_NOT_OK(
- writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr));
- return Status::OK();
+ return WriteBatch<Int96Type>(num_levels, def_levels, rep_levels, buffer);
}
-Status FileWriter::Impl::WriteTimestamps(ColumnWriter* column_writer,
- const std::shared_ptr<Array>& values,
- int64_t num_levels, const int16_t* def_levels,
- const int16_t* rep_levels) {
- const auto& type = static_cast<::arrow::TimestampType&>(*values->type());
+Status ArrowColumnWriter::WriteTimestamps(const Array& values, int64_t num_levels,
+ const int16_t* def_levels,
+ const int16_t* rep_levels) {
+ const auto& type = static_cast<const ::arrow::TimestampType&>(*values.type());
const bool is_nanosecond = type.unit() == TimeUnit::NANO;
- if (is_nanosecond && arrow_properties_->support_deprecated_int96_timestamps()) {
- return TypedWriteBatch<Int96Type, ::arrow::TimestampType>(
- column_writer, values, num_levels, def_levels, rep_levels);
+ if (is_nanosecond && ctx_->properties->support_deprecated_int96_timestamps()) {
+ return TypedWriteBatch<Int96Type, ::arrow::TimestampType>(values, num_levels,
+ def_levels, rep_levels);
} else if (is_nanosecond ||
- (arrow_properties_->coerce_timestamps_enabled() &&
- (type.unit() != arrow_properties_->coerce_timestamps_unit()))) {
+ (ctx_->properties->coerce_timestamps_enabled() &&
+ (type.unit() != ctx_->properties->coerce_timestamps_unit()))) {
// Casting is required. This covers several cases
// * Nanoseconds -> cast to microseconds
// * coerce_timestamps_enabled_, cast all timestamps to requested unit
- return WriteTimestampsCoerce(column_writer, values, num_levels, def_levels,
- rep_levels);
+ return WriteTimestampsCoerce(values, num_levels, def_levels, rep_levels);
} else {
// No casting of timestamps is required, take the fast path
- return TypedWriteBatch<Int64Type, ::arrow::TimestampType>(
- column_writer, values, num_levels, def_levels, rep_levels);
+ return TypedWriteBatch<Int64Type, ::arrow::TimestampType>(values, num_levels,
+ def_levels, rep_levels);
}
}
-Status FileWriter::Impl::WriteTimestampsCoerce(ColumnWriter* column_writer,
- const std::shared_ptr<Array>& array,
- int64_t num_levels,
- const int16_t* def_levels,
- const int16_t* rep_levels) {
- // Note that we can only use data_buffer_ here as we write timestamps with the fast
- // path.
- RETURN_NOT_OK(data_buffer_.Resize(array->length() * sizeof(int64_t)));
- int64_t* data_buffer_ptr = reinterpret_cast<int64_t*>(data_buffer_.mutable_data());
+Status ArrowColumnWriter::WriteTimestampsCoerce(const Array& array, int64_t num_levels,
+ const int16_t* def_levels,
+ const int16_t* rep_levels) {
+ int64_t* buffer;
+ RETURN_NOT_OK(ctx_->GetScratchData<int64_t>(num_levels, &buffer));
- const auto& data = static_cast<const ::arrow::TimestampArray&>(*array);
+ const auto& data = static_cast<const ::arrow::TimestampArray&>(array);
- auto data_ptr = data.raw_values();
- auto writer = reinterpret_cast<TypedColumnWriter<Int64Type>*>(column_writer);
+ auto values = data.raw_values();
+ const auto& type = static_cast<const ::arrow::TimestampType&>(*array.type());
- const auto& type = static_cast<const ::arrow::TimestampType&>(*array->type());
-
- TimeUnit::type target_unit = arrow_properties_->coerce_timestamps_enabled()
- ? arrow_properties_->coerce_timestamps_unit()
+ TimeUnit::type target_unit = ctx_->properties->coerce_timestamps_enabled()
+ ? ctx_->properties->coerce_timestamps_unit()
: TimeUnit::MICRO;
auto target_type = ::arrow::timestamp(target_unit);
auto DivideBy = [&](const int64_t factor) {
- for (int64_t i = 0; i < array->length(); i++) {
- if (!data.IsNull(i) && (data_ptr[i] % factor != 0)) {
+ for (int64_t i = 0; i < array.length(); i++) {
+ if (!data.IsNull(i) && (values[i] % factor != 0)) {
std::stringstream ss;
ss << "Casting from " << type.ToString() << " to " << target_type->ToString()
- << " would lose data: " << data_ptr[i];
+ << " would lose data: " << values[i];
return Status::Invalid(ss.str());
}
- data_buffer_ptr[i] = data_ptr[i] / factor;
+ buffer[i] = values[i] / factor;
}
return Status::OK();
};
auto MultiplyBy = [&](const int64_t factor) {
- for (int64_t i = 0; i < array->length(); i++) {
- data_buffer_ptr[i] = data_ptr[i] * factor;
+ for (int64_t i = 0; i < array.length(); i++) {
+ buffer[i] = values[i] * factor;
}
return Status::OK();
};
@@ -664,156 +665,140 @@
RETURN_NOT_OK(DivideBy(1000));
}
- if (writer->descr()->schema_node()->is_required() || (data.null_count() == 0)) {
+ if (writer_->descr()->schema_node()->is_required() || (data.null_count() == 0)) {
// no nulls, just dump the data
RETURN_NOT_OK((WriteNonNullableBatch<Int64Type, ::arrow::TimestampType>(
- writer, static_cast<const ::arrow::TimestampType&>(*target_type), array->length(),
- num_levels, def_levels, rep_levels, data_buffer_ptr)));
+ static_cast<const ::arrow::TimestampType&>(*target_type), array.length(),
+ num_levels, def_levels, rep_levels, buffer)));
} else {
const uint8_t* valid_bits = data.null_bitmap_data();
RETURN_NOT_OK((WriteNullableBatch<Int64Type, ::arrow::TimestampType>(
- writer, static_cast<const ::arrow::TimestampType&>(*target_type), array->length(),
- num_levels, def_levels, rep_levels, valid_bits, data.offset(), data_buffer_ptr)));
+ static_cast<const ::arrow::TimestampType&>(*target_type), array.length(),
+ num_levels, def_levels, rep_levels, valid_bits, data.offset(), buffer)));
}
- PARQUET_CATCH_NOT_OK(writer->Close());
return Status::OK();
}
-// ----------------------------------------------------------------------
-
// This specialization seems quite similar but it significantly differs in two points:
// * offset is added at the most latest time to the pointer as we have sub-byte access
// * Arrow data is stored bitwise thus we cannot use std::copy to transform from
// ArrowType::c_type to ParquetType::c_type
template <>
-Status FileWriter::Impl::TypedWriteBatch<BooleanType, ::arrow::BooleanType>(
- ColumnWriter* column_writer, const std::shared_ptr<Array>& array, int64_t num_levels,
- const int16_t* def_levels, const int16_t* rep_levels) {
- RETURN_NOT_OK(data_buffer_.Resize(array->length()));
- auto data = static_cast<const BooleanArray*>(array.get());
- auto data_ptr = reinterpret_cast<const uint8_t*>(data->values()->data());
- auto buffer_ptr = reinterpret_cast<bool*>(data_buffer_.mutable_data());
- auto writer = reinterpret_cast<TypedColumnWriter<BooleanType>*>(column_writer);
+Status ArrowColumnWriter::TypedWriteBatch<BooleanType, ::arrow::BooleanType>(
+ const Array& array, int64_t num_levels, const int16_t* def_levels,
+ const int16_t* rep_levels) {
+ bool* buffer;
+ RETURN_NOT_OK(ctx_->GetScratchData<bool>(array.length(), &buffer));
+
+ const auto& data = static_cast<const BooleanArray&>(array);
+ auto values = reinterpret_cast<const uint8_t*>(data.values()->data());
int buffer_idx = 0;
- int64_t offset = array->offset();
- for (int i = 0; i < data->length(); i++) {
- if (!data->IsNull(i)) {
- buffer_ptr[buffer_idx++] = BitUtil::GetBit(data_ptr, offset + i);
+ int64_t offset = array.offset();
+ for (int i = 0; i < data.length(); i++) {
+ if (!data.IsNull(i)) {
+ buffer[buffer_idx++] = BitUtil::GetBit(values, offset + i);
}
}
- PARQUET_CATCH_NOT_OK(
- writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr));
- PARQUET_CATCH_NOT_OK(writer->Close());
- return Status::OK();
+
+ return WriteBatch<BooleanType>(num_levels, def_levels, rep_levels, buffer);
}
template <>
-Status FileWriter::Impl::TypedWriteBatch<Int32Type, ::arrow::NullType>(
- ColumnWriter* column_writer, const std::shared_ptr<Array>& array, int64_t num_levels,
- const int16_t* def_levels, const int16_t* rep_levels) {
- auto writer = reinterpret_cast<TypedColumnWriter<Int32Type>*>(column_writer);
-
- PARQUET_CATCH_NOT_OK(writer->WriteBatch(num_levels, def_levels, rep_levels, nullptr));
- PARQUET_CATCH_NOT_OK(writer->Close());
- return Status::OK();
+Status ArrowColumnWriter::TypedWriteBatch<Int32Type, ::arrow::NullType>(
+ const Array& array, int64_t num_levels, const int16_t* def_levels,
+ const int16_t* rep_levels) {
+ return WriteBatch<Int32Type>(num_levels, def_levels, rep_levels, nullptr);
}
template <>
-Status FileWriter::Impl::TypedWriteBatch<ByteArrayType, ::arrow::BinaryType>(
- ColumnWriter* column_writer, const std::shared_ptr<Array>& array, int64_t num_levels,
- const int16_t* def_levels, const int16_t* rep_levels) {
- RETURN_NOT_OK(data_buffer_.Resize(array->length() * sizeof(ByteArray)));
- auto data = static_cast<const BinaryArray*>(array.get());
- auto buffer_ptr = reinterpret_cast<ByteArray*>(data_buffer_.mutable_data());
+Status ArrowColumnWriter::TypedWriteBatch<ByteArrayType, ::arrow::BinaryType>(
+ const Array& array, int64_t num_levels, const int16_t* def_levels,
+ const int16_t* rep_levels) {
+ ByteArray* buffer;
+ RETURN_NOT_OK(ctx_->GetScratchData<ByteArray>(num_levels, &buffer));
+
+ const auto& data = static_cast<const BinaryArray&>(array);
+
// In the case of an array consisting of only empty strings or all null,
- // data->data() points already to a nullptr, thus data->data()->data() will
+ // data.data() already points to a nullptr, thus data.data()->data() will
// segfault.
- const uint8_t* data_ptr = nullptr;
- if (data->value_data()) {
- data_ptr = reinterpret_cast<const uint8_t*>(data->value_data()->data());
- DCHECK(data_ptr != nullptr);
+ const uint8_t* values = nullptr;
+ if (data.value_data()) {
+ values = reinterpret_cast<const uint8_t*>(data.value_data()->data());
+ DCHECK(values != nullptr);
}
- auto writer = reinterpret_cast<TypedColumnWriter<ByteArrayType>*>(column_writer);
// Slice offset is accounted for in raw_value_offsets
- const int32_t* value_offset = data->raw_value_offsets();
+ const int32_t* value_offset = data.raw_value_offsets();
- if (writer->descr()->schema_node()->is_required() || (data->null_count() == 0)) {
+ if (writer_->descr()->schema_node()->is_required() || (data.null_count() == 0)) {
// no nulls, just dump the data
- for (int64_t i = 0; i < data->length(); i++) {
- buffer_ptr[i] =
- ByteArray(value_offset[i + 1] - value_offset[i], data_ptr + value_offset[i]);
+ for (int64_t i = 0; i < data.length(); i++) {
+ buffer[i] =
+ ByteArray(value_offset[i + 1] - value_offset[i], values + value_offset[i]);
}
} else {
int buffer_idx = 0;
- for (int64_t i = 0; i < data->length(); i++) {
- if (!data->IsNull(i)) {
- buffer_ptr[buffer_idx++] =
- ByteArray(value_offset[i + 1] - value_offset[i], data_ptr + value_offset[i]);
+ for (int64_t i = 0; i < data.length(); i++) {
+ if (!data.IsNull(i)) {
+ buffer[buffer_idx++] =
+ ByteArray(value_offset[i + 1] - value_offset[i], values + value_offset[i]);
}
}
}
- PARQUET_CATCH_NOT_OK(
- writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr));
- PARQUET_CATCH_NOT_OK(writer->Close());
- return Status::OK();
+
+ return WriteBatch<ByteArrayType>(num_levels, def_levels, rep_levels, buffer);
}
template <>
-Status FileWriter::Impl::TypedWriteBatch<FLBAType, ::arrow::FixedSizeBinaryType>(
- ColumnWriter* column_writer, const std::shared_ptr<Array>& array, int64_t num_levels,
- const int16_t* def_levels, const int16_t* rep_levels) {
- RETURN_NOT_OK(data_buffer_.Resize(array->length() * sizeof(FLBA), false));
- const auto& data = static_cast<const FixedSizeBinaryArray&>(*array);
+Status ArrowColumnWriter::TypedWriteBatch<FLBAType, ::arrow::FixedSizeBinaryType>(
+ const Array& array, int64_t num_levels, const int16_t* def_levels,
+ const int16_t* rep_levels) {
+ const auto& data = static_cast<const FixedSizeBinaryArray&>(array);
const int64_t length = data.length();
- auto buffer_ptr = reinterpret_cast<FLBA*>(data_buffer_.mutable_data());
+ FLBA* buffer;
+ RETURN_NOT_OK(ctx_->GetScratchData<FLBA>(num_levels, &buffer));
- auto writer = reinterpret_cast<TypedColumnWriter<FLBAType>*>(column_writer);
-
- if (writer->descr()->schema_node()->is_required() || data.null_count() == 0) {
+ if (writer_->descr()->schema_node()->is_required() || data.null_count() == 0) {
// no nulls, just dump the data
// todo(advancedxy): use a writeBatch to avoid this step
for (int64_t i = 0; i < length; i++) {
- buffer_ptr[i] = FixedLenByteArray(data.GetValue(i));
+ buffer[i] = FixedLenByteArray(data.GetValue(i));
}
} else {
int buffer_idx = 0;
for (int64_t i = 0; i < length; i++) {
if (!data.IsNull(i)) {
- buffer_ptr[buffer_idx++] = FixedLenByteArray(data.GetValue(i));
+ buffer[buffer_idx++] = FixedLenByteArray(data.GetValue(i));
}
}
}
- PARQUET_CATCH_NOT_OK(
- writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr));
- PARQUET_CATCH_NOT_OK(writer->Close());
- return Status::OK();
+
+ return WriteBatch<FLBAType>(num_levels, def_levels, rep_levels, buffer);
}
template <>
-Status FileWriter::Impl::TypedWriteBatch<FLBAType, ::arrow::Decimal128Type>(
- ColumnWriter* column_writer, const std::shared_ptr<Array>& array, int64_t num_levels,
- const int16_t* def_levels, const int16_t* rep_levels) {
- const auto& data = static_cast<const Decimal128Array&>(*array);
+Status ArrowColumnWriter::TypedWriteBatch<FLBAType, ::arrow::Decimal128Type>(
+ const Array& array, int64_t num_levels, const int16_t* def_levels,
+ const int16_t* rep_levels) {
+ const auto& data = static_cast<const Decimal128Array&>(array);
const int64_t length = data.length();
- // TODO(phillipc): This is potentially very wasteful if we have a lot of nulls
- std::vector<uint64_t> big_endian_values(static_cast<size_t>(length) * 2);
-
- RETURN_NOT_OK(data_buffer_.Resize(length * sizeof(FLBA), false));
- auto buffer_ptr = reinterpret_cast<FLBA*>(data_buffer_.mutable_data());
-
- auto writer = reinterpret_cast<TypedColumnWriter<FLBAType>*>(column_writer);
+ FLBA* buffer;
+ RETURN_NOT_OK(ctx_->GetScratchData<FLBA>(num_levels, &buffer));
const auto& decimal_type = static_cast<const ::arrow::Decimal128Type&>(*data.type());
const int32_t offset =
decimal_type.byte_width() - DecimalSize(decimal_type.precision());
const bool does_not_have_nulls =
- writer->descr()->schema_node()->is_required() || data.null_count() == 0;
+ writer_->descr()->schema_node()->is_required() || data.null_count() == 0;
+
+ // TODO(phillipc): This is potentially very wasteful if we have a lot of nulls
+ std::vector<uint64_t> big_endian_values(static_cast<size_t>(length) * 2);
// TODO(phillipc): Look into whether our compilers will perform loop unswitching so we
// don't have to keep writing two loops to handle the case where we know there are no
@@ -825,7 +810,7 @@
auto unsigned_64_bit = reinterpret_cast<const uint64_t*>(data.GetValue(i));
big_endian_values[j] = ::arrow::BitUtil::ToBigEndian(unsigned_64_bit[1]);
big_endian_values[j + 1] = ::arrow::BitUtil::ToBigEndian(unsigned_64_bit[0]);
- buffer_ptr[i] = FixedLenByteArray(
+ buffer[i] = FixedLenByteArray(
reinterpret_cast<const uint8_t*>(&big_endian_values[j]) + offset);
}
} else {
@@ -834,77 +819,30 @@
auto unsigned_64_bit = reinterpret_cast<const uint64_t*>(data.GetValue(i));
big_endian_values[j] = ::arrow::BitUtil::ToBigEndian(unsigned_64_bit[1]);
big_endian_values[j + 1] = ::arrow::BitUtil::ToBigEndian(unsigned_64_bit[0]);
- buffer_ptr[buffer_idx++] = FixedLenByteArray(
+ buffer[buffer_idx++] = FixedLenByteArray(
reinterpret_cast<const uint8_t*>(&big_endian_values[j]) + offset);
j += 2;
}
}
}
- PARQUET_CATCH_NOT_OK(
- writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr));
- PARQUET_CATCH_NOT_OK(writer->Close());
- return Status::OK();
+
+ return WriteBatch<FLBAType>(num_levels, def_levels, rep_levels, buffer);
}
-// End of column type specializations
-// ----------------------------------------------------------------------
-
-Status FileWriter::Impl::Close() {
- if (!closed_) {
- // Make idempotent
- closed_ = true;
- if (row_group_writer_ != nullptr) {
- PARQUET_CATCH_NOT_OK(row_group_writer_->Close());
- }
- PARQUET_CATCH_NOT_OK(writer_->Close());
- }
- return Status::OK();
-}
-
-Status FileWriter::NewRowGroup(int64_t chunk_size) {
- return impl_->NewRowGroup(chunk_size);
-}
-
-Status FileWriter::Impl::WriteColumnChunk(const Array& data) {
- // DictionaryArrays are not yet handled with a fast path. To still support
- // writing them as a workaround, we convert them back to their non-dictionary
- // representation.
- if (data.type()->id() == ::arrow::Type::DICTIONARY) {
- const ::arrow::DictionaryType& dict_type =
- static_cast<const ::arrow::DictionaryType&>(*data.type());
-
- // TODO(ARROW-1648): Remove this special handling once we require an Arrow
- // version that has this fixed.
- if (dict_type.dictionary()->type()->id() == ::arrow::Type::NA) {
- return WriteColumnChunk(::arrow::NullArray(data.length()));
- }
-
- FunctionContext ctx(pool_);
- std::shared_ptr<Array> plain_array;
- RETURN_NOT_OK(
- Cast(&ctx, data, dict_type.dictionary()->type(), CastOptions(), &plain_array));
- return WriteColumnChunk(*plain_array);
- }
-
- ColumnWriter* column_writer;
- PARQUET_CATCH_NOT_OK(column_writer = row_group_writer_->NextColumn());
-
- int current_column_idx = row_group_writer_->current_column();
- std::shared_ptr<::arrow::Schema> arrow_schema;
- RETURN_NOT_OK(FromParquetSchema(writer_->schema(), {current_column_idx - 1},
- writer_->key_value_metadata(), &arrow_schema));
- std::shared_ptr<Buffer> def_levels_buffer;
- std::shared_ptr<Buffer> rep_levels_buffer;
- int64_t values_offset;
+Status ArrowColumnWriter::Write(const Array& data) {
::arrow::Type::type values_type;
- int64_t num_levels;
- int64_t num_values;
+ RETURN_NOT_OK(GetLeafType(*data.type(), &values_type));
std::shared_ptr<Array> _values_array;
- LevelBuilder level_builder(pool_);
+ int64_t values_offset;
+ int64_t num_levels;
+ int64_t num_values;
+ LevelBuilder level_builder(ctx_->memory_pool);
+
+ std::shared_ptr<Buffer> def_levels_buffer, rep_levels_buffer;
RETURN_NOT_OK(level_builder.GenerateLevels(
- data, arrow_schema->field(0), &values_offset, &values_type, &num_values,
- &num_levels, &def_levels_buffer, &rep_levels_buffer, &_values_array));
+ data, field_, &values_offset, &num_values, &num_levels, ctx_->def_levels_buffer,
+ &def_levels_buffer, &rep_levels_buffer, &_values_array));
const int16_t* def_levels = nullptr;
if (def_levels_buffer) {
def_levels = reinterpret_cast<const int16_t*>(def_levels_buffer->data());
@@ -915,27 +853,26 @@
}
std::shared_ptr<Array> values_array = _values_array->Slice(values_offset, num_values);
-#define WRITE_BATCH_CASE(ArrowEnum, ArrowType, ParquetType) \
- case ::arrow::Type::ArrowEnum: \
- return TypedWriteBatch<ParquetType, ::arrow::ArrowType>( \
- column_writer, values_array, num_levels, def_levels, rep_levels);
+#define WRITE_BATCH_CASE(ArrowEnum, ArrowType, ParquetType) \
+ case ::arrow::Type::ArrowEnum: \
+ return TypedWriteBatch<ParquetType, ::arrow::ArrowType>(*values_array, num_levels, \
+ def_levels, rep_levels);
switch (values_type) {
case ::arrow::Type::UINT32: {
if (writer_->properties()->version() == ParquetVersion::PARQUET_1_0) {
// Parquet 1.0 reader cannot read the UINT_32 logical type. Thus we need
// to use the larger Int64Type to store them lossless.
- return TypedWriteBatch<Int64Type, ::arrow::UInt32Type>(
- column_writer, values_array, num_levels, def_levels, rep_levels);
+ return TypedWriteBatch<Int64Type, ::arrow::UInt32Type>(*values_array, num_levels,
+ def_levels, rep_levels);
} else {
- return TypedWriteBatch<Int32Type, ::arrow::UInt32Type>(
- column_writer, values_array, num_levels, def_levels, rep_levels);
+ return TypedWriteBatch<Int32Type, ::arrow::UInt32Type>(*values_array, num_levels,
+ def_levels, rep_levels);
}
}
WRITE_BATCH_CASE(NA, NullType, Int32Type)
case ::arrow::Type::TIMESTAMP:
- return WriteTimestamps(column_writer, values_array, num_levels, def_levels,
- rep_levels);
+ return WriteTimestamps(*values_array, num_levels, def_levels, rep_levels);
WRITE_BATCH_CASE(BOOL, BooleanType, BooleanType)
WRITE_BATCH_CASE(INT8, Int8Type, Int32Type)
WRITE_BATCH_CASE(UINT8, UInt8Type, Int32Type)
@@ -957,20 +894,130 @@
default:
break;
}
-
- PARQUET_CATCH_NOT_OK(column_writer->Close());
std::stringstream ss;
ss << "Data type not supported as list value: " << values_array->type()->ToString();
return Status::NotImplemented(ss.str());
}
-Status FileWriter::WriteColumnChunk(const ::arrow::Array& array) {
- return impl_->WriteColumnChunk(array);
+} // namespace
+
+// ----------------------------------------------------------------------
+// FileWriter implementation
+
+class FileWriter::Impl {
+ public:
+ Impl(MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer,
+ const std::shared_ptr<ArrowWriterProperties>& arrow_properties)
+ : writer_(std::move(writer)),
+ row_group_writer_(nullptr),
+ column_write_context_(pool, arrow_properties.get()),
+ arrow_properties_(arrow_properties),
+ closed_(false) {}
+
+ Status NewRowGroup(int64_t chunk_size) {
+ if (row_group_writer_ != nullptr) {
+ PARQUET_CATCH_NOT_OK(row_group_writer_->Close());
+ }
+ PARQUET_CATCH_NOT_OK(row_group_writer_ = writer_->AppendRowGroup());
+ return Status::OK();
+ }
+
+ Status Close() {
+ if (!closed_) {
+ // Make idempotent
+ closed_ = true;
+ if (row_group_writer_ != nullptr) {
+ PARQUET_CATCH_NOT_OK(row_group_writer_->Close());
+ }
+ PARQUET_CATCH_NOT_OK(writer_->Close());
+ }
+ return Status::OK();
+ }
+
+ Status WriteColumnChunk(const Array& data) {
+    // A bit awkward here, since we cannot construct a ChunkedArray from a const Array&
+ ::arrow::ArrayVector chunks = {::arrow::MakeArray(data.data())};
+ auto chunked_array = std::make_shared<::arrow::ChunkedArray>(chunks);
+ return WriteColumnChunk(chunked_array, 0, data.length());
+ }
+
+ Status WriteColumnChunk(const std::shared_ptr<ChunkedArray>& data, int64_t offset,
+ const int64_t size) {
+ // DictionaryArrays are not yet handled with a fast path. To still support
+ // writing them as a workaround, we convert them back to their non-dictionary
+ // representation.
+ if (data->type()->id() == ::arrow::Type::DICTIONARY) {
+ const ::arrow::DictionaryType& dict_type =
+ static_cast<const ::arrow::DictionaryType&>(*data->type());
+
+ // TODO(ARROW-1648): Remove this special handling once we require an Arrow
+ // version that has this fixed.
+ if (dict_type.dictionary()->type()->id() == ::arrow::Type::NA) {
+ auto null_array = std::make_shared<::arrow::NullArray>(data->length());
+ return WriteColumnChunk(*null_array);
+ }
+
+ FunctionContext ctx(this->memory_pool());
+ ::arrow::compute::Datum cast_input(data);
+ ::arrow::compute::Datum cast_output;
+ RETURN_NOT_OK(Cast(&ctx, cast_input, dict_type.dictionary()->type(), CastOptions(),
+ &cast_output));
+ return WriteColumnChunk(cast_output.chunked_array(), 0, data->length());
+ }
+
+ ColumnWriter* column_writer;
+ PARQUET_CATCH_NOT_OK(column_writer = row_group_writer_->NextColumn());
+
+ // TODO(wesm): This trick to construct a schema for one Parquet root node
+ // will not work for arbitrary nested data
+ int current_column_idx = row_group_writer_->current_column();
+ std::shared_ptr<::arrow::Schema> arrow_schema;
+ RETURN_NOT_OK(FromParquetSchema(writer_->schema(), {current_column_idx - 1},
+ writer_->key_value_metadata(), &arrow_schema));
+
+ ArrowColumnWriter arrow_writer(&column_write_context_, column_writer,
+ arrow_schema->field(0));
+
+ RETURN_NOT_OK(arrow_writer.Write(*data, offset, size));
+ return arrow_writer.Close();
+ }
+
+ const WriterProperties& properties() const { return *writer_->properties(); }
+
+ ::arrow::MemoryPool* memory_pool() const { return column_write_context_.memory_pool; }
+
+ virtual ~Impl() {}
+
+ private:
+ friend class FileWriter;
+
+ std::unique_ptr<ParquetFileWriter> writer_;
+ RowGroupWriter* row_group_writer_;
+ ColumnWriterContext column_write_context_;
+ std::shared_ptr<ArrowWriterProperties> arrow_properties_;
+ bool closed_;
+};
+
+Status FileWriter::NewRowGroup(int64_t chunk_size) {
+ return impl_->NewRowGroup(chunk_size);
+}
+
+Status FileWriter::WriteColumnChunk(const ::arrow::Array& data) {
+ return impl_->WriteColumnChunk(data);
+}
+
+Status FileWriter::WriteColumnChunk(const std::shared_ptr<::arrow::ChunkedArray>& data,
+ const int64_t offset, const int64_t size) {
+ return impl_->WriteColumnChunk(data, offset, size);
+}
+
+Status FileWriter::WriteColumnChunk(const std::shared_ptr<::arrow::ChunkedArray>& data) {
+ return WriteColumnChunk(data, 0, data->length());
}
Status FileWriter::Close() { return impl_->Close(); }
-MemoryPool* FileWriter::memory_pool() const { return impl_->pool_; }
+MemoryPool* FileWriter::memory_pool() const { return impl_->memory_pool(); }
FileWriter::~FileWriter() {}
@@ -1020,14 +1067,9 @@
return Open(schema, pool, wrapper, properties, arrow_properties, writer);
}
-Status FileWriter::WriteTable(const Table& table, int64_t chunk_size) {
- // TODO(ARROW-232) Support writing chunked arrays.
- for (int i = 0; i < table.num_columns(); i++) {
- if (table.column(i)->data()->num_chunks() != 1) {
- return Status::NotImplemented("No support for writing chunked arrays yet.");
- }
- }
+namespace {} // namespace
+Status FileWriter::WriteTable(const Table& table, int64_t chunk_size) {
if (chunk_size <= 0) {
return Status::Invalid("chunk size per row_group must be greater than 0");
} else if (chunk_size > impl_->properties().max_row_group_length()) {
@@ -1040,9 +1082,9 @@
RETURN_NOT_OK_ELSE(NewRowGroup(size), PARQUET_IGNORE_NOT_OK(Close()));
for (int i = 0; i < table.num_columns(); i++) {
- std::shared_ptr<Array> array = table.column(i)->data()->chunk(0);
- array = array->Slice(offset, size);
- RETURN_NOT_OK_ELSE(WriteColumnChunk(*array), PARQUET_IGNORE_NOT_OK(Close()));
+ auto chunked_data = table.column(i)->data();
+ RETURN_NOT_OK_ELSE(WriteColumnChunk(chunked_data, offset, size),
+ PARQUET_IGNORE_NOT_OK(Close()));
}
}
return Status::OK();
diff --git a/src/parquet/arrow/writer.h b/src/parquet/arrow/writer.h
index 24ba72d..a432850 100644
--- a/src/parquet/arrow/writer.h
+++ b/src/parquet/arrow/writer.h
@@ -133,15 +133,16 @@
const std::shared_ptr<ArrowWriterProperties>& arrow_properties,
std::unique_ptr<FileWriter>* writer);
- /**
- * Write a Table to Parquet.
- *
- * The table shall only consist of columns of primitive type or of primitive lists.
- */
+ /// \brief Write a Table to Parquet.
::arrow::Status WriteTable(const ::arrow::Table& table, int64_t chunk_size);
::arrow::Status NewRowGroup(int64_t chunk_size);
::arrow::Status WriteColumnChunk(const ::arrow::Array& data);
+
+ /// \brief Write ColumnChunk in row group using slice of a ChunkedArray
+ ::arrow::Status WriteColumnChunk(const std::shared_ptr<::arrow::ChunkedArray>& data,
+ const int64_t offset, const int64_t size);
+ ::arrow::Status WriteColumnChunk(const std::shared_ptr<::arrow::ChunkedArray>& data);
::arrow::Status Close();
virtual ~FileWriter();
diff --git a/src/parquet/column_writer.h b/src/parquet/column_writer.h
index 7b8c775..6b84748 100644
--- a/src/parquet/column_writer.h
+++ b/src/parquet/column_writer.h
@@ -117,6 +117,8 @@
int64_t rows_written() const { return rows_written_; }
+ const WriterProperties* properties() { return properties_; }
+
protected:
virtual std::shared_ptr<Buffer> GetValuesBuffer() = 0;
diff --git a/src/parquet/file_reader.cc b/src/parquet/file_reader.cc
index 7b74812..72c71c6 100644
--- a/src/parquet/file_reader.cc
+++ b/src/parquet/file_reader.cc
@@ -64,9 +64,9 @@
: contents_(std::move(contents)) {}
std::shared_ptr<ColumnReader> RowGroupReader::Column(int i) {
- DCHECK(i < metadata()->num_columns())
- << "The RowGroup only has " << metadata()->num_columns()
- << "columns, requested column: " << i;
+ DCHECK(i < metadata()->num_columns()) << "The RowGroup only has "
+ << metadata()->num_columns()
+                                        << " columns, requested column: " << i;
const ColumnDescriptor* descr = metadata()->schema()->Column(i);
std::unique_ptr<PageReader> page_reader = contents_->GetColumnPageReader(i);
@@ -76,9 +76,9 @@
}
std::unique_ptr<PageReader> RowGroupReader::GetColumnPageReader(int i) {
- DCHECK(i < metadata()->num_columns())
- << "The RowGroup only has " << metadata()->num_columns()
- << "columns, requested column: " << i;
+ DCHECK(i < metadata()->num_columns()) << "The RowGroup only has "
+ << metadata()->num_columns()
+                                        << " columns, requested column: " << i;
return contents_->GetColumnPageReader(i);
}
@@ -302,9 +302,9 @@
}
std::shared_ptr<RowGroupReader> ParquetFileReader::RowGroup(int i) {
- DCHECK(i < metadata()->num_row_groups())
- << "The file only has " << metadata()->num_row_groups()
- << "row groups, requested reader for: " << i;
+ DCHECK(i < metadata()->num_row_groups()) << "The file only has "
+ << metadata()->num_row_groups()
+                                           << " row groups, requested reader for: " << i;
return contents_->GetRowGroup(i);
}