PARQUET-1100: Introduce RecordReader interface to better support nested data, refactor parquet/arrow/reader
We did not have very consistent logic around reading values from leaf nodes versus reading semantic records where the repetition level is greater than zero. This introduces a reader class that reads from column chunks until it identifies the end of records. It also reads values (with spaces, if required by the schema) into internal buffers. This permitted a substantial refactoring and simplification of the code in parquet::arrow where we were handling the interpretation of batch reads as records manually.
As a follow-up patch, we should be able to take a collection of record readers from the same "tree" in a nested type, reassemble the intermediate Arrow structure, and deal with any redundant structure information in repetition and definition levels. This should allow a unification of our nested data read code path so that we can read arbitrary nested structures.
Author: Wes McKinney <wes.mckinney@twosigma.com>
Closes #398 from wesm/PARQUET-1100 and squashes the following commits:
9ea85d9 [Wes McKinney] Revert to const args
f4dc0fe [Wes McKinney] Make parquet::schema::Node non-copyable. Use const-refs instead of const-ptr for non-nullable argument
0d859cc [Wes McKinney] Code review comments, scrubbing some flakes
1368415 [Wes McKinney] Fix more MSVC warnings
eccb84c [Wes McKinney] Give macro more accurate name
0eaada0 [Wes McKinney] Use int64_t instead of int for batch sizes
79c3709 [Wes McKinney] Add documentation. Remove RecordReader from public API
8fa619b [Wes McKinney] Initialize memory in DecodeSpaced to avoid undefined behavior
5a0c860 [Wes McKinney] Remove non-repeated branch from DelimitRecords
c754e6e [Wes McKinney] Refactor to skip record delimiting for non-repeated data
ed2a03f [Wes McKinney] Move more code into TypedRecordReader
2e934e9 [Wes McKinney] Set some integers as const
58d3a0f [Wes McKinney] Do not index into levels arrays
b766371 [Wes McKinney] Add RecordReader::Reserve to preallocate, fixing perf regression. cpplint
1bf3e8f [Wes McKinney] Refactor to create stateful parquet::RecordReader class to better support nested data. Shift value buffering logic from parquet/arrow/reader into RecordReader. Fix bug described in PARQUET-1100
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 1deab40..ca37b5f 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -628,6 +628,7 @@
src/parquet/types.cc
src/parquet/arrow/reader.cc
+ src/parquet/arrow/record_reader.cc
src/parquet/arrow/schema.cc
src/parquet/arrow/writer.cc
diff --git a/src/parquet/arrow/arrow-reader-writer-benchmark.cc b/src/parquet/arrow/arrow-reader-writer-benchmark.cc
index e899e10..a54fb5d 100644
--- a/src/parquet/arrow/arrow-reader-writer-benchmark.cc
+++ b/src/parquet/arrow/arrow-reader-writer-benchmark.cc
@@ -17,6 +17,8 @@
#include "benchmark/benchmark.h"
+#include <iostream>
+
#include "parquet/arrow/reader.h"
#include "parquet/arrow/writer.h"
#include "parquet/column_reader.h"
@@ -30,13 +32,14 @@
using arrow::BooleanBuilder;
using arrow::NumericBuilder;
-#define ABORT_NOT_OK(s) \
- do { \
- ::arrow::Status _s = (s); \
- if (ARROW_PREDICT_FALSE(!_s.ok())) { \
- exit(-1); \
- } \
- } while (0);
+#define EXIT_NOT_OK(s) \
+ do { \
+ ::arrow::Status _s = (s); \
+ if (ARROW_PREDICT_FALSE(!_s.ok())) { \
+ std::cout << "Exiting: " << _s.ToString() << std::endl; \
+ exit(EXIT_FAILURE); \
+ } \
+ } while (0)
namespace parquet {
@@ -101,12 +104,12 @@
std::vector<uint8_t> valid_bytes(BENCHMARK_SIZE, 0);
int n = {0};
std::generate(valid_bytes.begin(), valid_bytes.end(), [&n] { return n++ % 2; });
- ABORT_NOT_OK(builder.Append(vec.data(), vec.size(), valid_bytes.data()));
+ EXIT_NOT_OK(builder.Append(vec.data(), vec.size(), valid_bytes.data()));
} else {
- ABORT_NOT_OK(builder.Append(vec.data(), vec.size(), nullptr));
+ EXIT_NOT_OK(builder.Append(vec.data(), vec.size(), nullptr));
}
std::shared_ptr<::arrow::Array> array;
- ABORT_NOT_OK(builder.Finish(&array));
+ EXIT_NOT_OK(builder.Finish(&array));
auto field = ::arrow::field("column", type, nullable);
auto schema = std::make_shared<::arrow::Schema>(
@@ -125,12 +128,12 @@
int n = {0};
std::generate(valid_bytes.begin(), valid_bytes.end(),
[&n] { return (n++ % 2) != 0; });
- ABORT_NOT_OK(builder.Append(vec, valid_bytes));
+ EXIT_NOT_OK(builder.Append(vec, valid_bytes));
} else {
- ABORT_NOT_OK(builder.Append(vec));
+ EXIT_NOT_OK(builder.Append(vec));
}
std::shared_ptr<::arrow::Array> array;
- ABORT_NOT_OK(builder.Finish(&array));
+ EXIT_NOT_OK(builder.Finish(&array));
auto field = ::arrow::field("column", ::arrow::boolean(), nullable);
auto schema = std::make_shared<::arrow::Schema>(
@@ -148,7 +151,7 @@
while (state.KeepRunning()) {
auto output = std::make_shared<InMemoryOutputStream>();
- ABORT_NOT_OK(
+ EXIT_NOT_OK(
WriteTable(*table, ::arrow::default_memory_pool(), output, BENCHMARK_SIZE));
}
SetBytesProcessed<nullable, ParquetType>(state);
@@ -171,8 +174,7 @@
std::vector<typename ParquetType::c_type> values(BENCHMARK_SIZE, 128);
std::shared_ptr<::arrow::Table> table = TableFromVector<ParquetType>(values, nullable);
auto output = std::make_shared<InMemoryOutputStream>();
- ABORT_NOT_OK(
- WriteTable(*table, ::arrow::default_memory_pool(), output, BENCHMARK_SIZE));
+ EXIT_NOT_OK(WriteTable(*table, ::arrow::default_memory_pool(), output, BENCHMARK_SIZE));
std::shared_ptr<Buffer> buffer = output->GetBuffer();
while (state.KeepRunning()) {
@@ -180,7 +182,7 @@
ParquetFileReader::Open(std::make_shared<::arrow::io::BufferReader>(buffer));
FileReader filereader(::arrow::default_memory_pool(), std::move(reader));
std::shared_ptr<::arrow::Table> table;
- ABORT_NOT_OK(filereader.ReadTable(&table));
+ EXIT_NOT_OK(filereader.ReadTable(&table));
}
SetBytesProcessed<nullable, ParquetType>(state);
}
diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc
index a1e3382..56b4770 100644
--- a/src/parquet/arrow/arrow-reader-writer-test.cc
+++ b/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -53,6 +53,9 @@
using arrow::default_memory_pool;
using arrow::io::BufferReader;
+using arrow::test::randint;
+using arrow::test::random_is_valid;
+
using ArrowId = ::arrow::Type;
using ParquetType = parquet::Type;
using parquet::schema::GroupNode;
@@ -366,6 +369,45 @@
return std::static_pointer_cast<GroupNode>(node_);
}
+void AssertArraysEqual(const Array& expected, const Array& actual) {
+ if (!actual.Equals(expected)) {
+ std::stringstream pp_result;
+ std::stringstream pp_expected;
+
+ EXPECT_OK(::arrow::PrettyPrint(actual, 0, &pp_result));
+ EXPECT_OK(::arrow::PrettyPrint(expected, 0, &pp_expected));
+ FAIL() << "Got: \n" << pp_result.str() << "\nExpected: \n" << pp_expected.str();
+ }
+}
+
+void AssertChunkedEqual(const ChunkedArray& expected, const ChunkedArray& actual) {
+ ASSERT_EQ(expected.num_chunks(), actual.num_chunks()) << "# chunks unequal";
+ if (!actual.Equals(expected)) {
+ std::stringstream pp_result;
+ std::stringstream pp_expected;
+
+ for (int i = 0; i < actual.num_chunks(); ++i) {
+ auto c1 = actual.chunk(i);
+ auto c2 = expected.chunk(i);
+ if (!c1->Equals(*c2)) {
+ EXPECT_OK(::arrow::PrettyPrint(*c1, 0, &pp_result));
+ EXPECT_OK(::arrow::PrettyPrint(*c2, 0, &pp_expected));
+ FAIL() << "Chunk " << i << " Got: " << pp_result.str()
+ << "\nExpected: " << pp_expected.str();
+ }
+ }
+ }
+}
+
+void AssertTablesEqual(const Table& expected, const Table& actual) {
+ ASSERT_EQ(expected.num_columns(), actual.num_columns());
+
+ for (int i = 0; i < actual.num_columns(); ++i) {
+ AssertChunkedEqual(*expected.column(i)->data(), *actual.column(i)->data());
+ }
+ ASSERT_TRUE(actual.Equals(expected));
+}
+
template <typename TestType>
class TestParquetIO : public ::testing::Test {
public:
@@ -394,13 +436,14 @@
ASSERT_NE(nullptr, out->get());
}
- void ReadAndCheckSingleColumnFile(::arrow::Array* values) {
- std::shared_ptr<::arrow::Array> out;
+ void ReadAndCheckSingleColumnFile(const Array& values) {
+ std::shared_ptr<Array> out;
std::unique_ptr<FileReader> reader;
ReaderFromSink(&reader);
ReadSingleColumnFile(std::move(reader), &out);
- ASSERT_TRUE(values->Equals(out));
+
+ AssertArraysEqual(values, *out);
}
void ReadTableFromFile(std::unique_ptr<FileReader> reader,
@@ -440,7 +483,7 @@
*out = MakeSimpleTable(parent_lists, nullable_parent_lists);
}
- void ReadAndCheckSingleColumnTable(const std::shared_ptr<::arrow::Array>& values) {
+ void ReadAndCheckSingleColumnTable(const std::shared_ptr<Array>& values) {
std::shared_ptr<::arrow::Table> out;
std::unique_ptr<FileReader> reader;
ReaderFromSink(&reader);
@@ -452,13 +495,14 @@
ASSERT_EQ(1, chunked_array->num_chunks());
auto result = chunked_array->chunk(0);
- ASSERT_TRUE(values->Equals(result));
+ AssertArraysEqual(*values, *result);
}
void CheckRoundTrip(const std::shared_ptr<Table>& table) {
std::shared_ptr<Table> result;
DoSimpleRoundtrip(table, 1, table->num_rows(), {}, &result);
- ASSERT_TRUE(table->Equals(*result));
+
+ AssertTablesEqual(*table, *result);
}
template <typename ArrayType>
@@ -495,7 +539,7 @@
MakeSimpleSchema(*values->type(), Repetition::REQUIRED);
this->WriteColumn(schema, values);
- this->ReadAndCheckSingleColumnFile(values.get());
+ this->ReadAndCheckSingleColumnFile(*values);
}
TYPED_TEST(TestParquetIO, SingleColumnTableRequiredWrite) {
@@ -515,7 +559,8 @@
std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
ASSERT_EQ(1, chunked_array->num_chunks());
- ASSERT_TRUE(values->Equals(chunked_array->chunk(0)));
+
+ AssertArraysEqual(*values, *chunked_array->chunk(0));
}
TYPED_TEST(TestParquetIO, SingleColumnOptionalReadWrite) {
@@ -528,7 +573,7 @@
MakeSimpleSchema(*values->type(), Repetition::OPTIONAL);
this->WriteColumn(schema, values);
- this->ReadAndCheckSingleColumnFile(values.get());
+ this->ReadAndCheckSingleColumnFile(*values);
}
TYPED_TEST(TestParquetIO, SingleColumnOptionalDictionaryWrite) {
@@ -547,7 +592,7 @@
MakeSimpleSchema(*dict_values->type(), Repetition::OPTIONAL);
this->WriteColumn(schema, dict_values);
- this->ReadAndCheckSingleColumnFile(values.get());
+ this->ReadAndCheckSingleColumnFile(*values);
}
TYPED_TEST(TestParquetIO, SingleColumnRequiredSliceWrite) {
@@ -558,12 +603,12 @@
std::shared_ptr<Array> sliced_values = values->Slice(SMALL_SIZE / 2, SMALL_SIZE);
this->WriteColumn(schema, sliced_values);
- this->ReadAndCheckSingleColumnFile(sliced_values.get());
+ this->ReadAndCheckSingleColumnFile(*sliced_values);
// Slice offset 1 higher
sliced_values = values->Slice(SMALL_SIZE / 2 + 1, SMALL_SIZE);
this->WriteColumn(schema, sliced_values);
- this->ReadAndCheckSingleColumnFile(sliced_values.get());
+ this->ReadAndCheckSingleColumnFile(*sliced_values);
}
TYPED_TEST(TestParquetIO, SingleColumnOptionalSliceWrite) {
@@ -574,12 +619,12 @@
std::shared_ptr<Array> sliced_values = values->Slice(SMALL_SIZE / 2, SMALL_SIZE);
this->WriteColumn(schema, sliced_values);
- this->ReadAndCheckSingleColumnFile(sliced_values.get());
+ this->ReadAndCheckSingleColumnFile(*sliced_values);
// Slice offset 1 higher, thus different null bitmap.
sliced_values = values->Slice(SMALL_SIZE / 2 + 1, SMALL_SIZE);
this->WriteColumn(schema, sliced_values);
- this->ReadAndCheckSingleColumnFile(sliced_values.get());
+ this->ReadAndCheckSingleColumnFile(*sliced_values);
}
TYPED_TEST(TestParquetIO, SingleColumnTableOptionalReadWrite) {
@@ -636,7 +681,7 @@
}
ASSERT_OK_NO_THROW(writer.Close());
- this->ReadAndCheckSingleColumnFile(values.get());
+ this->ReadAndCheckSingleColumnFile(*values);
}
TYPED_TEST(TestParquetIO, SingleColumnTableRequiredChunkedWrite) {
@@ -679,7 +724,8 @@
std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
ASSERT_EQ(1, chunked_array->num_chunks());
- ASSERT_TRUE(values->Equals(chunked_array->chunk(0)));
+
+ AssertArraysEqual(*values, *chunked_array->chunk(0));
}
TYPED_TEST(TestParquetIO, SingleColumnOptionalChunkedWrite) {
@@ -698,7 +744,7 @@
}
ASSERT_OK_NO_THROW(writer.Close());
- this->ReadAndCheckSingleColumnFile(values.get());
+ this->ReadAndCheckSingleColumnFile(*values);
}
TYPED_TEST(TestParquetIO, SingleColumnTableOptionalChunkedWrite) {
@@ -763,7 +809,7 @@
ASSERT_OK(builder.Append(val));
std::shared_ptr<Array> values;
ASSERT_OK(builder.Finish(&values));
- this->ReadAndCheckSingleColumnFile(values.get());
+ this->ReadAndCheckSingleColumnFile(*values);
}
using TestUInt32ParquetIO = TestParquetIO<::arrow::UInt32Type>;
@@ -850,7 +896,8 @@
std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
ASSERT_EQ(1, chunked_array->num_chunks());
- ASSERT_TRUE(values->Equals(chunked_array->chunk(0)));
+
+ AssertArraysEqual(*values, *chunked_array->chunk(0));
}
using TestNullParquetIO = TestParquetIO<::arrow::NullType>;
@@ -871,7 +918,8 @@
std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
ASSERT_EQ(1, chunked_array->num_chunks());
- ASSERT_TRUE(values->Equals(chunked_array->chunk(0)));
+
+ AssertArraysEqual(*values, *chunked_array->chunk(0));
}
template <typename T>
@@ -1026,14 +1074,14 @@
table, 1, table->num_rows(), {}, &result,
ArrowWriterProperties::Builder().enable_deprecated_int96_timestamps()->build());
- ASSERT_TRUE(table->Equals(*result));
+ AssertTablesEqual(*table, *result);
// Cast nanaoseconds to microseconds and use INT64 physical type
DoSimpleRoundtrip(table, 1, table->num_rows(), {}, &result);
std::shared_ptr<Table> expected;
MakeDateTimeTypesTable(&table, true);
- ASSERT_TRUE(table->Equals(*result));
+ AssertTablesEqual(*table, *result);
}
TEST(TestArrowReadWrite, CoerceTimestamps) {
@@ -1097,13 +1145,14 @@
DoSimpleRoundtrip(
input, 1, input->num_rows(), {}, &milli_result,
ArrowWriterProperties::Builder().coerce_timestamps(TimeUnit::MILLI)->build());
- ASSERT_TRUE(milli_result->Equals(*ex_milli_result));
+
+ AssertTablesEqual(*ex_milli_result, *milli_result);
std::shared_ptr<Table> micro_result;
DoSimpleRoundtrip(
input, 1, input->num_rows(), {}, µ_result,
ArrowWriterProperties::Builder().coerce_timestamps(TimeUnit::MICRO)->build());
- ASSERT_TRUE(micro_result->Equals(*ex_micro_result));
+ AssertTablesEqual(*ex_micro_result, *micro_result);
}
TEST(TestArrowReadWrite, CoerceTimestampsLosePrecision) {
@@ -1213,7 +1262,7 @@
std::shared_ptr<Table> result;
DoSimpleRoundtrip(table, 1, table->num_rows(), {}, &result);
- ASSERT_TRUE(result->Equals(*ex_table));
+ AssertTablesEqual(*ex_table, *result);
}
void MakeDoubleTable(int num_columns, int num_rows, int nchunks,
@@ -1253,7 +1302,7 @@
std::shared_ptr<Table> result;
DoSimpleRoundtrip(table, num_threads, table->num_rows(), {}, &result);
- ASSERT_TRUE(table->Equals(*result));
+ AssertTablesEqual(*table, *result);
}
TEST(TestArrowReadWrite, ReadSingleRowGroup) {
@@ -1328,7 +1377,92 @@
auto ex_schema = std::make_shared<::arrow::Schema>(ex_fields);
Table expected(ex_schema, ex_columns);
- ASSERT_TRUE(result->Equals(expected));
+ AssertTablesEqual(expected, *result);
+}
+
+void MakeListTable(int num_rows, std::shared_ptr<Table>* out) {
+ ::arrow::Int32Builder offset_builder;
+
+ std::vector<int32_t> length_draws;
+ randint(num_rows, 0, 100, &length_draws);
+
+ std::vector<int32_t> offset_values;
+
+ // Make sure some of them are length 0
+ int32_t total_elements = 0;
+ for (size_t i = 0; i < length_draws.size(); ++i) {
+ if (length_draws[i] < 10) {
+ length_draws[i] = 0;
+ }
+ offset_values.push_back(total_elements);
+ total_elements += length_draws[i];
+ }
+ offset_values.push_back(total_elements);
+
+ std::vector<int8_t> value_draws;
+ randint<int8_t>(total_elements, 0, 100, &value_draws);
+
+ std::vector<bool> is_valid;
+ random_is_valid(total_elements, 0.1, &is_valid);
+
+ std::shared_ptr<Array> values, offsets;
+ ::arrow::ArrayFromVector<::arrow::Int8Type, int8_t>(::arrow::int8(), is_valid,
+ value_draws, &values);
+ ::arrow::ArrayFromVector<::arrow::Int32Type, int32_t>(offset_values, &offsets);
+
+ std::shared_ptr<Array> list_array;
+ ASSERT_OK(::arrow::ListArray::FromArrays(*offsets, *values, default_memory_pool(),
+ &list_array));
+
+ auto f1 = ::arrow::field("a", ::arrow::list(::arrow::int8()));
+ auto schema = ::arrow::schema({f1});
+ std::vector<std::shared_ptr<Array>> arrays = {list_array};
+ *out = std::make_shared<Table>(schema, arrays);
+}
+
+TEST(TestArrowReadWrite, ListLargeRecords) {
+ const int num_rows = 50;
+
+ std::shared_ptr<Table> table;
+ MakeListTable(num_rows, &table);
+
+ std::shared_ptr<Buffer> buffer;
+ WriteTableToBuffer(table, 1, 100, default_arrow_writer_properties(), &buffer);
+
+ std::unique_ptr<FileReader> reader;
+ ASSERT_OK_NO_THROW(OpenFile(std::make_shared<BufferReader>(buffer),
+ ::arrow::default_memory_pool(),
+ ::parquet::default_reader_properties(), nullptr, &reader));
+
+ // Read everything
+ std::shared_ptr<Table> result;
+ ASSERT_OK_NO_THROW(reader->ReadTable(&result));
+ AssertTablesEqual(*table, *result);
+
+ ASSERT_OK_NO_THROW(OpenFile(std::make_shared<BufferReader>(buffer),
+ ::arrow::default_memory_pool(),
+ ::parquet::default_reader_properties(), nullptr, &reader));
+
+ std::unique_ptr<ColumnReader> col_reader;
+ ASSERT_OK(reader->GetColumn(0, &col_reader));
+
+ auto expected = table->column(0)->data()->chunk(0);
+
+ std::vector<std::shared_ptr<Array>> pieces;
+ for (int i = 0; i < num_rows; ++i) {
+ std::shared_ptr<Array> piece;
+ ASSERT_OK(col_reader->NextBatch(1, &piece));
+ ASSERT_EQ(1, piece->length());
+ pieces.push_back(piece);
+ }
+ auto chunked = std::make_shared<::arrow::ChunkedArray>(pieces);
+
+ auto chunked_col =
+ std::make_shared<::arrow::Column>(table->schema()->field(0), chunked);
+ std::vector<std::shared_ptr<::arrow::Column>> columns = {chunked_col};
+ auto chunked_table = std::make_shared<Table>(table->schema(), columns);
+
+ ASSERT_TRUE(table->Equals(*chunked_table));
}
TEST(TestArrowWrite, CheckChunkSize) {
@@ -1359,6 +1493,7 @@
void InitNewParquetFile(const std::shared_ptr<GroupNode>& schema, int num_rows) {
nested_parquet_ = std::make_shared<InMemoryOutputStream>();
+
writer_ = parquet::ParquetFileWriter::Open(nested_parquet_, schema,
default_writer_properties());
row_group_writer_ = writer_->AppendRowGroup(num_rows);
@@ -1397,7 +1532,6 @@
void ValidateColumnArray(const ::arrow::Int32Array& array, size_t expected_nulls) {
ValidateArray(array, expected_nulls);
-
int j = 0;
for (int i = 0; i < values_array_->length(); i++) {
if (array.IsNull(i)) {
@@ -1515,15 +1649,19 @@
int num_columns = num_trees * static_cast<int>((std::pow(num_children, tree_depth)));
- std::vector<int16_t> def_levels(num_rows);
- std::vector<int16_t> rep_levels(num_rows);
- for (int i = 0; i < num_rows; i++) {
+ std::vector<int16_t> def_levels;
+ std::vector<int16_t> rep_levels;
+
+ int num_levels = 0;
+ while (num_levels < num_rows) {
if (node_repetition == Repetition::REQUIRED) {
- def_levels[i] = 0; // all is required
+ def_levels.push_back(0); // all are required
} else {
- def_levels[i] = i % tree_depth; // all is optional
+ int16_t level = static_cast<int16_t>(num_levels % (tree_depth + 2));
+ def_levels.push_back(level); // all are optional
}
- rep_levels[i] = 0; // none is repeated
+ rep_levels.push_back(0); // none is repeated
+ ++num_levels;
}
// Produce values for the columns
@@ -1675,7 +1813,7 @@
const int num_trees = 10;
const int depth = 5;
const int num_children = 3;
- int num_rows = SMALL_SIZE * depth;
+ int num_rows = SMALL_SIZE * (depth + 2);
CreateMultiLevelNestedParquet(num_trees, depth, num_children, num_rows, GetParam());
std::shared_ptr<Table> table;
ASSERT_OK_NO_THROW(reader_->ReadTable(&table));
diff --git a/src/parquet/arrow/reader.cc b/src/parquet/arrow/reader.cc
index 8d5ea7e..5edc837 100644
--- a/src/parquet/arrow/reader.cc
+++ b/src/parquet/arrow/reader.cc
@@ -24,13 +24,18 @@
#include <queue>
#include <string>
#include <thread>
+#include <type_traits>
#include <vector>
#include "arrow/api.h"
#include "arrow/util/bit-util.h"
#include "arrow/util/logging.h"
+#include "arrow/util/parallel.h"
+#include "parquet/arrow/record_reader.h"
#include "parquet/arrow/schema.h"
+#include "parquet/column_reader.h"
+#include "parquet/schema.h"
#include "parquet/util/schema-util.h"
using arrow::Array;
@@ -40,21 +45,28 @@
using arrow::Int32Array;
using arrow::ListArray;
using arrow::StructArray;
+using arrow::TimestampArray;
using arrow::MemoryPool;
using arrow::PoolBuffer;
using arrow::Status;
using arrow::Table;
-using parquet::schema::NodePtr;
+using parquet::schema::Node;
// Help reduce verbosity
using ParquetReader = parquet::ParquetFileReader;
+using arrow::ParallelFor;
+
+using parquet::internal::RecordReader;
namespace parquet {
namespace arrow {
+using ::arrow::BitUtil::BytesForBits;
+
constexpr int64_t kJulianToUnixEpochDays = 2440588LL;
-constexpr int64_t kNanosecondsInADay = 86400LL * 1000LL * 1000LL * 1000LL;
+constexpr int64_t kMillisecondsInADay = 86400000LL;
+constexpr int64_t kNanosecondsInADay = kMillisecondsInADay * 1000LL * 1000LL;
static inline int64_t impala_timestamp_to_nanoseconds(const Int96& impala_timestamp) {
int64_t days_since_epoch = impala_timestamp.value[2] - kJulianToUnixEpochDays;
@@ -66,47 +78,6 @@
using ArrayType = typename ::arrow::TypeTraits<ArrowType>::ArrayType;
// ----------------------------------------------------------------------
-// Helper for parallel for-loop
-
-template <class FUNCTION>
-Status ParallelFor(int nthreads, int num_tasks, FUNCTION&& func) {
- std::vector<std::thread> thread_pool;
- thread_pool.reserve(nthreads);
- std::atomic<int> task_counter(0);
-
- std::mutex error_mtx;
- bool error_occurred = false;
- Status error;
-
- for (int thread_id = 0; thread_id < nthreads; ++thread_id) {
- thread_pool.emplace_back(
- [&num_tasks, &task_counter, &error, &error_occurred, &error_mtx, &func]() {
- int task_id;
- while (!error_occurred) {
- task_id = task_counter.fetch_add(1);
- if (task_id >= num_tasks) {
- break;
- }
- Status s = func(task_id);
- if (!s.ok()) {
- std::lock_guard<std::mutex> lock(error_mtx);
- error_occurred = true;
- error = s;
- break;
- }
- }
- });
- }
- for (auto&& thread : thread_pool) {
- thread.join();
- }
- if (error_occurred) {
- return error;
- }
- return Status::OK();
-}
-
-// ----------------------------------------------------------------------
// Iteration utilities
// Abstraction to decouple row group iteration details from the ColumnReader,
@@ -120,7 +91,7 @@
virtual ~FileColumnIterator() {}
- virtual std::shared_ptr<::parquet::ColumnReader> Next() = 0;
+ virtual std::unique_ptr<::parquet::PageReader> NextChunk() = 0;
const SchemaDescriptor* schema() const { return schema_; }
@@ -141,10 +112,10 @@
explicit AllRowGroupsIterator(int column_index, ParquetFileReader* reader)
: FileColumnIterator(column_index, reader), next_row_group_(0) {}
- std::shared_ptr<::parquet::ColumnReader> Next() override {
- std::shared_ptr<::parquet::ColumnReader> result;
+ std::unique_ptr<::parquet::PageReader> NextChunk() override {
+ std::unique_ptr<::parquet::PageReader> result;
if (next_row_group_ < reader_->metadata()->num_row_groups()) {
- result = reader_->RowGroup(next_row_group_)->Column(column_index_);
+ result = reader_->RowGroup(next_row_group_)->GetColumnPageReader(column_index_);
next_row_group_++;
} else {
result = nullptr;
@@ -164,12 +135,13 @@
row_group_number_(row_group_number),
done_(false) {}
- std::shared_ptr<::parquet::ColumnReader> Next() override {
+ std::unique_ptr<::parquet::PageReader> NextChunk() override {
if (done_) {
return nullptr;
}
- auto result = reader_->RowGroup(row_group_number_)->Column(column_index_);
+ auto result =
+ reader_->RowGroup(row_group_number_)->GetColumnPageReader(column_index_);
done_ = true;
return result;
};
@@ -193,8 +165,9 @@
Status ReadSchemaField(int i, std::shared_ptr<Array>* out);
Status ReadSchemaField(int i, const std::vector<int>& indices,
std::shared_ptr<Array>* out);
- Status GetReaderForNode(int index, const NodePtr& node, const std::vector<int>& indices,
- int16_t def_level, std::unique_ptr<ColumnReader::Impl>* out);
+ Status GetReaderForNode(int index, const Node* node, const std::vector<int>& indices,
+ int16_t def_level,
+ std::unique_ptr<ColumnReader::ColumnReaderImpl>* out);
Status ReadColumn(int i, std::shared_ptr<Array>* out);
Status GetSchema(std::shared_ptr<::arrow::Schema>* out);
Status GetSchema(const std::vector<int>& indices,
@@ -223,96 +196,54 @@
int num_threads_;
};
-typedef const int16_t* ValueLevelsPtr;
-
-class ColumnReader::Impl {
+class ColumnReader::ColumnReaderImpl {
public:
- virtual ~Impl() {}
- virtual Status NextBatch(int batch_size, std::shared_ptr<Array>* out) = 0;
- virtual Status GetDefLevels(ValueLevelsPtr* data, size_t* length) = 0;
- virtual Status GetRepLevels(ValueLevelsPtr* data, size_t* length) = 0;
+ virtual ~ColumnReaderImpl() {}
+ virtual Status NextBatch(int64_t records_to_read, std::shared_ptr<Array>* out) = 0;
+ virtual Status GetDefLevels(const int16_t** data, size_t* length) = 0;
+ virtual Status GetRepLevels(const int16_t** data, size_t* length) = 0;
virtual const std::shared_ptr<Field> field() = 0;
};
// Reader implementation for primitive arrays
-class PARQUET_NO_EXPORT PrimitiveImpl : public ColumnReader::Impl {
+class PARQUET_NO_EXPORT PrimitiveImpl : public ColumnReader::ColumnReaderImpl {
public:
PrimitiveImpl(MemoryPool* pool, std::unique_ptr<FileColumnIterator> input)
- : pool_(pool),
- input_(std::move(input)),
- descr_(input_->descr()),
- values_buffer_(pool),
- def_levels_buffer_(pool),
- rep_levels_buffer_(pool) {
- DCHECK(NodeToField(input_->descr()->schema_node(), &field_).ok());
+ : pool_(pool), input_(std::move(input)), descr_(input_->descr()) {
+ record_reader_ = RecordReader::Make(descr_, pool_);
+ DCHECK(NodeToField(*input_->descr()->schema_node(), &field_).ok());
NextRowGroup();
}
virtual ~PrimitiveImpl() {}
- Status NextBatch(int batch_size, std::shared_ptr<Array>* out) override;
+ Status NextBatch(int64_t records_to_read, std::shared_ptr<Array>* out) override;
- template <typename ArrowType, typename ParquetType>
- Status TypedReadBatch(int batch_size, std::shared_ptr<Array>* out);
+ template <typename ParquetType>
+ Status WrapIntoListArray(std::shared_ptr<Array>* array);
- template <typename ArrowType>
- Status ReadByteArrayBatch(int batch_size, std::shared_ptr<Array>* out);
-
- template <typename ArrowType>
- Status ReadFLBABatch(int batch_size, int byte_width, std::shared_ptr<Array>* out);
-
- template <typename ArrowType>
- Status InitDataBuffer(int batch_size);
- Status InitValidBits(int batch_size);
- template <typename ArrowType, typename ParquetType>
- Status ReadNullableBatch(TypedColumnReader<ParquetType>* reader, int16_t* def_levels,
- int16_t* rep_levels, int64_t values_to_read,
- int64_t* levels_read, int64_t* values_read);
- template <typename ArrowType, typename ParquetType>
- Status ReadNonNullableBatch(TypedColumnReader<ParquetType>* reader,
- int64_t values_to_read, int64_t* levels_read);
- Status WrapIntoListArray(const int16_t* def_levels, const int16_t* rep_levels,
- int64_t total_values_read, std::shared_ptr<Array>* array);
-
- Status GetDefLevels(ValueLevelsPtr* data, size_t* length) override;
- Status GetRepLevels(ValueLevelsPtr* data, size_t* length) override;
+ Status GetDefLevels(const int16_t** data, size_t* length) override;
+ Status GetRepLevels(const int16_t** data, size_t* length) override;
const std::shared_ptr<Field> field() override { return field_; }
private:
void NextRowGroup();
- template <typename InType, typename OutType>
- struct can_copy_ptr {
- static constexpr bool value =
- std::is_same<InType, OutType>::value ||
- (std::is_integral<InType>{} && std::is_integral<OutType>{} &&
- (sizeof(InType) == sizeof(OutType)));
- };
-
MemoryPool* pool_;
std::unique_ptr<FileColumnIterator> input_;
const ColumnDescriptor* descr_;
- std::shared_ptr<::parquet::ColumnReader> column_reader_;
- std::shared_ptr<Field> field_;
+ std::shared_ptr<RecordReader> record_reader_;
- PoolBuffer values_buffer_;
- PoolBuffer def_levels_buffer_;
- PoolBuffer rep_levels_buffer_;
- std::shared_ptr<PoolBuffer> data_buffer_;
- uint8_t* data_buffer_ptr_;
- std::shared_ptr<PoolBuffer> valid_bits_buffer_;
- uint8_t* valid_bits_ptr_;
- int64_t valid_bits_idx_;
- int64_t null_count_;
+ std::shared_ptr<Field> field_;
};
// Reader implementation for struct array
-class PARQUET_NO_EXPORT StructImpl : public ColumnReader::Impl {
+class PARQUET_NO_EXPORT StructImpl : public ColumnReader::ColumnReaderImpl {
public:
- explicit StructImpl(const std::vector<std::shared_ptr<Impl>>& children,
- int16_t struct_def_level, MemoryPool* pool, const NodePtr& node)
+ explicit StructImpl(const std::vector<std::shared_ptr<ColumnReaderImpl>>& children,
+ int16_t struct_def_level, MemoryPool* pool, const Node* node)
: children_(children),
struct_def_level_(struct_def_level),
pool_(pool),
@@ -322,21 +253,21 @@
virtual ~StructImpl() {}
- Status NextBatch(int batch_size, std::shared_ptr<Array>* out) override;
- Status GetDefLevels(ValueLevelsPtr* data, size_t* length) override;
- Status GetRepLevels(ValueLevelsPtr* data, size_t* length) override;
+ Status NextBatch(int64_t records_to_read, std::shared_ptr<Array>* out) override;
+ Status GetDefLevels(const int16_t** data, size_t* length) override;
+ Status GetRepLevels(const int16_t** data, size_t* length) override;
const std::shared_ptr<Field> field() override { return field_; }
private:
- std::vector<std::shared_ptr<Impl>> children_;
+ std::vector<std::shared_ptr<ColumnReaderImpl>> children_;
int16_t struct_def_level_;
MemoryPool* pool_;
std::shared_ptr<Field> field_;
PoolBuffer def_levels_buffer_;
- Status DefLevelsToNullArray(std::shared_ptr<Buffer>* null_bitmap,
- int64_t* null_count);
- void InitField(const NodePtr& node, const std::vector<std::shared_ptr<Impl>>& children);
+ Status DefLevelsToNullArray(std::shared_ptr<Buffer>* null_bitmap, int64_t* null_count);
+ void InitField(const Node* node,
+ const std::vector<std::shared_ptr<ColumnReaderImpl>>& children);
};
FileReader::FileReader(MemoryPool* pool, std::unique_ptr<ParquetFileReader> reader)
@@ -347,26 +278,26 @@
Status FileReader::Impl::GetColumn(int i, std::unique_ptr<ColumnReader>* out) {
std::unique_ptr<FileColumnIterator> input(new AllRowGroupsIterator(i, reader_.get()));
- std::unique_ptr<ColumnReader::Impl> impl(new PrimitiveImpl(pool_, std::move(input)));
+ std::unique_ptr<ColumnReader::ColumnReaderImpl> impl(
+ new PrimitiveImpl(pool_, std::move(input)));
*out = std::unique_ptr<ColumnReader>(new ColumnReader(std::move(impl)));
return Status::OK();
}
-Status FileReader::Impl::GetReaderForNode(int index, const NodePtr& node,
- const std::vector<int>& indices,
- int16_t def_level,
- std::unique_ptr<ColumnReader::Impl>* out) {
+Status FileReader::Impl::GetReaderForNode(
+ int index, const Node* node, const std::vector<int>& indices, int16_t def_level,
+ std::unique_ptr<ColumnReader::ColumnReaderImpl>* out) {
*out = nullptr;
if (IsSimpleStruct(node)) {
- const schema::GroupNode* group = static_cast<const schema::GroupNode*>(node.get());
- std::vector<std::shared_ptr<ColumnReader::Impl>> children;
+ const schema::GroupNode* group = static_cast<const schema::GroupNode*>(node);
+ std::vector<std::shared_ptr<ColumnReader::ColumnReaderImpl>> children;
for (int i = 0; i < group->field_count(); i++) {
- std::unique_ptr<ColumnReader::Impl> child_reader;
+ std::unique_ptr<ColumnReader::ColumnReaderImpl> child_reader;
// TODO(itaiin): Remove the -1 index hack when all types of nested reads
// are supported. This currently just signals the lower level reader resolution
// to abort
- RETURN_NOT_OK(GetReaderForNode(index, group->field(i), indices, def_level + 1,
+ RETURN_NOT_OK(GetReaderForNode(index, group->field(i).get(), indices, def_level + 1,
&child_reader));
if (child_reader != nullptr) {
children.push_back(std::move(child_reader));
@@ -374,22 +305,22 @@
}
if (children.size() > 0) {
- *out = std::unique_ptr<ColumnReader::Impl>(
+ *out = std::unique_ptr<ColumnReader::ColumnReaderImpl>(
new StructImpl(children, def_level, pool_, node));
}
} else {
// This should be a flat field case - translate the field index to
// the correct column index by walking down to the leaf node
- NodePtr walker = node;
+ const Node* walker = node;
while (!walker->is_primitive()) {
DCHECK(walker->is_group());
- auto group = static_cast<GroupNode*>(walker.get());
+ auto group = static_cast<const GroupNode*>(walker);
if (group->field_count() != 1) {
return Status::NotImplemented("lists with structs are not supported.");
}
- walker = group->field(0);
+ walker = group->field(0).get();
}
- auto column_index = reader_->metadata()->schema()->ColumnIndex(*walker.get());
+ auto column_index = reader_->metadata()->schema()->ColumnIndex(*walker);
// If the index of the column is found then a reader for the coliumn is needed.
// Otherwise *out keeps the nullptr value.
@@ -417,8 +348,8 @@
std::shared_ptr<Array>* out) {
auto parquet_schema = reader_->metadata()->schema();
- auto node = parquet_schema->group_node()->field(i);
- std::unique_ptr<ColumnReader::Impl> reader_impl;
+ auto node = parquet_schema->group_node()->field(i).get();
+ std::unique_ptr<ColumnReader::ColumnReaderImpl> reader_impl;
RETURN_NOT_OK(GetReaderForNode(i, node, indices, 1, &reader_impl));
if (reader_impl == nullptr) {
@@ -428,24 +359,28 @@
std::unique_ptr<ColumnReader> reader(new ColumnReader(std::move(reader_impl)));
- int64_t batch_size = 0;
- for (int j = 0; j < reader_->metadata()->num_row_groups(); j++) {
- batch_size += reader_->metadata()->RowGroup(j)->ColumnChunk(i)->num_values();
+ // TODO(wesm): This calculation doesn't make much sense when we have repeated
+ // schema nodes
+ int64_t records_to_read = 0;
+
+ const FileMetaData& metadata = *reader_->metadata();
+ for (int j = 0; j < metadata.num_row_groups(); j++) {
+ records_to_read += metadata.RowGroup(j)->ColumnChunk(i)->num_values();
}
- return reader->NextBatch(static_cast<int>(batch_size), out);
+ return reader->NextBatch(records_to_read, out);
}
Status FileReader::Impl::ReadColumn(int i, std::shared_ptr<Array>* out) {
std::unique_ptr<ColumnReader> flat_column_reader;
RETURN_NOT_OK(GetColumn(i, &flat_column_reader));
- int64_t batch_size = 0;
+ int64_t records_to_read = 0;
for (int j = 0; j < reader_->metadata()->num_row_groups(); j++) {
- batch_size += reader_->metadata()->RowGroup(j)->ColumnChunk(i)->num_values();
+ records_to_read += reader_->metadata()->RowGroup(j)->ColumnChunk(i)->num_values();
}
- return flat_column_reader->NextBatch(static_cast<int>(batch_size), out);
+ return flat_column_reader->NextBatch(records_to_read, out);
}
Status FileReader::Impl::GetSchema(const std::vector<int>& indices,
@@ -472,16 +407,17 @@
auto ReadColumnFunc = [&indices, &row_group_index, &schema, &columns, &rg_metadata,
this](int i) {
int column_index = indices[i];
- int64_t batch_size = rg_metadata->ColumnChunk(column_index)->num_values();
+ int64_t records_to_read = rg_metadata->ColumnChunk(column_index)->num_values();
std::unique_ptr<FileColumnIterator> input(
new SingleRowGroupIterator(column_index, row_group_index, reader_.get()));
- std::unique_ptr<ColumnReader::Impl> impl(new PrimitiveImpl(pool_, std::move(input)));
+ std::unique_ptr<ColumnReader::ColumnReaderImpl> impl(
+ new PrimitiveImpl(pool_, std::move(input)));
ColumnReader flat_column_reader(std::move(impl));
std::shared_ptr<Array> array;
- RETURN_NOT_OK(flat_column_reader.NextBatch(static_cast<int>(batch_size), &array));
+ RETURN_NOT_OK(flat_column_reader.NextBatch(records_to_read, &array));
columns[i] = std::make_shared<Column>(schema->field(i), array);
return Status::OK();
};
@@ -642,282 +578,12 @@
return impl_->parquet_reader();
}
-template <typename ArrowType, typename ParquetType>
-Status PrimitiveImpl::ReadNonNullableBatch(TypedColumnReader<ParquetType>* reader,
- int64_t values_to_read, int64_t* levels_read) {
- using ArrowCType = typename ArrowType::c_type;
- using ParquetCType = typename ParquetType::c_type;
+template <typename ParquetType>
+Status PrimitiveImpl::WrapIntoListArray(std::shared_ptr<Array>* array) {
+ const int16_t* def_levels = record_reader_->def_levels();
+ const int16_t* rep_levels = record_reader_->rep_levels();
+ const int64_t total_levels_read = record_reader_->levels_position();
- RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(ParquetCType), false));
- auto values = reinterpret_cast<ParquetCType*>(values_buffer_.mutable_data());
- int64_t values_read;
- PARQUET_CATCH_NOT_OK(*levels_read =
- reader->ReadBatch(static_cast<int>(values_to_read), nullptr,
- nullptr, values, &values_read));
-
- ArrowCType* out_ptr = reinterpret_cast<ArrowCType*>(data_buffer_ptr_);
- std::copy(values, values + values_read, out_ptr + valid_bits_idx_);
- valid_bits_idx_ += values_read;
-
- return Status::OK();
-}
-
-#define NONNULLABLE_BATCH_FAST_PATH(ArrowType, ParquetType, CType) \
- template <> \
- Status PrimitiveImpl::ReadNonNullableBatch<ArrowType, ParquetType>( \
- TypedColumnReader<ParquetType> * reader, int64_t values_to_read, \
- int64_t * levels_read) { \
- int64_t values_read; \
- CType* out_ptr = reinterpret_cast<CType*>(data_buffer_ptr_); \
- PARQUET_CATCH_NOT_OK(*levels_read = reader->ReadBatch( \
- static_cast<int>(values_to_read), nullptr, nullptr, \
- out_ptr + valid_bits_idx_, &values_read)); \
- \
- valid_bits_idx_ += values_read; \
- \
- return Status::OK(); \
- }
-
-NONNULLABLE_BATCH_FAST_PATH(::arrow::Int32Type, Int32Type, int32_t)
-NONNULLABLE_BATCH_FAST_PATH(::arrow::Int64Type, Int64Type, int64_t)
-NONNULLABLE_BATCH_FAST_PATH(::arrow::FloatType, FloatType, float)
-NONNULLABLE_BATCH_FAST_PATH(::arrow::DoubleType, DoubleType, double)
-NONNULLABLE_BATCH_FAST_PATH(::arrow::Date32Type, Int32Type, int32_t)
-NONNULLABLE_BATCH_FAST_PATH(::arrow::TimestampType, Int64Type, int64_t)
-NONNULLABLE_BATCH_FAST_PATH(::arrow::Time32Type, Int32Type, int32_t)
-NONNULLABLE_BATCH_FAST_PATH(::arrow::Time64Type, Int64Type, int64_t)
-
-template <>
-Status PrimitiveImpl::ReadNonNullableBatch<::arrow::TimestampType, Int96Type>(
- TypedColumnReader<Int96Type>* reader, int64_t values_to_read, int64_t* levels_read) {
- RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(Int96), false));
- auto values = reinterpret_cast<Int96*>(values_buffer_.mutable_data());
- int64_t values_read;
- PARQUET_CATCH_NOT_OK(*levels_read =
- reader->ReadBatch(static_cast<int>(values_to_read), nullptr,
- nullptr, values, &values_read));
-
- int64_t* out_ptr = reinterpret_cast<int64_t*>(data_buffer_ptr_) + valid_bits_idx_;
- for (int64_t i = 0; i < values_read; i++) {
- *out_ptr++ = impala_timestamp_to_nanoseconds(values[i]);
- }
- valid_bits_idx_ += values_read;
-
- return Status::OK();
-}
-
-template <>
-Status PrimitiveImpl::ReadNonNullableBatch<::arrow::Date64Type, Int32Type>(
- TypedColumnReader<Int32Type>* reader, int64_t values_to_read, int64_t* levels_read) {
- RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(int32_t), false));
- auto values = reinterpret_cast<int32_t*>(values_buffer_.mutable_data());
- int64_t values_read;
- PARQUET_CATCH_NOT_OK(*levels_read =
- reader->ReadBatch(static_cast<int>(values_to_read), nullptr,
- nullptr, values, &values_read));
-
- int64_t* out_ptr = reinterpret_cast<int64_t*>(data_buffer_ptr_) + valid_bits_idx_;
- for (int64_t i = 0; i < values_read; i++) {
- *out_ptr++ = static_cast<int64_t>(values[i]) * 86400000;
- }
- valid_bits_idx_ += values_read;
-
- return Status::OK();
-}
-
-template <>
-Status PrimitiveImpl::ReadNonNullableBatch<::arrow::BooleanType, BooleanType>(
- TypedColumnReader<BooleanType>* reader, int64_t values_to_read,
- int64_t* levels_read) {
- RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(bool), false));
- auto values = reinterpret_cast<bool*>(values_buffer_.mutable_data());
- int64_t values_read;
- PARQUET_CATCH_NOT_OK(*levels_read =
- reader->ReadBatch(static_cast<int>(values_to_read), nullptr,
- nullptr, values, &values_read));
-
- for (int64_t i = 0; i < values_read; i++) {
- if (values[i]) {
- ::arrow::BitUtil::SetBit(data_buffer_ptr_, valid_bits_idx_);
- }
- valid_bits_idx_++;
- }
-
- return Status::OK();
-}
-
-template <typename ArrowType, typename ParquetType>
-Status PrimitiveImpl::ReadNullableBatch(TypedColumnReader<ParquetType>* reader,
- int16_t* def_levels, int16_t* rep_levels,
- int64_t values_to_read, int64_t* levels_read,
- int64_t* values_read) {
- using ArrowCType = typename ArrowType::c_type;
- using ParquetCType = typename ParquetType::c_type;
-
- RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(ParquetCType), false));
- auto values = reinterpret_cast<ParquetCType*>(values_buffer_.mutable_data());
- int64_t null_count;
- PARQUET_CATCH_NOT_OK(reader->ReadBatchSpaced(
- static_cast<int>(values_to_read), def_levels, rep_levels, values, valid_bits_ptr_,
- valid_bits_idx_, levels_read, values_read, &null_count));
-
- auto data_ptr = reinterpret_cast<ArrowCType*>(data_buffer_ptr_);
- INIT_BITSET(valid_bits_ptr_, static_cast<int>(valid_bits_idx_));
-
- for (int64_t i = 0; i < *values_read; i++) {
- if (bitset_valid_bits_ptr_ & (1 << bit_offset_valid_bits_ptr_)) {
- data_ptr[valid_bits_idx_ + i] = values[i];
- }
- READ_NEXT_BITSET(valid_bits_ptr_);
- }
- null_count_ += null_count;
- valid_bits_idx_ += *values_read;
-
- return Status::OK();
-}
-
-#define NULLABLE_BATCH_FAST_PATH(ArrowType, ParquetType, CType) \
- template <> \
- Status PrimitiveImpl::ReadNullableBatch<ArrowType, ParquetType>( \
- TypedColumnReader<ParquetType> * reader, int16_t * def_levels, \
- int16_t * rep_levels, int64_t values_to_read, int64_t * levels_read, \
- int64_t * values_read) { \
- auto data_ptr = reinterpret_cast<CType*>(data_buffer_ptr_); \
- int64_t null_count; \
- PARQUET_CATCH_NOT_OK(reader->ReadBatchSpaced( \
- static_cast<int>(values_to_read), def_levels, rep_levels, \
- data_ptr + valid_bits_idx_, valid_bits_ptr_, valid_bits_idx_, levels_read, \
- values_read, &null_count)); \
- \
- valid_bits_idx_ += *values_read; \
- null_count_ += null_count; \
- \
- return Status::OK(); \
- }
-
-NULLABLE_BATCH_FAST_PATH(::arrow::Int32Type, Int32Type, int32_t)
-NULLABLE_BATCH_FAST_PATH(::arrow::Int64Type, Int64Type, int64_t)
-NULLABLE_BATCH_FAST_PATH(::arrow::FloatType, FloatType, float)
-NULLABLE_BATCH_FAST_PATH(::arrow::DoubleType, DoubleType, double)
-NULLABLE_BATCH_FAST_PATH(::arrow::Date32Type, Int32Type, int32_t)
-NULLABLE_BATCH_FAST_PATH(::arrow::TimestampType, Int64Type, int64_t)
-NULLABLE_BATCH_FAST_PATH(::arrow::Time32Type, Int32Type, int32_t)
-NULLABLE_BATCH_FAST_PATH(::arrow::Time64Type, Int64Type, int64_t)
-
-template <>
-Status PrimitiveImpl::ReadNullableBatch<::arrow::TimestampType, Int96Type>(
- TypedColumnReader<Int96Type>* reader, int16_t* def_levels, int16_t* rep_levels,
- int64_t values_to_read, int64_t* levels_read, int64_t* values_read) {
- RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(Int96), false));
- auto values = reinterpret_cast<Int96*>(values_buffer_.mutable_data());
- int64_t null_count;
- PARQUET_CATCH_NOT_OK(reader->ReadBatchSpaced(
- static_cast<int>(values_to_read), def_levels, rep_levels, values, valid_bits_ptr_,
- valid_bits_idx_, levels_read, values_read, &null_count));
-
- auto data_ptr = reinterpret_cast<int64_t*>(data_buffer_ptr_);
- INIT_BITSET(valid_bits_ptr_, static_cast<int>(valid_bits_idx_));
- for (int64_t i = 0; i < *values_read; i++) {
- if (bitset_valid_bits_ptr_ & (1 << bit_offset_valid_bits_ptr_)) {
- data_ptr[valid_bits_idx_ + i] = impala_timestamp_to_nanoseconds(values[i]);
- }
- READ_NEXT_BITSET(valid_bits_ptr_);
- }
- null_count_ += null_count;
- valid_bits_idx_ += *values_read;
-
- return Status::OK();
-}
-
-template <>
-Status PrimitiveImpl::ReadNullableBatch<::arrow::Date64Type, Int32Type>(
- TypedColumnReader<Int32Type>* reader, int16_t* def_levels, int16_t* rep_levels,
- int64_t values_to_read, int64_t* levels_read, int64_t* values_read) {
- RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(int32_t), false));
- auto values = reinterpret_cast<int32_t*>(values_buffer_.mutable_data());
- int64_t null_count;
- PARQUET_CATCH_NOT_OK(reader->ReadBatchSpaced(
- static_cast<int>(values_to_read), def_levels, rep_levels, values, valid_bits_ptr_,
- valid_bits_idx_, levels_read, values_read, &null_count));
-
- auto data_ptr = reinterpret_cast<int64_t*>(data_buffer_ptr_);
- INIT_BITSET(valid_bits_ptr_, static_cast<int>(valid_bits_idx_));
- for (int64_t i = 0; i < *values_read; i++) {
- if (bitset_valid_bits_ptr_ & (1 << bit_offset_valid_bits_ptr_)) {
- data_ptr[valid_bits_idx_ + i] = static_cast<int64_t>(values[i]) * 86400000;
- }
- READ_NEXT_BITSET(valid_bits_ptr_);
- }
- null_count_ += null_count;
- valid_bits_idx_ += *values_read;
-
- return Status::OK();
-}
-
-template <>
-Status PrimitiveImpl::ReadNullableBatch<::arrow::BooleanType, BooleanType>(
- TypedColumnReader<BooleanType>* reader, int16_t* def_levels, int16_t* rep_levels,
- int64_t values_to_read, int64_t* levels_read, int64_t* values_read) {
- RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(bool), false));
- auto values = reinterpret_cast<bool*>(values_buffer_.mutable_data());
- int64_t null_count;
- PARQUET_CATCH_NOT_OK(reader->ReadBatchSpaced(
- static_cast<int>(values_to_read), def_levels, rep_levels, values, valid_bits_ptr_,
- valid_bits_idx_, levels_read, values_read, &null_count));
-
- INIT_BITSET(valid_bits_ptr_, static_cast<int>(valid_bits_idx_));
- for (int64_t i = 0; i < *values_read; i++) {
- if (bitset_valid_bits_ptr_ & (1 << bit_offset_valid_bits_ptr_)) {
- if (values[i]) {
- ::arrow::BitUtil::SetBit(data_buffer_ptr_, valid_bits_idx_ + i);
- }
- }
- READ_NEXT_BITSET(valid_bits_ptr_);
- }
- valid_bits_idx_ += *values_read;
- null_count_ += null_count;
-
- return Status::OK();
-}
-
-template <typename ArrowType>
-Status PrimitiveImpl::InitDataBuffer(int batch_size) {
- using ArrowCType = typename ArrowType::c_type;
- data_buffer_ = std::make_shared<PoolBuffer>(pool_);
- RETURN_NOT_OK(data_buffer_->Resize(batch_size * sizeof(ArrowCType), false));
- data_buffer_ptr_ = data_buffer_->mutable_data();
-
- return Status::OK();
-}
-
-template <>
-Status PrimitiveImpl::InitDataBuffer<::arrow::BooleanType>(int batch_size) {
- data_buffer_ = std::make_shared<PoolBuffer>(pool_);
- RETURN_NOT_OK(data_buffer_->Resize(::arrow::BitUtil::CeilByte(batch_size) / 8, false));
- data_buffer_ptr_ = data_buffer_->mutable_data();
- memset(data_buffer_ptr_, 0, data_buffer_->size());
-
- return Status::OK();
-}
-
-Status PrimitiveImpl::InitValidBits(int batch_size) {
- valid_bits_idx_ = 0;
- if (descr_->max_definition_level() > 0) {
- int valid_bits_size =
- static_cast<int>(::arrow::BitUtil::CeilByte(batch_size + 1)) / 8;
- valid_bits_buffer_ = std::make_shared<PoolBuffer>(pool_);
- RETURN_NOT_OK(valid_bits_buffer_->Resize(valid_bits_size, false));
- valid_bits_ptr_ = valid_bits_buffer_->mutable_data();
- memset(valid_bits_ptr_, 0, valid_bits_size);
- null_count_ = 0;
- }
- return Status::OK();
-}
-
-Status PrimitiveImpl::WrapIntoListArray(const int16_t* def_levels,
- const int16_t* rep_levels,
- int64_t total_levels_read,
- std::shared_ptr<Array>* array) {
std::shared_ptr<::arrow::Schema> arrow_schema;
RETURN_NOT_OK(FromParquetSchema(input_->schema(), {input_->column_index()},
input_->metadata()->key_value_metadata(),
@@ -1021,8 +687,8 @@
std::shared_ptr<Array> output(*array);
for (int64_t j = list_depth - 1; j >= 0; j--) {
- auto list_type = std::make_shared<::arrow::ListType>(
- std::make_shared<Field>("item", output->type(), nullable[j + 1]));
+ auto list_type =
+ ::arrow::list(::arrow::field("item", output->type(), nullable[j + 1]));
output = std::make_shared<::arrow::ListArray>(
list_type, list_lengths[j], offsets[j], output, valid_bits[j], null_counts[j]);
}
@@ -1032,346 +698,291 @@
}
template <typename ArrowType, typename ParquetType>
-Status PrimitiveImpl::TypedReadBatch(int batch_size, std::shared_ptr<Array>* out) {
+struct supports_fast_path_impl {
using ArrowCType = typename ArrowType::c_type;
-
- int values_to_read = batch_size;
- int total_levels_read = 0;
- RETURN_NOT_OK(InitDataBuffer<ArrowType>(batch_size));
- RETURN_NOT_OK(InitValidBits(batch_size));
- if (descr_->max_definition_level() > 0) {
- RETURN_NOT_OK(def_levels_buffer_.Resize(batch_size * sizeof(int16_t), false));
- }
- if (descr_->max_repetition_level() > 0) {
- RETURN_NOT_OK(rep_levels_buffer_.Resize(batch_size * sizeof(int16_t), false));
- }
- int16_t* def_levels = reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data());
- int16_t* rep_levels = reinterpret_cast<int16_t*>(rep_levels_buffer_.mutable_data());
-
- while ((values_to_read > 0) && column_reader_) {
- auto reader = dynamic_cast<TypedColumnReader<ParquetType>*>(column_reader_.get());
- int64_t values_read;
- int64_t levels_read;
- if (descr_->max_definition_level() == 0) {
- RETURN_NOT_OK((ReadNonNullableBatch<ArrowType, ParquetType>(reader, values_to_read,
- &values_read)));
- } else {
- // As per the defintion and checks for flat (list) columns:
- // descr_->max_definition_level() > 0, <= 3
- RETURN_NOT_OK((ReadNullableBatch<ArrowType, ParquetType>(
- reader, def_levels + total_levels_read, rep_levels + total_levels_read,
- values_to_read, &levels_read, &values_read)));
- total_levels_read += static_cast<int>(levels_read);
- }
- values_to_read -= static_cast<int>(values_read);
- if (!column_reader_->HasNext()) {
- NextRowGroup();
- }
- }
-
- // Shrink arrays as they may be larger than the output.
- RETURN_NOT_OK(data_buffer_->Resize(valid_bits_idx_ * sizeof(ArrowCType)));
- if (descr_->max_definition_level() > 0) {
- if (valid_bits_idx_ < batch_size * 0.8) {
- RETURN_NOT_OK(valid_bits_buffer_->Resize(
- ::arrow::BitUtil::CeilByte(valid_bits_idx_) / 8, false));
- }
- *out = std::make_shared<ArrayType<ArrowType>>(
- field_->type(), valid_bits_idx_, data_buffer_, valid_bits_buffer_, null_count_);
- // Relase the ownership as the Buffer is now part of a new Array
- valid_bits_buffer_.reset();
- } else {
- *out = std::make_shared<ArrayType<ArrowType>>(field_->type(), valid_bits_idx_,
- data_buffer_);
- }
- // Relase the ownership as the Buffer is now part of a new Array
- data_buffer_.reset();
-
- // Check if we should transform this array into an list array.
- return WrapIntoListArray(def_levels, rep_levels, total_levels_read, out);
-}
-
-template <>
-Status PrimitiveImpl::TypedReadBatch<::arrow::BooleanType, BooleanType>(
- int batch_size, std::shared_ptr<Array>* out) {
- int values_to_read = batch_size;
- int total_levels_read = 0;
- RETURN_NOT_OK(InitDataBuffer<::arrow::BooleanType>(batch_size));
- RETURN_NOT_OK(InitValidBits(batch_size));
- if (descr_->max_definition_level() > 0) {
- RETURN_NOT_OK(def_levels_buffer_.Resize(batch_size * sizeof(int16_t), false));
- }
- if (descr_->max_repetition_level() > 0) {
- RETURN_NOT_OK(rep_levels_buffer_.Resize(batch_size * sizeof(int16_t), false));
- }
- int16_t* def_levels = reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data());
- int16_t* rep_levels = reinterpret_cast<int16_t*>(rep_levels_buffer_.mutable_data());
-
- while ((values_to_read > 0) && column_reader_) {
- auto reader = dynamic_cast<TypedColumnReader<BooleanType>*>(column_reader_.get());
- int64_t values_read;
- int64_t levels_read;
- int16_t* def_levels = reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data());
- if (descr_->max_definition_level() == 0) {
- RETURN_NOT_OK((ReadNonNullableBatch<::arrow::BooleanType, BooleanType>(
- reader, values_to_read, &values_read)));
- } else {
- // As per the defintion and checks for flat columns:
- // descr_->max_definition_level() == 1
- RETURN_NOT_OK((ReadNullableBatch<::arrow::BooleanType, BooleanType>(
- reader, def_levels + total_levels_read, rep_levels + total_levels_read,
- values_to_read, &levels_read, &values_read)));
- total_levels_read += static_cast<int>(levels_read);
- }
- values_to_read -= static_cast<int>(values_read);
- if (!column_reader_->HasNext()) {
- NextRowGroup();
- }
- }
-
- if (descr_->max_definition_level() > 0) {
- // TODO: Shrink arrays in the case they are too large
- if (valid_bits_idx_ < batch_size * 0.8) {
- // Shrink arrays as they are larger than the output.
- // TODO(PARQUET-761/ARROW-360): Use realloc internally to shrink the arrays
- // without the need for a copy. Given a decent underlying allocator this
- // should still free some underlying pages to the OS.
-
- auto data_buffer = std::make_shared<PoolBuffer>(pool_);
- RETURN_NOT_OK(data_buffer->Resize(valid_bits_idx_ * sizeof(bool)));
- memcpy(data_buffer->mutable_data(), data_buffer_->data(), data_buffer->size());
- data_buffer_ = data_buffer;
-
- auto valid_bits_buffer = std::make_shared<PoolBuffer>(pool_);
- RETURN_NOT_OK(
- valid_bits_buffer->Resize(::arrow::BitUtil::CeilByte(valid_bits_idx_) / 8));
- memcpy(valid_bits_buffer->mutable_data(), valid_bits_buffer_->data(),
- valid_bits_buffer->size());
- valid_bits_buffer_ = valid_bits_buffer;
- }
- *out = std::make_shared<BooleanArray>(field_->type(), valid_bits_idx_, data_buffer_,
- valid_bits_buffer_, null_count_);
- // Relase the ownership
- data_buffer_.reset();
- valid_bits_buffer_.reset();
- } else {
- *out = std::make_shared<BooleanArray>(field_->type(), valid_bits_idx_, data_buffer_);
- data_buffer_.reset();
- }
-
- // Check if we should transform this array into an list array.
- return WrapIntoListArray(def_levels, rep_levels, total_levels_read, out);
-}
+ using ParquetCType = typename ParquetType::c_type;
+ static constexpr bool value = std::is_same<ArrowCType, ParquetCType>::value;
+};
template <typename ArrowType>
-Status PrimitiveImpl::ReadByteArrayBatch(int batch_size, std::shared_ptr<Array>* out) {
- using BuilderType = typename ::arrow::TypeTraits<ArrowType>::BuilderType;
-
- int total_levels_read = 0;
- if (descr_->max_definition_level() > 0) {
- RETURN_NOT_OK(def_levels_buffer_.Resize(batch_size * sizeof(int16_t), false));
- }
- if (descr_->max_repetition_level() > 0) {
- RETURN_NOT_OK(rep_levels_buffer_.Resize(batch_size * sizeof(int16_t), false));
- }
- int16_t* def_levels = reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data());
- int16_t* rep_levels = reinterpret_cast<int16_t*>(rep_levels_buffer_.mutable_data());
-
- int values_to_read = batch_size;
- BuilderType builder(pool_);
- while ((values_to_read > 0) && column_reader_) {
- RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(ByteArray), false));
- auto reader = dynamic_cast<TypedColumnReader<ByteArrayType>*>(column_reader_.get());
- int64_t values_read;
- int64_t levels_read;
- auto values = reinterpret_cast<ByteArray*>(values_buffer_.mutable_data());
- PARQUET_CATCH_NOT_OK(levels_read = reader->ReadBatch(
- values_to_read, def_levels + total_levels_read,
- rep_levels + total_levels_read, values, &values_read));
- values_to_read -= static_cast<int>(levels_read);
- if (descr_->max_definition_level() == 0) {
- for (int64_t i = 0; i < levels_read; i++) {
- RETURN_NOT_OK(
- builder.Append(reinterpret_cast<const char*>(values[i].ptr), values[i].len));
- }
- } else {
- // descr_->max_definition_level() > 0
- int values_idx = 0;
- int nullable_elements = descr_->schema_node()->is_optional();
- for (int64_t i = 0; i < levels_read; i++) {
- if (nullable_elements &&
- (def_levels[i + total_levels_read] == (descr_->max_definition_level() - 1))) {
- RETURN_NOT_OK(builder.AppendNull());
- } else if (def_levels[i + total_levels_read] == descr_->max_definition_level()) {
- RETURN_NOT_OK(
- builder.Append(reinterpret_cast<const char*>(values[values_idx].ptr),
- values[values_idx].len));
- values_idx++;
- }
- }
- total_levels_read += static_cast<int>(levels_read);
- }
- if (!column_reader_->HasNext()) {
- NextRowGroup();
- }
- }
-
- RETURN_NOT_OK(builder.Finish(out));
- // Check if we should transform this array into an list array.
- return WrapIntoListArray(def_levels, rep_levels, total_levels_read, out);
-}
+struct supports_fast_path_impl<ArrowType, ByteArrayType> {
+ static constexpr bool value = false;
+};
template <typename ArrowType>
-Status PrimitiveImpl::ReadFLBABatch(int batch_size, int byte_width,
- std::shared_ptr<Array>* out) {
- using BuilderType = typename ::arrow::TypeTraits<ArrowType>::BuilderType;
- int total_levels_read = 0;
- if (descr_->max_definition_level() > 0) {
- RETURN_NOT_OK(def_levels_buffer_.Resize(batch_size * sizeof(int16_t), false));
- }
- if (descr_->max_repetition_level() > 0) {
- RETURN_NOT_OK(rep_levels_buffer_.Resize(batch_size * sizeof(int16_t), false));
- }
- int16_t* def_levels = reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data());
- int16_t* rep_levels = reinterpret_cast<int16_t*>(rep_levels_buffer_.mutable_data());
+struct supports_fast_path_impl<ArrowType, FLBAType> {
+ static constexpr bool value = false;
+};
- int values_to_read = batch_size;
- BuilderType builder(::arrow::fixed_size_binary(byte_width), pool_);
- while ((values_to_read > 0) && column_reader_) {
- RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(FLBA), false));
- auto reader = dynamic_cast<TypedColumnReader<FLBAType>*>(column_reader_.get());
- int64_t values_read;
- int64_t levels_read;
- auto values = reinterpret_cast<FLBA*>(values_buffer_.mutable_data());
- PARQUET_CATCH_NOT_OK(levels_read = reader->ReadBatch(
- values_to_read, def_levels + total_levels_read,
- rep_levels + total_levels_read, values, &values_read));
- values_to_read -= static_cast<int>(levels_read);
- if (descr_->max_definition_level() == 0) {
- for (int64_t i = 0; i < levels_read; i++) {
- RETURN_NOT_OK(builder.Append(values[i].ptr));
- }
+template <typename ArrowType, typename ParquetType>
+using supports_fast_path =
+ typename std::enable_if<supports_fast_path_impl<ArrowType, ParquetType>::value>::type;
+
+template <typename ArrowType, typename ParquetType, typename Enable = void>
+struct TransferFunctor {
+ using ArrowCType = typename ArrowType::c_type;
+ using ParquetCType = typename ParquetType::c_type;
+
+ Status operator()(RecordReader* reader, MemoryPool* pool,
+ const std::shared_ptr<::arrow::DataType>& type,
+ std::shared_ptr<Array>* out) {
+ int64_t length = reader->values_written();
+ std::shared_ptr<Buffer> data;
+ RETURN_NOT_OK(::arrow::AllocateBuffer(pool, length * sizeof(ArrowCType), &data));
+
+ auto values = reinterpret_cast<const ParquetCType*>(reader->values());
+ auto out_ptr = reinterpret_cast<ArrowCType*>(data->mutable_data());
+ std::copy(values, values + length, out_ptr);
+
+ if (reader->nullable_values()) {
+ std::shared_ptr<PoolBuffer> is_valid = reader->ReleaseIsValid();
+ *out = std::make_shared<ArrayType<ArrowType>>(type, length, data, is_valid,
+ reader->null_count());
} else {
- int values_idx = 0;
- int nullable_elements = descr_->schema_node()->is_optional();
- for (int64_t i = 0; i < levels_read; i++) {
- if (nullable_elements &&
- (def_levels[i + total_levels_read] == (descr_->max_definition_level() - 1))) {
- RETURN_NOT_OK(builder.AppendNull());
- } else if (def_levels[i + total_levels_read] == descr_->max_definition_level()) {
- RETURN_NOT_OK(builder.Append(values[values_idx].ptr));
- values_idx++;
- }
- }
- total_levels_read += static_cast<int>(levels_read);
+ *out = std::make_shared<ArrayType<ArrowType>>(type, length, data);
}
- if (!column_reader_->HasNext()) {
- NextRowGroup();
- }
+ return Status::OK();
}
+};
- RETURN_NOT_OK(builder.Finish(out));
- // Check if we should transform this array into an list array.
- return WrapIntoListArray(def_levels, rep_levels, total_levels_read, out);
-}
+template <typename ArrowType, typename ParquetType>
+struct TransferFunctor<ArrowType, ParquetType,
+ supports_fast_path<ArrowType, ParquetType>> {
+ Status operator()(RecordReader* reader, MemoryPool* pool,
+ const std::shared_ptr<::arrow::DataType>& type,
+ std::shared_ptr<Array>* out) {
+ int64_t length = reader->values_written();
+ std::shared_ptr<PoolBuffer> values = reader->ReleaseValues();
+
+ if (reader->nullable_values()) {
+ std::shared_ptr<PoolBuffer> is_valid = reader->ReleaseIsValid();
+ *out = std::make_shared<ArrayType<ArrowType>>(type, length, values, is_valid,
+ reader->null_count());
+ } else {
+ *out = std::make_shared<ArrayType<ArrowType>>(type, length, values);
+ }
+ return Status::OK();
+ }
+};
template <>
-Status PrimitiveImpl::TypedReadBatch<::arrow::BinaryType, ByteArrayType>(
- int batch_size, std::shared_ptr<Array>* out) {
- return ReadByteArrayBatch<::arrow::BinaryType>(batch_size, out);
-}
+struct TransferFunctor<::arrow::BooleanType, BooleanType> {
+ Status operator()(RecordReader* reader, MemoryPool* pool,
+ const std::shared_ptr<::arrow::DataType>& type,
+ std::shared_ptr<Array>* out) {
+ int64_t length = reader->values_written();
+ std::shared_ptr<Buffer> data;
+
+ const int64_t buffer_size = BytesForBits(length);
+ RETURN_NOT_OK(::arrow::AllocateBuffer(pool, buffer_size, &data));
+
+ // Transfer boolean values to packed bitmap
+ auto values = reinterpret_cast<const bool*>(reader->values());
+ uint8_t* data_ptr = data->mutable_data();
+ memset(data_ptr, 0, buffer_size);
+
+ for (int64_t i = 0; i < length; i++) {
+ if (values[i]) {
+ ::arrow::BitUtil::SetBit(data_ptr, i);
+ }
+ }
+
+ if (reader->nullable_values()) {
+ std::shared_ptr<PoolBuffer> is_valid = reader->ReleaseIsValid();
+ RETURN_NOT_OK(is_valid->Resize(BytesForBits(length), false));
+ *out = std::make_shared<BooleanArray>(type, length, data, is_valid,
+ reader->null_count());
+ } else {
+ *out = std::make_shared<BooleanArray>(type, length, data);
+ }
+ return Status::OK();
+ }
+};
template <>
-Status PrimitiveImpl::TypedReadBatch<::arrow::StringType, ByteArrayType>(
- int batch_size, std::shared_ptr<Array>* out) {
- return ReadByteArrayBatch<::arrow::StringType>(batch_size, out);
-}
+struct TransferFunctor<::arrow::TimestampType, Int96Type> {
+ Status operator()(RecordReader* reader, MemoryPool* pool,
+ const std::shared_ptr<::arrow::DataType>& type,
+ std::shared_ptr<Array>* out) {
+ int64_t length = reader->values_written();
+ auto values = reinterpret_cast<const Int96*>(reader->values());
-#define TYPED_BATCH_CASE(ENUM, ArrowType, ParquetType) \
- case ::arrow::Type::ENUM: \
- return TypedReadBatch<ArrowType, ParquetType>(batch_size, out); \
- break;
+ std::shared_ptr<Buffer> data;
+ RETURN_NOT_OK(::arrow::AllocateBuffer(pool, length * sizeof(int64_t), &data));
-Status PrimitiveImpl::NextBatch(int batch_size, std::shared_ptr<Array>* out) {
- if (!column_reader_) {
+ auto data_ptr = reinterpret_cast<int64_t*>(data->mutable_data());
+ for (int64_t i = 0; i < length; i++) {
+ *data_ptr++ = impala_timestamp_to_nanoseconds(values[i]);
+ }
+
+ if (reader->nullable_values()) {
+ std::shared_ptr<PoolBuffer> is_valid = reader->ReleaseIsValid();
+ *out = std::make_shared<TimestampArray>(type, length, data, is_valid,
+ reader->null_count());
+ } else {
+ *out = std::make_shared<TimestampArray>(type, length, data);
+ }
+
+ return Status::OK();
+ }
+};
+
+template <>
+struct TransferFunctor<::arrow::Date64Type, Int32Type> {
+ Status operator()(RecordReader* reader, MemoryPool* pool,
+ const std::shared_ptr<::arrow::DataType>& type,
+ std::shared_ptr<Array>* out) {
+ int64_t length = reader->values_written();
+ auto values = reinterpret_cast<const int32_t*>(reader->values());
+
+ std::shared_ptr<Buffer> data;
+ RETURN_NOT_OK(::arrow::AllocateBuffer(pool, length * sizeof(int64_t), &data));
+ auto out_ptr = reinterpret_cast<int64_t*>(data->mutable_data());
+
+ for (int64_t i = 0; i < length; i++) {
+ *out_ptr++ = static_cast<int64_t>(values[i]) * kMillisecondsInADay;
+ }
+
+ if (reader->nullable_values()) {
+ std::shared_ptr<PoolBuffer> is_valid = reader->ReleaseIsValid();
+ *out = std::make_shared<::arrow::Date64Array>(type, length, data, is_valid,
+ reader->null_count());
+ } else {
+ *out = std::make_shared<::arrow::Date64Array>(type, length, data);
+ }
+ return Status::OK();
+ }
+};
+
+template <typename ArrowType, typename ParquetType>
+struct TransferFunctor<
+ ArrowType, ParquetType,
+ typename std::enable_if<std::is_same<ParquetType, ByteArrayType>::value ||
+ std::is_same<ParquetType, FLBAType>::value>::type> {
+ Status operator()(RecordReader* reader, MemoryPool* pool,
+ const std::shared_ptr<::arrow::DataType>& type,
+ std::shared_ptr<Array>* out) {
+ RETURN_NOT_OK(reader->builder()->Finish(out));
+
+ if (type->id() == ::arrow::Type::STRING) {
+ // Convert from BINARY type to STRING
+ auto new_data = (*out)->data()->ShallowCopy();
+ new_data->type = type;
+ RETURN_NOT_OK(::arrow::MakeArray(new_data, out));
+ }
+ return Status::OK();
+ }
+};
+
+#define TRANSFER_DATA(ArrowType, ParquetType) \
+ TransferFunctor<ArrowType, ParquetType> func; \
+ RETURN_NOT_OK(func(record_reader_.get(), pool_, field_->type(), out)); \
+ RETURN_NOT_OK(WrapIntoListArray<ParquetType>(out))
+
+#define TRANSFER_CASE(ENUM, ArrowType, ParquetType) \
+ case ::arrow::Type::ENUM: { \
+ TRANSFER_DATA(ArrowType, ParquetType); \
+ } break;
+
+Status PrimitiveImpl::NextBatch(int64_t records_to_read, std::shared_ptr<Array>* out) {
+ if (!record_reader_->HasMoreData()) {
// Exhausted all row groups.
*out = nullptr;
return Status::OK();
}
- switch (field_->type()->id()) {
- case ::arrow::Type::NA:
- *out = std::make_shared<::arrow::NullArray>(batch_size);
- return Status::OK();
- break;
- TYPED_BATCH_CASE(BOOL, ::arrow::BooleanType, BooleanType)
- TYPED_BATCH_CASE(UINT8, ::arrow::UInt8Type, Int32Type)
- TYPED_BATCH_CASE(INT8, ::arrow::Int8Type, Int32Type)
- TYPED_BATCH_CASE(UINT16, ::arrow::UInt16Type, Int32Type)
- TYPED_BATCH_CASE(INT16, ::arrow::Int16Type, Int32Type)
- TYPED_BATCH_CASE(UINT32, ::arrow::UInt32Type, Int32Type)
- TYPED_BATCH_CASE(INT32, ::arrow::Int32Type, Int32Type)
- TYPED_BATCH_CASE(UINT64, ::arrow::UInt64Type, Int64Type)
- TYPED_BATCH_CASE(INT64, ::arrow::Int64Type, Int64Type)
- TYPED_BATCH_CASE(FLOAT, ::arrow::FloatType, FloatType)
- TYPED_BATCH_CASE(DOUBLE, ::arrow::DoubleType, DoubleType)
- TYPED_BATCH_CASE(STRING, ::arrow::StringType, ByteArrayType)
- TYPED_BATCH_CASE(BINARY, ::arrow::BinaryType, ByteArrayType)
- TYPED_BATCH_CASE(DATE32, ::arrow::Date32Type, Int32Type)
- TYPED_BATCH_CASE(DATE64, ::arrow::Date64Type, Int32Type)
- case ::arrow::Type::FIXED_SIZE_BINARY: {
- int32_t byte_width =
- static_cast<::arrow::FixedSizeBinaryType*>(field_->type().get())->byte_width();
- return ReadFLBABatch<::arrow::FixedSizeBinaryType>(batch_size, byte_width, out);
- break;
+ if (field_->type()->id() == ::arrow::Type::NA) {
+ *out = std::make_shared<::arrow::NullArray>(records_to_read);
+ return Status::OK();
+ }
+
+ try {
+ // Pre-allocation gives much better performance for flat columns
+ record_reader_->Reserve(records_to_read);
+
+ record_reader_->Reset();
+ while (records_to_read > 0) {
+ if (!record_reader_->HasMoreData()) {
+ break;
+ }
+ int64_t records_read = record_reader_->ReadRecords(records_to_read);
+ records_to_read -= records_read;
+ if (records_read == 0) {
+ NextRowGroup();
+ }
}
+ } catch (const ::parquet::ParquetException& e) {
+ return ::arrow::Status::IOError(e.what());
+ }
+
+ switch (field_->type()->id()) {
+ TRANSFER_CASE(BOOL, ::arrow::BooleanType, BooleanType)
+ TRANSFER_CASE(UINT8, ::arrow::UInt8Type, Int32Type)
+ TRANSFER_CASE(INT8, ::arrow::Int8Type, Int32Type)
+ TRANSFER_CASE(UINT16, ::arrow::UInt16Type, Int32Type)
+ TRANSFER_CASE(INT16, ::arrow::Int16Type, Int32Type)
+ TRANSFER_CASE(UINT32, ::arrow::UInt32Type, Int32Type)
+ TRANSFER_CASE(INT32, ::arrow::Int32Type, Int32Type)
+ TRANSFER_CASE(UINT64, ::arrow::UInt64Type, Int64Type)
+ TRANSFER_CASE(INT64, ::arrow::Int64Type, Int64Type)
+ TRANSFER_CASE(FLOAT, ::arrow::FloatType, FloatType)
+ TRANSFER_CASE(DOUBLE, ::arrow::DoubleType, DoubleType)
+ TRANSFER_CASE(STRING, ::arrow::StringType, ByteArrayType)
+ TRANSFER_CASE(BINARY, ::arrow::BinaryType, ByteArrayType)
+ TRANSFER_CASE(DATE32, ::arrow::Date32Type, Int32Type)
+ TRANSFER_CASE(DATE64, ::arrow::Date64Type, Int32Type)
+ TRANSFER_CASE(FIXED_SIZE_BINARY, ::arrow::FixedSizeBinaryType, FLBAType)
case ::arrow::Type::TIMESTAMP: {
::arrow::TimestampType* timestamp_type =
static_cast<::arrow::TimestampType*>(field_->type().get());
switch (timestamp_type->unit()) {
case ::arrow::TimeUnit::MILLI:
- return TypedReadBatch<::arrow::TimestampType, Int64Type>(batch_size, out);
- break;
- case ::arrow::TimeUnit::MICRO:
- return TypedReadBatch<::arrow::TimestampType, Int64Type>(batch_size, out);
- break;
- case ::arrow::TimeUnit::NANO:
- return TypedReadBatch<::arrow::TimestampType, Int96Type>(batch_size, out);
- break;
+ case ::arrow::TimeUnit::MICRO: {
+ TRANSFER_DATA(::arrow::TimestampType, Int64Type);
+ } break;
+ case ::arrow::TimeUnit::NANO: {
+ TRANSFER_DATA(::arrow::TimestampType, Int96Type);
+ } break;
default:
return Status::NotImplemented("TimeUnit not supported");
}
break;
}
- TYPED_BATCH_CASE(TIME32, ::arrow::Time32Type, Int32Type)
- TYPED_BATCH_CASE(TIME64, ::arrow::Time64Type, Int64Type)
+ TRANSFER_CASE(TIME32, ::arrow::Time32Type, Int32Type)
+ TRANSFER_CASE(TIME64, ::arrow::Time64Type, Int64Type)
default:
std::stringstream ss;
ss << "No support for reading columns of type " << field_->type()->ToString();
return Status::NotImplemented(ss.str());
}
-}
-void PrimitiveImpl::NextRowGroup() { column_reader_ = input_->Next(); }
-
-Status PrimitiveImpl::GetDefLevels(ValueLevelsPtr* data, size_t* length) {
- *data = reinterpret_cast<ValueLevelsPtr>(def_levels_buffer_.data());
- *length = def_levels_buffer_.size() / sizeof(int16_t);
return Status::OK();
}
-Status PrimitiveImpl::GetRepLevels(ValueLevelsPtr* data, size_t* length) {
- *data = reinterpret_cast<ValueLevelsPtr>(rep_levels_buffer_.data());
- *length = rep_levels_buffer_.size() / sizeof(int16_t);
+void PrimitiveImpl::NextRowGroup() {
+ std::unique_ptr<PageReader> page_reader = input_->NextChunk();
+ record_reader_->SetPageReader(std::move(page_reader));
+}
+
+Status PrimitiveImpl::GetDefLevels(const int16_t** data, size_t* length) {
+ *data = record_reader_->def_levels();
+ *length = record_reader_->levels_written();
return Status::OK();
}
-ColumnReader::ColumnReader(std::unique_ptr<Impl> impl) : impl_(std::move(impl)) {}
+Status PrimitiveImpl::GetRepLevels(const int16_t** data, size_t* length) {
+ *data = record_reader_->rep_levels();
+ *length = record_reader_->levels_written();
+ return Status::OK();
+}
+
+ColumnReader::ColumnReader(std::unique_ptr<ColumnReaderImpl> impl)
+ : impl_(std::move(impl)) {}
ColumnReader::~ColumnReader() {}
-Status ColumnReader::NextBatch(int batch_size, std::shared_ptr<Array>* out) {
- return impl_->NextBatch(batch_size, out);
+Status ColumnReader::NextBatch(int64_t records_to_read, std::shared_ptr<Array>* out) {
+ return impl_->NextBatch(records_to_read, out);
}
// StructImpl methods
@@ -1380,7 +991,7 @@
int64_t* null_count_out) {
std::shared_ptr<Buffer> null_bitmap;
auto null_count = 0;
- ValueLevelsPtr def_levels_data;
+ const int16_t* def_levels_data;
size_t def_levels_length;
RETURN_NOT_OK(GetDefLevels(&def_levels_data, &def_levels_length));
RETURN_NOT_OK(GetEmptyBitmap(pool_, def_levels_length, &null_bitmap));
@@ -1402,7 +1013,7 @@
// TODO(itaiin): Consider caching the results of this calculation -
// note that this is only used once for each read for now
-Status StructImpl::GetDefLevels(ValueLevelsPtr* data, size_t* length) {
+Status StructImpl::GetDefLevels(const int16_t** data, size_t* length) {
*data = nullptr;
if (children_.size() == 0) {
// Empty struct
@@ -1411,7 +1022,7 @@
}
// We have at least one child
- ValueLevelsPtr child_def_levels;
+ const int16_t* child_def_levels;
size_t child_length;
RETURN_NOT_OK(children_[0]->GetDefLevels(&child_def_levels, &child_length));
auto size = child_length * sizeof(int16_t);
@@ -1438,27 +1049,27 @@
std::max(result_levels[i], std::min(child_def_levels[i], struct_def_level_));
}
}
- *data = reinterpret_cast<ValueLevelsPtr>(def_levels_buffer_.data());
+ *data = reinterpret_cast<const int16_t*>(def_levels_buffer_.data());
*length = child_length;
return Status::OK();
}
-void StructImpl::InitField(const NodePtr& node,
- const std::vector<std::shared_ptr<Impl>>& children) {
+void StructImpl::InitField(
+ const Node* node, const std::vector<std::shared_ptr<ColumnReaderImpl>>& children) {
// Make a shallow node to field conversion from the children fields
std::vector<std::shared_ptr<::arrow::Field>> fields(children.size());
for (size_t i = 0; i < children.size(); i++) {
fields[i] = children[i]->field();
}
- auto type = std::make_shared<::arrow::StructType>(fields);
- field_ = std::make_shared<Field>(node->name(), type);
+ auto type = ::arrow::struct_(fields);
+ field_ = ::arrow::field(node->name(), type);
}
-Status StructImpl::GetRepLevels(ValueLevelsPtr* data, size_t* length) {
+Status StructImpl::GetRepLevels(const int16_t** data, size_t* length) {
return Status::NotImplemented("GetRepLevels is not implemented for struct");
}
-Status StructImpl::NextBatch(int batch_size, std::shared_ptr<Array>* out) {
+Status StructImpl::NextBatch(int64_t records_to_read, std::shared_ptr<Array>* out) {
std::vector<std::shared_ptr<Array>> children_arrays;
std::shared_ptr<Buffer> null_bitmap;
int64_t null_count;
@@ -1467,16 +1078,23 @@
for (auto& child : children_) {
std::shared_ptr<Array> child_array;
- RETURN_NOT_OK(child->NextBatch(batch_size, &child_array));
-
+ RETURN_NOT_OK(child->NextBatch(records_to_read, &child_array));
children_arrays.push_back(child_array);
}
RETURN_NOT_OK(DefLevelsToNullArray(&null_bitmap, &null_count));
- *out = std::make_shared<StructArray>(field()->type(), batch_size, children_arrays,
- null_bitmap, null_count);
+ int64_t struct_length = children_arrays[0]->length();
+ for (size_t i = 1; i < children_arrays.size(); ++i) {
+ if (children_arrays[i]->length() != struct_length) {
+ // TODO(wesm): This should really only occur if the Parquet file is
+ // malformed. Should this be a DCHECK?
+ return Status::Invalid("Struct children had different lengths");
+ }
+ }
+ *out = std::make_shared<StructArray>(field()->type(), struct_length, children_arrays,
+ null_bitmap, null_count);
return Status::OK();
}
diff --git a/src/parquet/arrow/reader.h b/src/parquet/arrow/reader.h
index ce82375..faaef9a 100644
--- a/src/parquet/arrow/reader.h
+++ b/src/parquet/arrow/reader.h
@@ -173,7 +173,7 @@
// might change in the future.
class PARQUET_EXPORT ColumnReader {
public:
- class PARQUET_NO_EXPORT Impl;
+ class PARQUET_NO_EXPORT ColumnReaderImpl;
virtual ~ColumnReader();
// Scan the next array of the indicated size. The actual size of the
@@ -185,11 +185,11 @@
//
// Returns Status::OK on a successful read, including if you have exhausted
// the data available in the file.
- ::arrow::Status NextBatch(int batch_size, std::shared_ptr<::arrow::Array>* out);
+ ::arrow::Status NextBatch(int64_t batch_size, std::shared_ptr<::arrow::Array>* out);
private:
- std::unique_ptr<Impl> impl_;
- explicit ColumnReader(std::unique_ptr<Impl> impl);
+ std::unique_ptr<ColumnReaderImpl> impl_;
+ explicit ColumnReader(std::unique_ptr<ColumnReaderImpl> impl);
friend class FileReader;
friend class PrimitiveImpl;
diff --git a/src/parquet/arrow/record_reader.cc b/src/parquet/arrow/record_reader.cc
new file mode 100644
index 0000000..7275d2f
--- /dev/null
+++ b/src/parquet/arrow/record_reader.cc
@@ -0,0 +1,807 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/arrow/record_reader.h"
+
+#include <algorithm>
+#include <cstdint>
+#include <memory>
+#include <sstream>
+
+#include <arrow/buffer.h>
+#include <arrow/memory_pool.h>
+#include <arrow/status.h>
+#include <arrow/util/bit-util.h>
+#include <arrow/util/rle-encoding.h>
+
+#include "parquet/column_page.h"
+#include "parquet/column_reader.h"
+#include "parquet/encoding-internal.h"
+#include "parquet/exception.h"
+#include "parquet/properties.h"
+
+using arrow::MemoryPool;
+
+namespace parquet {
+namespace internal {
+
+namespace BitUtil = ::arrow::BitUtil;
+
+template <typename DType>
+class TypedRecordReader;
+
+// PLAIN_DICTIONARY is deprecated but used to be used as a dictionary index
+// encoding.
+static bool IsDictionaryIndexEncoding(const Encoding::type& e) {
+ return e == Encoding::RLE_DICTIONARY || e == Encoding::PLAIN_DICTIONARY;
+}
+
+class RecordReader::RecordReaderImpl {
+ public:
+ RecordReaderImpl(const ColumnDescriptor* descr, MemoryPool* pool)
+ : descr_(descr),
+ pool_(pool),
+ num_buffered_values_(0),
+ num_decoded_values_(0),
+ max_def_level_(descr->max_definition_level()),
+ max_rep_level_(descr->max_repetition_level()),
+ at_record_start_(false),
+ records_read_(0),
+ values_written_(0),
+ values_capacity_(0),
+ null_count_(0),
+ levels_written_(0),
+ levels_position_(0),
+ levels_capacity_(0) {
+ nullable_values_ = internal::HasSpacedValues(descr);
+ values_ = std::make_shared<PoolBuffer>(pool);
+ valid_bits_ = std::make_shared<PoolBuffer>(pool);
+ def_levels_ = std::make_shared<PoolBuffer>(pool);
+ rep_levels_ = std::make_shared<PoolBuffer>(pool);
+
+ if (descr->physical_type() == Type::BYTE_ARRAY) {
+ builder_.reset(new ::arrow::BinaryBuilder(pool));
+ } else if (descr->physical_type() == Type::FIXED_LEN_BYTE_ARRAY) {
+ int byte_width = descr->type_length();
+ std::shared_ptr<::arrow::DataType> type = ::arrow::fixed_size_binary(byte_width);
+ builder_.reset(new ::arrow::FixedSizeBinaryBuilder(type, pool));
+ }
+ Reset();
+ }
+
+ virtual ~RecordReaderImpl() {}
+
+ virtual int64_t ReadRecords(int64_t num_records) = 0;
+
+ // Dictionary decoders must be reset when advancing row groups
+ virtual void ResetDecoders() = 0;
+
+ void SetPageReader(std::unique_ptr<PageReader> reader) {
+ pager_ = std::move(reader);
+ ResetDecoders();
+ }
+
+ bool HasMoreData() const { return pager_ != nullptr; }
+
+ int16_t* def_levels() const {
+ return reinterpret_cast<int16_t*>(def_levels_->mutable_data());
+ }
+
+ int16_t* rep_levels() {
+ return reinterpret_cast<int16_t*>(rep_levels_->mutable_data());
+ }
+
+ uint8_t* values() const { return values_->mutable_data(); }
+
+ /// \brief Number of values written including nulls (if any)
+ int64_t values_written() const { return values_written_; }
+
+ int64_t levels_position() const { return levels_position_; }
+ int64_t levels_written() const { return levels_written_; }
+
+ // We may outwardly have the appearance of having exhausted a column chunk
+ // when in fact we are in the middle of processing the last batch
+ bool has_values_to_process() const { return levels_position_ < levels_written_; }
+
+ int64_t null_count() const { return null_count_; }
+
+ bool nullable_values() const { return nullable_values_; }
+
+ std::shared_ptr<PoolBuffer> ReleaseValues() {
+ auto result = values_;
+ values_ = std::make_shared<PoolBuffer>(pool_);
+ return result;
+ }
+
+ std::shared_ptr<PoolBuffer> ReleaseIsValid() {
+ auto result = valid_bits_;
+ valid_bits_ = std::make_shared<PoolBuffer>(pool_);
+ return result;
+ }
+
+ ::arrow::ArrayBuilder* builder() { return builder_.get(); }
+
+ // Process written repetition/definition levels to reach the end of
+ // records. Process no more levels than necessary to delimit the indicated
+ // number of logical records. Updates internal state of RecordReader
+ //
+ // \return Number of records delimited
+ int64_t DelimitRecords(int64_t num_records, int64_t* values_seen) {
+ int64_t values_to_read = 0;
+ int64_t records_read = 0;
+
+ const int16_t* def_levels = this->def_levels() + levels_position_;
+ const int16_t* rep_levels = this->rep_levels() + levels_position_;
+
+ DCHECK_GT(max_rep_level_, 0);
+
+ // Count logical records and number of values to read
+ while (levels_position_ < levels_written_) {
+ if (*rep_levels++ == 0) {
+ at_record_start_ = true;
+ if (records_read == num_records) {
+ // We've found the number of records we were looking for
+ break;
+ } else {
+ // Continue
+ ++records_read;
+ }
+ } else {
+ at_record_start_ = false;
+ }
+ if (*def_levels++ == max_def_level_) {
+ ++values_to_read;
+ }
+ ++levels_position_;
+ }
+ *values_seen = values_to_read;
+ return records_read;
+ }
+
+ // Read multiple definition levels into preallocated memory
+ //
+ // Returns the number of decoded definition levels
+ int64_t ReadDefinitionLevels(int64_t batch_size, int16_t* levels) {
+ if (descr_->max_definition_level() == 0) {
+ return 0;
+ }
+ return definition_level_decoder_.Decode(static_cast<int>(batch_size), levels);
+ }
+
+ int64_t ReadRepetitionLevels(int64_t batch_size, int16_t* levels) {
+ if (descr_->max_repetition_level() == 0) {
+ return 0;
+ }
+ return repetition_level_decoder_.Decode(static_cast<int>(batch_size), levels);
+ }
+
+ int64_t available_values_current_page() const {
+ return num_buffered_values_ - num_decoded_values_;
+ }
+
+ void ConsumeBufferedValues(int64_t num_values) { num_decoded_values_ += num_values; }
+
+ Type::type type() const { return descr_->physical_type(); }
+
+ const ColumnDescriptor* descr() const { return descr_; }
+
+ void Reserve(int64_t capacity) {
+ ReserveLevels(capacity);
+ ReserveValues(capacity);
+ }
+
+ void ReserveLevels(int64_t capacity) {
+ if (descr_->max_definition_level() > 0 &&
+ (levels_written_ + capacity > levels_capacity_)) {
+ int64_t new_levels_capacity = BitUtil::NextPower2(levels_capacity_ + 1);
+ while (levels_written_ + capacity > new_levels_capacity) {
+ new_levels_capacity = BitUtil::NextPower2(new_levels_capacity + 1);
+ }
+ PARQUET_THROW_NOT_OK(
+ def_levels_->Resize(new_levels_capacity * sizeof(int16_t), false));
+ if (descr_->max_repetition_level() > 0) {
+ PARQUET_THROW_NOT_OK(
+ rep_levels_->Resize(new_levels_capacity * sizeof(int16_t), false));
+ }
+ levels_capacity_ = new_levels_capacity;
+ }
+ }
+
+ void ReserveValues(int64_t capacity) {
+ if (values_written_ + capacity > values_capacity_) {
+ int64_t new_values_capacity = BitUtil::NextPower2(values_capacity_ + 1);
+ while (values_written_ + capacity > new_values_capacity) {
+ new_values_capacity = BitUtil::NextPower2(new_values_capacity + 1);
+ }
+
+ int type_size = GetTypeByteSize(descr_->physical_type());
+ PARQUET_THROW_NOT_OK(values_->Resize(new_values_capacity * type_size, false));
+ values_capacity_ = new_values_capacity;
+ }
+ if (nullable_values_) {
+ int64_t valid_bytes_new = BitUtil::BytesForBits(values_capacity_);
+ if (valid_bits_->size() < valid_bytes_new) {
+ int64_t valid_bytes_old = BitUtil::BytesForBits(values_written_);
+ PARQUET_THROW_NOT_OK(valid_bits_->Resize(valid_bytes_new, false));
+
+ // Avoid valgrind warnings
+ memset(valid_bits_->mutable_data() + valid_bytes_old, 0,
+ valid_bytes_new - valid_bytes_old);
+ }
+ }
+ }
+
+ void Reset() {
+ ResetValues();
+
+ if (levels_written_ > 0) {
+ const int64_t levels_remaining = levels_written_ - levels_position_;
+ // Shift remaining levels to beginning of buffer and trim to only the number
+ // of decoded levels remaining
+ int16_t* def_data = def_levels();
+ int16_t* rep_data = rep_levels();
+
+ std::copy(def_data + levels_position_, def_data + levels_written_, def_data);
+ std::copy(rep_data + levels_position_, rep_data + levels_written_, rep_data);
+
+ PARQUET_THROW_NOT_OK(
+ def_levels_->Resize(levels_remaining * sizeof(int16_t), false));
+ PARQUET_THROW_NOT_OK(
+ rep_levels_->Resize(levels_remaining * sizeof(int16_t), false));
+
+ levels_written_ -= levels_position_;
+ levels_position_ = 0;
+ levels_capacity_ = levels_remaining;
+ }
+
+ records_read_ = 0;
+
+ // Calling Finish on the builders also resets them
+ }
+
+ void ResetValues() {
+ if (values_written_ > 0) {
+ // Resize to 0, but do not shrink to fit
+ PARQUET_THROW_NOT_OK(values_->Resize(0, false));
+ PARQUET_THROW_NOT_OK(valid_bits_->Resize(0, false));
+ values_written_ = 0;
+ values_capacity_ = 0;
+ null_count_ = 0;
+ }
+ }
+
+ protected:
+ const ColumnDescriptor* descr_;
+ ::arrow::MemoryPool* pool_;
+
+ std::unique_ptr<PageReader> pager_;
+ std::shared_ptr<Page> current_page_;
+
+ // Not set if full schema for this field has no optional or repeated elements
+ LevelDecoder definition_level_decoder_;
+
+ // Not set for flat schemas.
+ LevelDecoder repetition_level_decoder_;
+
+ // The total number of values stored in the data page. This is the maximum of
+ // the number of encoded definition levels or encoded values. For
+ // non-repeated, required columns, this is equal to the number of encoded
+ // values. For repeated or optional values, there may be fewer data values
+ // than levels, and this tells you how many encoded levels there are in that
+ // case.
+ int64_t num_buffered_values_;
+
+ // The number of values from the current data page that have been decoded
+ // into memory
+ int64_t num_decoded_values_;
+
+ const int max_def_level_;
+ const int max_rep_level_;
+
+ bool nullable_values_;
+
+ bool at_record_start_;
+ int64_t records_read_;
+
+ int64_t values_written_;
+ int64_t values_capacity_;
+ int64_t null_count_;
+
+ int64_t levels_written_;
+ int64_t levels_position_;
+ int64_t levels_capacity_;
+
+ // Only initialized for ByteArray / FixedLenByteArray columns, which
+ std::unique_ptr<::arrow::ArrayBuilder> builder_;
+
+ std::shared_ptr<::arrow::PoolBuffer> values_;
+
+ template <typename T>
+ T* ValuesHead() {
+ return reinterpret_cast<T*>(values_->mutable_data()) + values_written_;
+ }
+
+ std::shared_ptr<::arrow::PoolBuffer> valid_bits_;
+ std::shared_ptr<::arrow::PoolBuffer> def_levels_;
+ std::shared_ptr<::arrow::PoolBuffer> rep_levels_;
+};
+
+// The minimum number of repetition/definition levels to decode at a time, for
+// better vectorized performance when doing many smaller record reads
+constexpr int64_t kMinLevelBatchSize = 1024;
+
+template <typename DType>
+class TypedRecordReader : public RecordReader::RecordReaderImpl {
+ public:
+ typedef typename DType::c_type T;
+
+ ~TypedRecordReader() {}
+
+ TypedRecordReader(const ColumnDescriptor* schema, ::arrow::MemoryPool* pool)
+ : RecordReader::RecordReaderImpl(schema, pool), current_decoder_(nullptr) {}
+
+ void ResetDecoders() override { decoders_.clear(); }
+
+ inline void ReadValuesSpaced(int64_t values_with_nulls, int64_t null_count) {
+ uint8_t* valid_bits = valid_bits_->mutable_data();
+ const int64_t valid_bits_offset = values_written_;
+
+ int64_t num_decoded = current_decoder_->DecodeSpaced(
+ ValuesHead<T>(), static_cast<int>(values_with_nulls),
+ static_cast<int>(null_count), valid_bits, valid_bits_offset);
+ DCHECK_EQ(num_decoded, values_with_nulls);
+ }
+
+ inline void ReadValuesDense(int64_t values_to_read) {
+ int64_t num_decoded =
+ current_decoder_->Decode(ValuesHead<T>(), static_cast<int>(values_to_read));
+ DCHECK_EQ(num_decoded, values_to_read);
+ }
+
+ // Return number of logical records read
+ int64_t ReadRecordData(const int64_t num_records) {
+ // Conservative upper bound
+ const int64_t possible_num_values =
+ std::max(num_records, levels_written_ - levels_position_);
+ ReserveValues(possible_num_values);
+
+ const int64_t start_levels_position = levels_position_;
+
+ int64_t values_to_read = 0;
+ int64_t records_read = 0;
+ if (max_rep_level_ > 0) {
+ records_read = DelimitRecords(num_records, &values_to_read);
+ } else if (max_def_level_ > 0) {
+ // No repetition levels, skip delimiting logic. Each level represents a
+ // null or not null entry
+ records_read = std::min(levels_written_ - levels_position_, num_records);
+
+ // This is advanced by DelimitRecords, which we skipped
+ levels_position_ += records_read;
+ } else {
+ records_read = values_to_read = num_records;
+ }
+
+ int64_t null_count = 0;
+ if (nullable_values_) {
+ int64_t values_with_nulls = 0;
+ internal::DefinitionLevelsToBitmap(
+ def_levels() + start_levels_position, levels_position_ - start_levels_position,
+ max_def_level_, max_rep_level_, &values_with_nulls, &null_count,
+ valid_bits_->mutable_data(), values_written_);
+ values_to_read = values_with_nulls - null_count;
+ ReadValuesSpaced(values_with_nulls, null_count);
+ ConsumeBufferedValues(levels_position_ - start_levels_position);
+ } else {
+ ReadValuesDense(values_to_read);
+ ConsumeBufferedValues(values_to_read);
+ }
+ // Total values, including null spaces, if any
+ values_written_ += values_to_read + null_count;
+ null_count_ += null_count;
+
+ return records_read;
+ }
+
+ // Returns true if there are still values in this column.
+ bool HasNext() {
+ // Either there is no data page available yet, or the data page has been
+ // exhausted
+ if (num_buffered_values_ == 0 || num_decoded_values_ == num_buffered_values_) {
+ if (!ReadNewPage() || num_buffered_values_ == 0) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ int64_t ReadRecords(int64_t num_records) override {
+ // Delimit records, then read values at the end
+ int64_t records_read = 0;
+
+ if (levels_position_ < levels_written_) {
+ records_read += ReadRecordData(num_records);
+ }
+
+ // HasNext invokes ReadNewPage
+ if (records_read == 0 && !HasNext()) {
+ return 0;
+ }
+
+ int64_t level_batch_size = std::max(kMinLevelBatchSize, num_records);
+
+ // If we are in the middle of a record, we continue until reaching the
+ // desired number of records or the end of the current record if we've found
+ // enough records
+ while (!at_record_start_ || records_read < num_records) {
+ // Is there more data to read in this row group?
+ if (!HasNext()) {
+ break;
+ }
+
+ // We perform multiple batch reads until we either exhaust the row group
+ // or observe the desired number of records
+ int64_t batch_size = std::min(level_batch_size, available_values_current_page());
+
+ // No more data in column
+ if (batch_size == 0) {
+ break;
+ }
+
+ if (max_def_level_ > 0) {
+ ReserveLevels(batch_size);
+
+ int16_t* def_levels = this->def_levels() + levels_written_;
+ int16_t* rep_levels = this->rep_levels() + levels_written_;
+
+ // Not present for non-repeated fields
+ int64_t levels_read = 0;
+ if (max_rep_level_ > 0) {
+ levels_read = ReadDefinitionLevels(batch_size, def_levels);
+ if (ReadRepetitionLevels(batch_size, rep_levels) != levels_read) {
+ throw ParquetException("Number of decoded rep / def levels did not match");
+ }
+ } else if (max_def_level_ > 0) {
+ levels_read = ReadDefinitionLevels(batch_size, def_levels);
+ }
+
+ // Exhausted column chunk
+ if (levels_read == 0) {
+ break;
+ }
+
+ levels_written_ += levels_read;
+ records_read += ReadRecordData(num_records - records_read);
+ } else {
+ // No repetition or definition levels
+ batch_size = std::min(num_records - records_read, batch_size);
+ records_read += ReadRecordData(batch_size);
+ }
+ }
+
+ return records_read;
+ }
+
+ private:
+ typedef Decoder<DType> DecoderType;
+
+ // Map of encoding type to the respective decoder object. For example, a
+ // column chunk's data pages may include both dictionary-encoded and
+ // plain-encoded data.
+ std::unordered_map<int, std::shared_ptr<DecoderType>> decoders_;
+
+ DecoderType* current_decoder_;
+
+ // Advance to the next data page
+ bool ReadNewPage();
+
+ void ConfigureDictionary(const DictionaryPage* page);
+};
+
+template <>
+inline void TypedRecordReader<ByteArrayType>::ReadValuesDense(int64_t values_to_read) {
+ auto values = ValuesHead<ByteArray>();
+ int64_t num_decoded =
+ current_decoder_->Decode(values, static_cast<int>(values_to_read));
+ DCHECK_EQ(num_decoded, values_to_read);
+
+ auto builder = static_cast<::arrow::BinaryBuilder*>(builder_.get());
+ for (int64_t i = 0; i < num_decoded; i++) {
+ PARQUET_THROW_NOT_OK(
+ builder->Append(values[i].ptr, static_cast<int64_t>(values[i].len)));
+ }
+ ResetValues();
+}
+
+template <>
+inline void TypedRecordReader<FLBAType>::ReadValuesDense(int64_t values_to_read) {
+ auto values = ValuesHead<FLBA>();
+ int64_t num_decoded =
+ current_decoder_->Decode(values, static_cast<int>(values_to_read));
+ DCHECK_EQ(num_decoded, values_to_read);
+
+ auto builder = static_cast<::arrow::FixedSizeBinaryBuilder*>(builder_.get());
+ for (int64_t i = 0; i < num_decoded; i++) {
+ PARQUET_THROW_NOT_OK(builder->Append(values[i].ptr));
+ }
+ ResetValues();
+}
+
+template <>
+inline void TypedRecordReader<ByteArrayType>::ReadValuesSpaced(int64_t values_to_read,
+ int64_t null_count) {
+ uint8_t* valid_bits = valid_bits_->mutable_data();
+ const int64_t valid_bits_offset = values_written_;
+ auto values = ValuesHead<ByteArray>();
+
+ int64_t num_decoded = current_decoder_->DecodeSpaced(
+ values, static_cast<int>(values_to_read), static_cast<int>(null_count), valid_bits,
+ valid_bits_offset);
+ DCHECK_EQ(num_decoded, values_to_read);
+
+ auto builder = static_cast<::arrow::BinaryBuilder*>(builder_.get());
+
+ for (int64_t i = 0; i < num_decoded; i++) {
+ if (::arrow::BitUtil::GetBit(valid_bits, valid_bits_offset + i)) {
+ PARQUET_THROW_NOT_OK(
+ builder->Append(values[i].ptr, static_cast<int64_t>(values[i].len)));
+ } else {
+ PARQUET_THROW_NOT_OK(builder->AppendNull());
+ }
+ }
+ ResetValues();
+}
+
+template <>
+inline void TypedRecordReader<FLBAType>::ReadValuesSpaced(int64_t values_to_read,
+ int64_t null_count) {
+ uint8_t* valid_bits = valid_bits_->mutable_data();
+ const int64_t valid_bits_offset = values_written_;
+ auto values = ValuesHead<FLBA>();
+
+ int64_t num_decoded = current_decoder_->DecodeSpaced(
+ values, static_cast<int>(values_to_read), static_cast<int>(null_count), valid_bits,
+ valid_bits_offset);
+ DCHECK_EQ(num_decoded, values_to_read);
+
+ auto builder = static_cast<::arrow::FixedSizeBinaryBuilder*>(builder_.get());
+ for (int64_t i = 0; i < num_decoded; i++) {
+ if (::arrow::BitUtil::GetBit(valid_bits, valid_bits_offset + i)) {
+ PARQUET_THROW_NOT_OK(builder->Append(values[i].ptr));
+ } else {
+ PARQUET_THROW_NOT_OK(builder->AppendNull());
+ }
+ }
+ ResetValues();
+}
+
+template <typename DType>
+inline void TypedRecordReader<DType>::ConfigureDictionary(const DictionaryPage* page) {
+ int encoding = static_cast<int>(page->encoding());
+ if (page->encoding() == Encoding::PLAIN_DICTIONARY ||
+ page->encoding() == Encoding::PLAIN) {
+ encoding = static_cast<int>(Encoding::RLE_DICTIONARY);
+ }
+
+ auto it = decoders_.find(encoding);
+ if (it != decoders_.end()) {
+ throw ParquetException("Column cannot have more than one dictionary.");
+ }
+
+ if (page->encoding() == Encoding::PLAIN_DICTIONARY ||
+ page->encoding() == Encoding::PLAIN) {
+ PlainDecoder<DType> dictionary(descr_);
+ dictionary.SetData(page->num_values(), page->data(), page->size());
+
+ // The dictionary is fully decoded during SetDict, so the
+ // DictionaryPage buffer is no longer required after this step
+ //
+ // TODO(wesm): investigate whether this all-or-nothing decoding of the
+ // dictionary makes sense and whether performance can be improved
+
+ auto decoder = std::make_shared<DictionaryDecoder<DType>>(descr_, pool_);
+ decoder->SetDict(&dictionary);
+ decoders_[encoding] = decoder;
+ } else {
+ ParquetException::NYI("only plain dictionary encoding has been implemented");
+ }
+
+ current_decoder_ = decoders_[encoding].get();
+}
+
+template <typename DType>
+bool TypedRecordReader<DType>::ReadNewPage() {
+ // Loop until we find the next data page.
+ const uint8_t* buffer;
+
+ while (true) {
+ current_page_ = pager_->NextPage();
+ if (!current_page_) {
+ // EOS
+ return false;
+ }
+
+ if (current_page_->type() == PageType::DICTIONARY_PAGE) {
+ ConfigureDictionary(static_cast<const DictionaryPage*>(current_page_.get()));
+ continue;
+ } else if (current_page_->type() == PageType::DATA_PAGE) {
+ const DataPage* page = static_cast<const DataPage*>(current_page_.get());
+
+ // Read a data page.
+ num_buffered_values_ = page->num_values();
+
+ // Have not decoded any values from the data page yet
+ num_decoded_values_ = 0;
+
+ buffer = page->data();
+
+ // If the data page includes repetition and definition levels, we
+ // initialize the level decoder and subtract the encoded level bytes from
+ // the page size to determine the number of bytes in the encoded data.
+ int64_t data_size = page->size();
+
+ // Data page Layout: Repetition Levels - Definition Levels - encoded values.
+ // Levels are encoded as rle or bit-packed.
+ // Init repetition levels
+ if (descr_->max_repetition_level() > 0) {
+ int64_t rep_levels_bytes = repetition_level_decoder_.SetData(
+ page->repetition_level_encoding(), descr_->max_repetition_level(),
+ static_cast<int>(num_buffered_values_), buffer);
+ buffer += rep_levels_bytes;
+ data_size -= rep_levels_bytes;
+ }
+ // TODO(wesm): figure out a way to set max_definition_level_ to 0
+ // if the initial value is invalid
+
+ // Init definition levels
+ if (descr_->max_definition_level() > 0) {
+ int64_t def_levels_bytes = definition_level_decoder_.SetData(
+ page->definition_level_encoding(), descr_->max_definition_level(),
+ static_cast<int>(num_buffered_values_), buffer);
+ buffer += def_levels_bytes;
+ data_size -= def_levels_bytes;
+ }
+
+ // Get a decoder object for this page or create a new decoder if this is the
+ // first page with this encoding.
+ Encoding::type encoding = page->encoding();
+
+ if (IsDictionaryIndexEncoding(encoding)) {
+ encoding = Encoding::RLE_DICTIONARY;
+ }
+
+ auto it = decoders_.find(static_cast<int>(encoding));
+ if (it != decoders_.end()) {
+ if (encoding == Encoding::RLE_DICTIONARY) {
+ DCHECK(current_decoder_->encoding() == Encoding::RLE_DICTIONARY);
+ }
+ current_decoder_ = it->second.get();
+ } else {
+ switch (encoding) {
+ case Encoding::PLAIN: {
+ std::shared_ptr<DecoderType> decoder(new PlainDecoder<DType>(descr_));
+ decoders_[static_cast<int>(encoding)] = decoder;
+ current_decoder_ = decoder.get();
+ break;
+ }
+ case Encoding::RLE_DICTIONARY:
+ throw ParquetException("Dictionary page must be before data page.");
+
+ case Encoding::DELTA_BINARY_PACKED:
+ case Encoding::DELTA_LENGTH_BYTE_ARRAY:
+ case Encoding::DELTA_BYTE_ARRAY:
+ ParquetException::NYI("Unsupported encoding");
+
+ default:
+ throw ParquetException("Unknown encoding type.");
+ }
+ }
+ current_decoder_->SetData(static_cast<int>(num_buffered_values_), buffer,
+ static_cast<int>(data_size));
+ return true;
+ } else {
+ // We don't know what this page type is. We're allowed to skip non-data
+ // pages.
+ continue;
+ }
+ }
+ return true;
+}
+
+// Factory: construct a RecordReader with a TypedRecordReader implementation
+// matching the column's physical storage type. DCHECKs (and returns nullptr
+// in release builds) if the physical type is unrecognized.
+std::shared_ptr<RecordReader> RecordReader::Make(const ColumnDescriptor* descr,
+                                                 MemoryPool* pool) {
+  switch (descr->physical_type()) {
+    case Type::BOOLEAN:
+      return std::shared_ptr<RecordReader>(
+          new RecordReader(new TypedRecordReader<BooleanType>(descr, pool)));
+    case Type::INT32:
+      return std::shared_ptr<RecordReader>(
+          new RecordReader(new TypedRecordReader<Int32Type>(descr, pool)));
+    case Type::INT64:
+      return std::shared_ptr<RecordReader>(
+          new RecordReader(new TypedRecordReader<Int64Type>(descr, pool)));
+    case Type::INT96:
+      return std::shared_ptr<RecordReader>(
+          new RecordReader(new TypedRecordReader<Int96Type>(descr, pool)));
+    case Type::FLOAT:
+      return std::shared_ptr<RecordReader>(
+          new RecordReader(new TypedRecordReader<FloatType>(descr, pool)));
+    case Type::DOUBLE:
+      return std::shared_ptr<RecordReader>(
+          new RecordReader(new TypedRecordReader<DoubleType>(descr, pool)));
+    case Type::BYTE_ARRAY:
+      return std::shared_ptr<RecordReader>(
+          new RecordReader(new TypedRecordReader<ByteArrayType>(descr, pool)));
+    case Type::FIXED_LEN_BYTE_ARRAY:
+      return std::shared_ptr<RecordReader>(
+          new RecordReader(new TypedRecordReader<FLBAType>(descr, pool)));
+    default:
+      DCHECK(false);
+  }
+  // Unreachable code, but suppress compiler warning
+  return nullptr;
+}
+
+// ----------------------------------------------------------------------
+// Implement public API
+
+// Takes ownership of the impl object (pimpl idiom); prefer member
+// initialization over reset() in the body
+RecordReader::RecordReader(RecordReaderImpl* impl) : impl_(impl) {}
+
+// Defined out-of-line so unique_ptr<RecordReaderImpl> is destroyed where the
+// impl type is complete
+RecordReader::~RecordReader() {}
+
+int64_t RecordReader::ReadRecords(int64_t num_records) {
+  return impl_->ReadRecords(num_records);
+}
+
+void RecordReader::Reset() { impl_->Reset(); }
+
+void RecordReader::Reserve(int64_t num_values) { impl_->Reserve(num_values); }
+
+const int16_t* RecordReader::def_levels() const { return impl_->def_levels(); }
+
+const int16_t* RecordReader::rep_levels() const { return impl_->rep_levels(); }
+
+const uint8_t* RecordReader::values() const { return impl_->values(); }
+
+std::shared_ptr<PoolBuffer> RecordReader::ReleaseValues() {
+  return impl_->ReleaseValues();
+}
+
+std::shared_ptr<PoolBuffer> RecordReader::ReleaseIsValid() {
+  return impl_->ReleaseIsValid();
+}
+
+::arrow::ArrayBuilder* RecordReader::builder() { return impl_->builder(); }
+
+int64_t RecordReader::values_written() const { return impl_->values_written(); }
+
+int64_t RecordReader::levels_position() const { return impl_->levels_position(); }
+
+int64_t RecordReader::levels_written() const { return impl_->levels_written(); }
+
+int64_t RecordReader::null_count() const { return impl_->null_count(); }
+
+bool RecordReader::nullable_values() const { return impl_->nullable_values(); }
+
+bool RecordReader::HasMoreData() const { return impl_->HasMoreData(); }
+
+void RecordReader::SetPageReader(std::unique_ptr<PageReader> reader) {
+  impl_->SetPageReader(std::move(reader));
+}
+
+} // namespace internal
+} // namespace parquet
diff --git a/src/parquet/arrow/record_reader.h b/src/parquet/arrow/record_reader.h
new file mode 100644
index 0000000..8d55f9d
--- /dev/null
+++ b/src/parquet/arrow/record_reader.h
@@ -0,0 +1,113 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PARQUET_RECORD_READER_H
+#define PARQUET_RECORD_READER_H
+
+#include <cstdint>
+#include <cstring>
+#include <iostream>
+#include <memory>
+#include <unordered_map>
+#include <vector>
+
+#include <arrow/buffer.h>
+#include <arrow/builder.h>
+#include <arrow/memory_pool.h>
+#include <arrow/util/bit-util.h>
+
+#include "parquet/column_page.h"
+#include "parquet/schema.h"
+#include "parquet/util/visibility.h"
+
+namespace parquet {
+namespace internal {
+
+/// \brief Stateful column reader that delimits semantic records for both flat
+/// and nested columns
+///
+/// \note API EXPERIMENTAL
+/// \since 1.3.0
+class RecordReader {
+ public:
+  // So that we can create subclasses
+  class RecordReaderImpl;
+
+  /// \brief Create a RecordReader whose implementation is typed for the
+  /// physical type of the given leaf column
+  static std::shared_ptr<RecordReader> Make(
+      const ColumnDescriptor* descr,
+      ::arrow::MemoryPool* pool = ::arrow::default_memory_pool());
+
+  virtual ~RecordReader();
+
+  /// \brief Decoded definition levels
+  const int16_t* def_levels() const;
+
+  /// \brief Decoded repetition levels
+  const int16_t* rep_levels() const;
+
+  /// \brief Decoded values, including nulls, if any
+  const uint8_t* values() const;
+
+  /// \brief Attempt to read indicated number of records from column chunk
+  /// \return number of records read
+  int64_t ReadRecords(int64_t num_records);
+
+  /// \brief Pre-allocate space for data. Results in better flat read performance
+  void Reserve(int64_t num_values);
+
+  /// \brief Clear consumed values and repetition/definition levels as the
+  /// result of calling ReadRecords
+  void Reset();
+
+  /// \brief Transfer the internal buffer of decoded values to the caller
+  std::shared_ptr<PoolBuffer> ReleaseValues();
+
+  /// \brief Transfer the internal validity bitmap buffer to the caller
+  std::shared_ptr<PoolBuffer> ReleaseIsValid();
+
+  /// \brief Accessor for the reader's internal Arrow builder, if any
+  ::arrow::ArrayBuilder* builder();
+
+  /// \brief Number of values written including nulls (if any)
+  int64_t values_written() const;
+
+  /// \brief Number of definition / repetition levels (from those that have
+  /// been decoded) that have been consumed inside the reader.
+  int64_t levels_position() const;
+
+  /// \brief Number of definition / repetition levels that have been written
+  /// internally in the reader
+  int64_t levels_written() const;
+
+  /// \brief Number of nulls in the leaf
+  int64_t null_count() const;
+
+  /// \brief True if the leaf values are nullable
+  bool nullable_values() const;
+
+  /// \brief Return true if the record reader has more internal data yet to
+  /// process
+  bool HasMoreData() const;
+
+  /// \brief Advance record reader to the next row group
+  /// \param[in] reader obtained from RowGroupReader::GetColumnPageReader
+  void SetPageReader(std::unique_ptr<PageReader> reader);
+
+ private:
+  std::unique_ptr<RecordReaderImpl> impl_;
+
+  // Instances are created only through Make(); the constructor takes
+  // ownership of impl
+  explicit RecordReader(RecordReaderImpl* impl);
+};
+
+} // namespace internal
+} // namespace parquet
+
+#endif // PARQUET_RECORD_READER_H
diff --git a/src/parquet/arrow/schema.cc b/src/parquet/arrow/schema.cc
index 87e5b38..e16a1af 100644
--- a/src/parquet/arrow/schema.cc
+++ b/src/parquet/arrow/schema.cc
@@ -49,14 +49,14 @@
const auto TIMESTAMP_US = ::arrow::timestamp(::arrow::TimeUnit::MICRO);
const auto TIMESTAMP_NS = ::arrow::timestamp(::arrow::TimeUnit::NANO);
-TypePtr MakeDecimalType(const PrimitiveNode* node) {
- int precision = node->decimal_metadata().precision;
- int scale = node->decimal_metadata().scale;
+TypePtr MakeDecimalType(const PrimitiveNode& node) {
+ int precision = node.decimal_metadata().precision;
+ int scale = node.decimal_metadata().scale;
return std::make_shared<::arrow::DecimalType>(precision, scale);
}
-static Status FromByteArray(const PrimitiveNode* node, TypePtr* out) {
- switch (node->logical_type()) {
+static Status FromByteArray(const PrimitiveNode& node, TypePtr* out) {
+ switch (node.logical_type()) {
case LogicalType::UTF8:
*out = ::arrow::utf8();
break;
@@ -71,17 +71,17 @@
return Status::OK();
}
-static Status FromFLBA(const PrimitiveNode* node, TypePtr* out) {
- switch (node->logical_type()) {
+static Status FromFLBA(const PrimitiveNode& node, TypePtr* out) {
+ switch (node.logical_type()) {
case LogicalType::NONE:
- *out = ::arrow::fixed_size_binary(node->type_length());
+ *out = ::arrow::fixed_size_binary(node.type_length());
break;
case LogicalType::DECIMAL:
*out = MakeDecimalType(node);
break;
default:
std::stringstream ss;
- ss << "Unhandled logical type " << LogicalTypeToString(node->logical_type())
+ ss << "Unhandled logical type " << LogicalTypeToString(node.logical_type())
<< " for fixed-length binary array";
return Status::NotImplemented(ss.str());
break;
@@ -90,8 +90,8 @@
return Status::OK();
}
-static Status FromInt32(const PrimitiveNode* node, TypePtr* out) {
- switch (node->logical_type()) {
+static Status FromInt32(const PrimitiveNode& node, TypePtr* out) {
+ switch (node.logical_type()) {
case LogicalType::NONE:
*out = ::arrow::int32();
break;
@@ -124,7 +124,7 @@
break;
default:
std::stringstream ss;
- ss << "Unhandled logical type " << LogicalTypeToString(node->logical_type())
+ ss << "Unhandled logical type " << LogicalTypeToString(node.logical_type())
<< " for INT32";
return Status::NotImplemented(ss.str());
break;
@@ -132,8 +132,8 @@
return Status::OK();
}
-static Status FromInt64(const PrimitiveNode* node, TypePtr* out) {
- switch (node->logical_type()) {
+static Status FromInt64(const PrimitiveNode& node, TypePtr* out) {
+ switch (node.logical_type()) {
case LogicalType::NONE:
*out = ::arrow::int64();
break;
@@ -157,7 +157,7 @@
break;
default:
std::stringstream ss;
- ss << "Unhandled logical type " << LogicalTypeToString(node->logical_type())
+ ss << "Unhandled logical type " << LogicalTypeToString(node.logical_type())
<< " for INT64";
return Status::NotImplemented(ss.str());
break;
@@ -165,13 +165,13 @@
return Status::OK();
}
-Status FromPrimitive(const PrimitiveNode* primitive, TypePtr* out) {
- if (primitive->logical_type() == LogicalType::NA) {
+Status FromPrimitive(const PrimitiveNode& primitive, TypePtr* out) {
+ if (primitive.logical_type() == LogicalType::NA) {
*out = ::arrow::null();
return Status::OK();
}
- switch (primitive->physical_type()) {
+ switch (primitive.physical_type()) {
case ParquetType::BOOLEAN:
*out = ::arrow::boolean();
break;
@@ -201,33 +201,33 @@
}
// Forward declaration
-Status NodeToFieldInternal(const NodePtr& node,
- const std::unordered_set<NodePtr>* included_leaf_nodes,
+Status NodeToFieldInternal(const Node& node,
+ const std::unordered_set<const Node*>* included_leaf_nodes,
std::shared_ptr<Field>* out);
/*
 * Auxiliary function to test if a parquet schema node is a leaf node
* that should be included in a resulting arrow schema
*/
-inline bool IsIncludedLeaf(const NodePtr& node,
- const std::unordered_set<NodePtr>* included_leaf_nodes) {
+inline bool IsIncludedLeaf(const Node& node,
+ const std::unordered_set<const Node*>* included_leaf_nodes) {
if (included_leaf_nodes == nullptr) {
return true;
}
- auto search = included_leaf_nodes->find(node);
+ auto search = included_leaf_nodes->find(&node);
return (search != included_leaf_nodes->end());
}
-Status StructFromGroup(const GroupNode* group,
- const std::unordered_set<NodePtr>* included_leaf_nodes,
+Status StructFromGroup(const GroupNode& group,
+ const std::unordered_set<const Node*>* included_leaf_nodes,
TypePtr* out) {
std::vector<std::shared_ptr<Field>> fields;
std::shared_ptr<Field> field;
*out = nullptr;
- for (int i = 0; i < group->field_count(); i++) {
- RETURN_NOT_OK(NodeToFieldInternal(group->field(i), included_leaf_nodes, &field));
+ for (int i = 0; i < group.field_count(); i++) {
+ RETURN_NOT_OK(NodeToFieldInternal(*group.field(i), included_leaf_nodes, &field));
if (field != nullptr) {
fields.push_back(field);
}
@@ -238,22 +238,23 @@
return Status::OK();
}
-Status NodeToList(const GroupNode* group,
- const std::unordered_set<NodePtr>* included_leaf_nodes, TypePtr* out) {
+Status NodeToList(const GroupNode& group,
+ const std::unordered_set<const Node*>* included_leaf_nodes,
+ TypePtr* out) {
*out = nullptr;
- if (group->field_count() == 1) {
+ if (group.field_count() == 1) {
// This attempts to resolve the preferred 3-level list encoding.
- NodePtr list_node = group->field(0);
- if (list_node->is_group() && list_node->is_repeated()) {
- const GroupNode* list_group = static_cast<const GroupNode*>(list_node.get());
+ const Node& list_node = *group.field(0);
+ if (list_node.is_group() && list_node.is_repeated()) {
+ const auto& list_group = static_cast<const GroupNode&>(list_node);
// Special case mentioned in the format spec:
// If the name is array or ends in _tuple, this should be a list of struct
// even for single child elements.
- if (list_group->field_count() == 1 && !HasStructListName(*list_group)) {
+ if (list_group.field_count() == 1 && !HasStructListName(list_group)) {
// List of primitive type
std::shared_ptr<Field> item_field;
RETURN_NOT_OK(
- NodeToFieldInternal(list_group->field(0), included_leaf_nodes, &item_field));
+ NodeToFieldInternal(*list_group.field(0), included_leaf_nodes, &item_field));
if (item_field != nullptr) {
*out = ::arrow::list(item_field);
@@ -263,18 +264,17 @@
std::shared_ptr<::arrow::DataType> inner_type;
RETURN_NOT_OK(StructFromGroup(list_group, included_leaf_nodes, &inner_type));
if (inner_type != nullptr) {
- auto item_field = std::make_shared<Field>(list_node->name(), inner_type, false);
+ auto item_field = std::make_shared<Field>(list_node.name(), inner_type, false);
*out = ::arrow::list(item_field);
}
}
- } else if (list_node->is_repeated()) {
+ } else if (list_node.is_repeated()) {
// repeated primitive node
std::shared_ptr<::arrow::DataType> inner_type;
- if (IsIncludedLeaf(static_cast<NodePtr>(list_node), included_leaf_nodes)) {
- const PrimitiveNode* primitive =
- static_cast<const PrimitiveNode*>(list_node.get());
- RETURN_NOT_OK(FromPrimitive(primitive, &inner_type));
- auto item_field = std::make_shared<Field>(list_node->name(), inner_type, false);
+ if (IsIncludedLeaf(static_cast<const Node&>(list_node), included_leaf_nodes)) {
+ RETURN_NOT_OK(
+ FromPrimitive(static_cast<const PrimitiveNode&>(list_node), &inner_type));
+ auto item_field = std::make_shared<Field>(list_node.name(), inner_type, false);
*out = ::arrow::list(item_field);
}
} else {
@@ -288,49 +288,47 @@
return Status::OK();
}
-Status NodeToField(const NodePtr& node, std::shared_ptr<Field>* out) {
+Status NodeToField(const Node& node, std::shared_ptr<Field>* out) {
return NodeToFieldInternal(node, nullptr, out);
}
-Status NodeToFieldInternal(const NodePtr& node,
- const std::unordered_set<NodePtr>* included_leaf_nodes,
+Status NodeToFieldInternal(const Node& node,
+ const std::unordered_set<const Node*>* included_leaf_nodes,
std::shared_ptr<Field>* out) {
std::shared_ptr<::arrow::DataType> type = nullptr;
- bool nullable = !node->is_required();
+ bool nullable = !node.is_required();
*out = nullptr;
- if (node->is_repeated()) {
+ if (node.is_repeated()) {
// 1-level LIST encoding fields are required
std::shared_ptr<::arrow::DataType> inner_type;
- if (node->is_group()) {
- const GroupNode* group = static_cast<const GroupNode*>(node.get());
- RETURN_NOT_OK(StructFromGroup(group, included_leaf_nodes, &inner_type));
- } else if (IsIncludedLeaf(static_cast<NodePtr>(node), included_leaf_nodes)) {
- const PrimitiveNode* primitive = static_cast<const PrimitiveNode*>(node.get());
- RETURN_NOT_OK(FromPrimitive(primitive, &inner_type));
+ if (node.is_group()) {
+ RETURN_NOT_OK(StructFromGroup(static_cast<const GroupNode&>(node),
+ included_leaf_nodes, &inner_type));
+ } else if (IsIncludedLeaf(node, included_leaf_nodes)) {
+ RETURN_NOT_OK(FromPrimitive(static_cast<const PrimitiveNode&>(node), &inner_type));
}
if (inner_type != nullptr) {
- auto item_field = std::make_shared<Field>(node->name(), inner_type, false);
+ auto item_field = std::make_shared<Field>(node.name(), inner_type, false);
type = ::arrow::list(item_field);
nullable = false;
}
- } else if (node->is_group()) {
- const GroupNode* group = static_cast<const GroupNode*>(node.get());
- if (node->logical_type() == LogicalType::LIST) {
+ } else if (node.is_group()) {
+ const auto& group = static_cast<const GroupNode&>(node);
+ if (node.logical_type() == LogicalType::LIST) {
RETURN_NOT_OK(NodeToList(group, included_leaf_nodes, &type));
} else {
RETURN_NOT_OK(StructFromGroup(group, included_leaf_nodes, &type));
}
} else {
// Primitive (leaf) node
- if (IsIncludedLeaf(static_cast<NodePtr>(node), included_leaf_nodes)) {
- const PrimitiveNode* primitive = static_cast<const PrimitiveNode*>(node.get());
- RETURN_NOT_OK(FromPrimitive(primitive, &type));
+ if (IsIncludedLeaf(node, included_leaf_nodes)) {
+ RETURN_NOT_OK(FromPrimitive(static_cast<const PrimitiveNode&>(node), &type));
}
}
if (type != nullptr) {
- *out = std::make_shared<Field>(node->name(), type, nullable);
+ *out = std::make_shared<Field>(node.name(), type, nullable);
}
return Status::OK();
}
@@ -339,12 +337,12 @@
const SchemaDescriptor* parquet_schema,
const std::shared_ptr<const KeyValueMetadata>& key_value_metadata,
std::shared_ptr<::arrow::Schema>* out) {
- const GroupNode* schema_node = parquet_schema->group_node();
+ const GroupNode& schema_node = *parquet_schema->group_node();
- int num_fields = static_cast<int>(schema_node->field_count());
+ int num_fields = static_cast<int>(schema_node.field_count());
std::vector<std::shared_ptr<Field>> fields(num_fields);
for (int i = 0; i < num_fields; i++) {
- RETURN_NOT_OK(NodeToField(schema_node->field(i), &fields[i]));
+ RETURN_NOT_OK(NodeToField(*schema_node.field(i), &fields[i]));
}
*out = std::make_shared<::arrow::Schema>(fields, key_value_metadata);
@@ -362,13 +360,13 @@
// Index in column_indices should be unique, duplicate indices are merged into one and
// ordering by its first appearing.
int num_columns = static_cast<int>(column_indices.size());
- std::unordered_set<NodePtr> top_nodes; // to deduplicate the top nodes
- std::vector<NodePtr> base_nodes; // to keep the ordering
- std::unordered_set<NodePtr> included_leaf_nodes(num_columns);
+ std::unordered_set<const Node*> top_nodes; // to deduplicate the top nodes
+ std::vector<const Node*> base_nodes; // to keep the ordering
+ std::unordered_set<const Node*> included_leaf_nodes(num_columns);
for (int i = 0; i < num_columns; i++) {
- auto column_desc = parquet_schema->Column(column_indices[i]);
- included_leaf_nodes.insert(column_desc->schema_node());
- auto column_root = parquet_schema->GetColumnRoot(column_indices[i]);
+ const ColumnDescriptor* column_desc = parquet_schema->Column(column_indices[i]);
+ included_leaf_nodes.insert(column_desc->schema_node().get());
+ const Node* column_root = parquet_schema->GetColumnRoot(column_indices[i]);
auto insertion = top_nodes.insert(column_root);
if (insertion.second) {
base_nodes.push_back(column_root);
@@ -378,7 +376,7 @@
std::vector<std::shared_ptr<Field>> fields;
std::shared_ptr<Field> field;
for (auto node : base_nodes) {
- RETURN_NOT_OK(NodeToFieldInternal(node, &included_leaf_nodes, &field));
+ RETURN_NOT_OK(NodeToFieldInternal(*node, &included_leaf_nodes, &field));
if (field != nullptr) {
fields.push_back(field);
}
diff --git a/src/parquet/arrow/schema.h b/src/parquet/arrow/schema.h
index 73e48f2..de153eb 100644
--- a/src/parquet/arrow/schema.h
+++ b/src/parquet/arrow/schema.h
@@ -37,8 +37,9 @@
namespace arrow {
-::arrow::Status PARQUET_EXPORT NodeToField(const schema::NodePtr& node,
- std::shared_ptr<::arrow::Field>* out);
+PARQUET_EXPORT
+::arrow::Status NodeToField(const schema::Node& node,
+ std::shared_ptr<::arrow::Field>* out);
/// Convert parquet schema to arrow schema with selected indices
/// \param parquet_schema to be converted
@@ -48,16 +49,18 @@
/// \param key_value_metadata optional metadata, can be nullptr
/// \param out the corresponding arrow schema
/// \return Status::OK() on a successful conversion.
-::arrow::Status PARQUET_EXPORT FromParquetSchema(
+PARQUET_EXPORT
+::arrow::Status FromParquetSchema(
const SchemaDescriptor* parquet_schema, const std::vector<int>& column_indices,
const std::shared_ptr<const KeyValueMetadata>& key_value_metadata,
std::shared_ptr<::arrow::Schema>* out);
// Without indices
-::arrow::Status PARQUET_EXPORT
-FromParquetSchema(const SchemaDescriptor* parquet_schema,
- const std::shared_ptr<const KeyValueMetadata>& key_value_metadata,
- std::shared_ptr<::arrow::Schema>* out);
+PARQUET_EXPORT
+::arrow::Status FromParquetSchema(
+ const SchemaDescriptor* parquet_schema,
+ const std::shared_ptr<const KeyValueMetadata>& key_value_metadata,
+ std::shared_ptr<::arrow::Schema>* out);
// Without metadata
::arrow::Status PARQUET_EXPORT FromParquetSchema(const SchemaDescriptor* parquet_schema,
diff --git a/src/parquet/column_reader.cc b/src/parquet/column_reader.cc
index 5f6259f..d23e738 100644
--- a/src/parquet/column_reader.cc
+++ b/src/parquet/column_reader.cc
@@ -21,7 +21,10 @@
#include <cstdint>
#include <memory>
-#include "arrow/util/rle-encoding.h"
+#include <arrow/buffer.h>
+#include <arrow/memory_pool.h>
+#include <arrow/util/bit-util.h>
+#include <arrow/util/rle-encoding.h>
#include "parquet/column_page.h"
#include "parquet/encoding-internal.h"
diff --git a/src/parquet/column_reader.h b/src/parquet/column_reader.h
index df7deb8..6172365 100644
--- a/src/parquet/column_reader.h
+++ b/src/parquet/column_reader.h
@@ -19,6 +19,7 @@
#define PARQUET_COLUMN_READER_H
#include <algorithm>
+#include <climits>
#include <cstdint>
#include <cstring>
#include <iostream>
@@ -26,6 +27,9 @@
#include <unordered_map>
#include <vector>
+#include <arrow/buffer.h>
+#include <arrow/builder.h>
+#include <arrow/memory_pool.h>
#include <arrow/util/bit-util.h>
#include "parquet/column_page.h"
@@ -45,6 +49,8 @@
namespace parquet {
+namespace BitUtil = ::arrow::BitUtil;
+
class PARQUET_EXPORT LevelDecoder {
public:
LevelDecoder();
@@ -104,6 +110,12 @@
// Returns the number of decoded repetition levels
int64_t ReadRepetitionLevels(int64_t batch_size, int16_t* levels);
+ int64_t available_values_current_page() const {
+ return num_buffered_values_ - num_decoded_values_;
+ }
+
+ void ConsumeBufferedValues(int64_t num_values) { num_decoded_values_ += num_values; }
+
const ColumnDescriptor* descr_;
std::unique_ptr<PageReader> pager_;
@@ -130,7 +142,58 @@
::arrow::MemoryPool* pool_;
};
-// API to read values from a single column. This is the main client facing API.
+namespace internal {
+
+// Convert a run of definition levels into a validity bitmap written starting
+// at bit position valid_bits_offset.
+//
+// Increments *null_count for each null slot and sets *values_read to the
+// number of bitmap slots consumed (levels belonging to empty repeated
+// ancestors consume no slot). Throws ParquetException if a definition level
+// exceeds max_definition_level in the non-repeated case.
+static inline void DefinitionLevelsToBitmap(
+    const int16_t* def_levels, int64_t num_def_levels, const int16_t max_definition_level,
+    const int16_t max_repetition_level, int64_t* values_read, int64_t* null_count,
+    uint8_t* valid_bits, const int64_t valid_bits_offset) {
+  int64_t byte_offset = valid_bits_offset / 8;
+  int64_t bit_offset = valid_bits_offset % 8;
+  uint8_t bitset = valid_bits[byte_offset];
+
+  // TODO(itaiin): As an interim solution we are splitting the code path here
+  // between repeated+flat column reads, and non-repeated+nested reads.
+  // Those paths need to be merged in the future
+  // NOTE: loop index is int64_t to match num_def_levels (an int index would
+  // truncate for very large level runs)
+  for (int64_t i = 0; i < num_def_levels; ++i) {
+    if (def_levels[i] == max_definition_level) {
+      bitset |= (1 << bit_offset);
+    } else if (max_repetition_level > 0) {
+      // repetition+flat case
+      if (def_levels[i] == (max_definition_level - 1)) {
+        bitset &= ~(1 << bit_offset);
+        *null_count += 1;
+      } else {
+        // Level belongs to an empty/absent repeated ancestor: no bitmap slot
+        // is consumed
+        continue;
+      }
+    } else {
+      // non-repeated+nested case
+      if (def_levels[i] < max_definition_level) {
+        bitset &= ~(1 << bit_offset);
+        *null_count += 1;
+      } else {
+        throw ParquetException("definition level exceeds maximum");
+      }
+    }
+
+    bit_offset++;
+    if (bit_offset == CHAR_BIT) {
+      bit_offset = 0;
+      valid_bits[byte_offset] = bitset;
+      byte_offset++;
+      // TODO: Except for the last byte, this shouldn't be needed
+      // NOTE(review): when the final level lands exactly on a byte boundary
+      // this reads one byte past the last written position -- presumably the
+      // bitmap is allocated with slack for this; confirm at call sites
+      bitset = valid_bits[byte_offset];
+    }
+  }
+  if (bit_offset != 0) {
+    valid_bits[byte_offset] = bitset;
+  }
+  *values_read = bit_offset + byte_offset * 8 - valid_bits_offset;
+}
+
+} // namespace internal
+
+// API to read values from a single column. This is a main client facing API.
template <typename DType>
class PARQUET_EXPORT TypedColumnReader : public ColumnReader {
public:
@@ -138,7 +201,7 @@
TypedColumnReader(const ColumnDescriptor* schema, std::unique_ptr<PageReader> pager,
::arrow::MemoryPool* pool = ::arrow::default_memory_pool())
- : ColumnReader(schema, std::move(pager), pool), current_decoder_(NULL) {}
+ : ColumnReader(schema, std::move(pager), pool), current_decoder_(nullptr) {}
virtual ~TypedColumnReader() {}
// Read a batch of repetition levels, definition levels, and values from the
@@ -208,7 +271,7 @@
typedef Decoder<DType> DecoderType;
// Advance to the next data page
- virtual bool ReadNewPage();
+ bool ReadNewPage() override;
// Read up to batch_size values from the current data page into the
// pre-allocated memory T*
@@ -221,7 +284,7 @@
// to the def_levels.
//
// @returns: the number of values read into the out buffer
- int64_t ReadValuesSpaced(int64_t batch_size, T* out, int null_count,
+ int64_t ReadValuesSpaced(int64_t batch_size, T* out, int64_t null_count,
uint8_t* valid_bits, int64_t valid_bits_offset);
// Map of encoding type to the respective decoder object. For example, a
@@ -234,6 +297,9 @@
DecoderType* current_decoder_;
};
+// ----------------------------------------------------------------------
+// Type column reader implementations
+
template <typename DType>
inline int64_t TypedColumnReader<DType>::ReadValues(int64_t batch_size, T* out) {
int64_t num_decoded = current_decoder_->Decode(out, static_cast<int>(batch_size));
@@ -242,11 +308,12 @@
template <typename DType>
inline int64_t TypedColumnReader<DType>::ReadValuesSpaced(int64_t batch_size, T* out,
- int null_count,
+ int64_t null_count,
uint8_t* valid_bits,
int64_t valid_bits_offset) {
- return current_decoder_->DecodeSpaced(out, static_cast<int>(batch_size), null_count,
- valid_bits, valid_bits_offset);
+ return current_decoder_->DecodeSpaced(out, static_cast<int>(batch_size),
+ static_cast<int>(null_count), valid_bits,
+ valid_bits_offset);
}
template <typename DType>
@@ -294,59 +361,34 @@
*values_read = ReadValues(values_to_read, values);
int64_t total_values = std::max(num_def_levels, *values_read);
- num_decoded_values_ += total_values;
+ ConsumeBufferedValues(total_values);
return total_values;
}
-inline void DefinitionLevelsToBitmap(const int16_t* def_levels, int64_t num_def_levels,
- int16_t max_definition_level,
- int16_t max_repetition_level, int64_t* values_read,
- int64_t* null_count, uint8_t* valid_bits,
- int64_t valid_bits_offset) {
- int byte_offset = static_cast<int>(valid_bits_offset) / 8;
- int bit_offset = static_cast<int>(valid_bits_offset) % 8;
- uint8_t bitset = valid_bits[byte_offset];
+namespace internal {
- // TODO(itaiin): As an interim solution we are splitting the code path here
- // between repeated+flat column reads, and non-repeated+nested reads.
- // Those paths need to be merged in the future
- for (int i = 0; i < num_def_levels; ++i) {
- if (def_levels[i] == max_definition_level) {
- bitset |= (1 << bit_offset);
- } else if (max_repetition_level > 0) {
- // repetition+flat case
- if (def_levels[i] == (max_definition_level - 1)) {
- bitset &= ~(1 << bit_offset);
- *null_count += 1;
- } else {
- continue;
+// TODO(itaiin): another code path split to merge when the general case is done
+static inline bool HasSpacedValues(const ColumnDescriptor* descr) {
+ if (descr->max_repetition_level() > 0) {
+ // repeated+flat case
+ return !descr->schema_node()->is_required();
+ } else {
+ // non-repeated+nested case
+ // Find if a node forces nulls in the lowest level along the hierarchy
+ const schema::Node* node = descr->schema_node().get();
+ while (node) {
+ if (node->is_optional()) {
+ return true;
}
- } else {
- // non-repeated+nested case
- if (def_levels[i] < max_definition_level) {
- bitset &= ~(1 << bit_offset);
- *null_count += 1;
- } else {
- throw ParquetException("definition level exceeds maximum");
- }
+ node = node->parent();
}
-
- bit_offset++;
- if (bit_offset == 8) {
- bit_offset = 0;
- valid_bits[byte_offset] = bitset;
- byte_offset++;
- // TODO: Except for the last byte, this shouldn't be needed
- bitset = valid_bits[byte_offset];
- }
+ return false;
}
- if (bit_offset != 0) {
- valid_bits[byte_offset] = bitset;
- }
- *values_read = (bit_offset + byte_offset * 8 - valid_bits_offset);
}
+} // namespace internal
+
template <typename DType>
inline int64_t TypedColumnReader<DType>::ReadBatchSpaced(
int64_t batch_size, int16_t* def_levels, int16_t* rep_levels, T* values,
@@ -377,25 +419,7 @@
}
}
- // TODO(itaiin): another code path split to merge when the general case is done
- bool has_spaced_values;
- if (descr_->max_repetition_level() > 0) {
- // repeated+flat case
- has_spaced_values = !descr_->schema_node()->is_required();
- } else {
- // non-repeated+nested case
- // Find if a node forces nulls in the lowest level along the hierarchy
- const schema::Node* node = descr_->schema_node().get();
- has_spaced_values = false;
- while (node) {
- auto parent = node->parent();
- if (node->is_optional()) {
- has_spaced_values = true;
- break;
- }
- node = parent;
- }
- }
+ const bool has_spaced_values = internal::HasSpacedValues(descr_);
int64_t null_count = 0;
if (!has_spaced_values) {
@@ -413,9 +437,9 @@
} else {
int16_t max_definition_level = descr_->max_definition_level();
int16_t max_repetition_level = descr_->max_repetition_level();
- DefinitionLevelsToBitmap(def_levels, num_def_levels, max_definition_level,
- max_repetition_level, values_read, &null_count, valid_bits,
- valid_bits_offset);
+ internal::DefinitionLevelsToBitmap(def_levels, num_def_levels, max_definition_level,
+ max_repetition_level, values_read, &null_count,
+ valid_bits, valid_bits_offset);
total_values = ReadValuesSpaced(*values_read, values, static_cast<int>(null_count),
valid_bits, valid_bits_offset);
}
@@ -432,12 +456,12 @@
*levels_read = total_values;
}
- num_decoded_values_ += *levels_read;
+ ConsumeBufferedValues(*levels_read);
return total_values;
}
template <typename DType>
-inline int64_t TypedColumnReader<DType>::Skip(int64_t num_rows_to_skip) {
+int64_t TypedColumnReader<DType>::Skip(int64_t num_rows_to_skip) {
int64_t rows_to_skip = num_rows_to_skip;
while (HasNext() && rows_to_skip > 0) {
// If the number of rows to skip is more than the number of undecoded values, skip the
@@ -472,6 +496,9 @@
return num_rows_to_skip - rows_to_skip;
}
+// ----------------------------------------------------------------------
+// Template instantiations
+
typedef TypedColumnReader<BooleanType> BoolReader;
typedef TypedColumnReader<Int32Type> Int32Reader;
typedef TypedColumnReader<Int64Type> Int64Reader;
diff --git a/src/parquet/column_writer-test.cc b/src/parquet/column_writer-test.cc
index 48f243e..47b86e3 100644
--- a/src/parquet/column_writer-test.cc
+++ b/src/parquet/column_writer-test.cc
@@ -491,7 +491,7 @@
auto writer = this->BuildWriter(LARGE_SIZE);
// All values being written are NULL
writer->WriteBatch(this->values_.size(), definition_levels.data(),
- repetition_levels.data(), NULL);
+ repetition_levels.data(), nullptr);
writer->Close();
// Just read the first SMALL_SIZE rows to ensure we could read it back in
diff --git a/src/parquet/encoding-internal.h b/src/parquet/encoding-internal.h
index fab1f63..5818fd3 100644
--- a/src/parquet/encoding-internal.h
+++ b/src/parquet/encoding-internal.h
@@ -54,7 +54,7 @@
using Decoder<DType>::num_values_;
explicit PlainDecoder(const ColumnDescriptor* descr)
- : Decoder<DType>(descr, Encoding::PLAIN), data_(NULL), len_(0) {
+ : Decoder<DType>(descr, Encoding::PLAIN), data_(nullptr), len_(0) {
if (descr_ && descr_->physical_type() == Type::FIXED_LEN_BYTE_ARRAY) {
type_length_ = descr_->type_length();
} else {
@@ -943,7 +943,7 @@
for (int i = 0; i < max_values; ++i) {
int prefix_len = 0;
prefix_len_decoder_.Decode(&prefix_len, 1);
- ByteArray suffix = {0, NULL};
+ ByteArray suffix = {0, nullptr};
suffix_decoder_.Decode(&suffix, 1);
buffer[i].len = prefix_len + suffix.len;
diff --git a/src/parquet/encoding.h b/src/parquet/encoding.h
index 339eb35..e7ed415 100644
--- a/src/parquet/encoding.h
+++ b/src/parquet/encoding.h
@@ -113,6 +113,10 @@
throw ParquetException("Number of values / definition_levels read did not match");
}
+ // Depending on the number of nulls, some of the value slots in buffer may
+ // be uninitialized, and this will cause valgrind warnings / potentially UB
+ memset(buffer + values_read, 0, (num_values - values_read) * sizeof(T));
+
// Add spacing for null entries. As we have filled the buffer from the front,
// we need to add the spacing from the back.
int values_to_move = values_read;
diff --git a/src/parquet/file/metadata.h b/src/parquet/file/metadata.h
index 0d8e10e..7f990e0 100644
--- a/src/parquet/file/metadata.h
+++ b/src/parquet/file/metadata.h
@@ -91,7 +91,7 @@
// API convenience to get a MetaData accessor
static std::unique_ptr<ColumnChunkMetaData> Make(
const uint8_t* metadata, const ColumnDescriptor* descr,
- const ApplicationVersion* writer_version = NULL);
+ const ApplicationVersion* writer_version = nullptr);
~ColumnChunkMetaData();
@@ -116,7 +116,7 @@
private:
explicit ColumnChunkMetaData(const uint8_t* metadata, const ColumnDescriptor* descr,
- const ApplicationVersion* writer_version = NULL);
+ const ApplicationVersion* writer_version = nullptr);
// PIMPL Idiom
class ColumnChunkMetaDataImpl;
std::unique_ptr<ColumnChunkMetaDataImpl> impl_;
@@ -127,7 +127,7 @@
// API convenience to get a MetaData accessor
static std::unique_ptr<RowGroupMetaData> Make(
const uint8_t* metadata, const SchemaDescriptor* schema,
- const ApplicationVersion* writer_version = NULL);
+ const ApplicationVersion* writer_version = nullptr);
~RowGroupMetaData();
@@ -141,7 +141,7 @@
private:
explicit RowGroupMetaData(const uint8_t* metadata, const SchemaDescriptor* schema,
- const ApplicationVersion* writer_version = NULL);
+ const ApplicationVersion* writer_version = nullptr);
// PIMPL Idiom
class RowGroupMetaDataImpl;
std::unique_ptr<RowGroupMetaDataImpl> impl_;
diff --git a/src/parquet/file/printer.cc b/src/parquet/file/printer.cc
index 2ba9474..727552d 100644
--- a/src/parquet/file/printer.cc
+++ b/src/parquet/file/printer.cc
@@ -109,7 +109,7 @@
char buffer[bufsize];
// Create readers for selected columns and print contents
- vector<std::shared_ptr<Scanner>> scanners(selected_columns.size(), NULL);
+ vector<std::shared_ptr<Scanner>> scanners(selected_columns.size(), nullptr);
int j = 0;
for (auto i : selected_columns) {
std::shared_ptr<ColumnReader> col_reader = group_reader->Column(i);
diff --git a/src/parquet/file/reader-internal.cc b/src/parquet/file/reader-internal.cc
index 5ff7398..bc14ec9 100644
--- a/src/parquet/file/reader-internal.cc
+++ b/src/parquet/file/reader-internal.cc
@@ -100,7 +100,7 @@
}
// Uncompress it if we need to
- if (decompressor_ != NULL) {
+ if (decompressor_ != nullptr) {
// Grow the uncompressed buffer if we need to.
if (uncompressed_len > static_cast<int>(decompression_buffer_->size())) {
PARQUET_THROW_NOT_OK(decompression_buffer_->Resize(uncompressed_len, false));
diff --git a/src/parquet/file/reader.cc b/src/parquet/file/reader.cc
index 26876fc..9b9bde9 100644
--- a/src/parquet/file/reader.cc
+++ b/src/parquet/file/reader.cc
@@ -56,6 +56,13 @@
const_cast<ReaderProperties*>(contents_->properties())->memory_pool());
}
+std::unique_ptr<PageReader> RowGroupReader::GetColumnPageReader(int i) {
+  DCHECK(i >= 0 && i < metadata()->num_columns()) << "The RowGroup only has "
+                                                  << metadata()->num_columns()
+                                                  << " columns, requested column: " << i;
+ return contents_->GetColumnPageReader(i);
+}
+
// Returns the rowgroup metadata
const RowGroupMetaData* RowGroupReader::metadata() const { return contents_->metadata(); }
diff --git a/src/parquet/file/reader.h b/src/parquet/file/reader.h
index 0467640..7558394 100644
--- a/src/parquet/file/reader.h
+++ b/src/parquet/file/reader.h
@@ -58,6 +58,8 @@
// column. Ownership is shared with the RowGroupReader.
std::shared_ptr<ColumnReader> Column(int i);
+ std::unique_ptr<PageReader> GetColumnPageReader(int i);
+
private:
// Holds a pointer to an instance of Contents implementation
std::unique_ptr<Contents> contents_;
diff --git a/src/parquet/schema-test.cc b/src/parquet/schema-test.cc
index faacb76..f806c12 100644
--- a/src/parquet/schema-test.cc
+++ b/src/parquet/schema-test.cc
@@ -125,17 +125,17 @@
ASSERT_EQ(LogicalType::UTF8, node2.logical_type());
// repetition
- node1 = PrimitiveNode("foo", Repetition::REQUIRED, Type::INT32);
- node2 = PrimitiveNode("foo", Repetition::OPTIONAL, Type::INT32);
PrimitiveNode node3("foo", Repetition::REPEATED, Type::INT32);
-
- ASSERT_TRUE(node1.is_required());
-
- ASSERT_TRUE(node2.is_optional());
- ASSERT_FALSE(node2.is_required());
+ PrimitiveNode node4("foo", Repetition::REQUIRED, Type::INT32);
+ PrimitiveNode node5("foo", Repetition::OPTIONAL, Type::INT32);
ASSERT_TRUE(node3.is_repeated());
ASSERT_FALSE(node3.is_optional());
+
+ ASSERT_TRUE(node4.is_required());
+
+ ASSERT_TRUE(node5.is_optional());
+ ASSERT_FALSE(node5.is_required());
}
TEST_F(TestPrimitiveNode, FromParquet) {
@@ -687,10 +687,10 @@
ASSERT_TRUE(descr_.ColumnIndex(*non_column_alien.get()) < 0);
ASSERT_TRUE(descr_.ColumnIndex(*non_column_familiar.get()) < 0);
- ASSERT_EQ(inta.get(), descr_.GetColumnRoot(0).get());
- ASSERT_EQ(bag.get(), descr_.GetColumnRoot(3).get());
- ASSERT_EQ(bag.get(), descr_.GetColumnRoot(4).get());
- ASSERT_EQ(bag.get(), descr_.GetColumnRoot(5).get());
+ ASSERT_EQ(inta.get(), descr_.GetColumnRoot(0));
+ ASSERT_EQ(bag.get(), descr_.GetColumnRoot(3));
+ ASSERT_EQ(bag.get(), descr_.GetColumnRoot(4));
+ ASSERT_EQ(bag.get(), descr_.GetColumnRoot(5));
ASSERT_EQ(schema.get(), descr_.group_node());
diff --git a/src/parquet/schema.cc b/src/parquet/schema.cc
index ddd8ac1..8168dc4 100644
--- a/src/parquet/schema.cc
+++ b/src/parquet/schema.cc
@@ -20,6 +20,8 @@
#include <algorithm>
#include <memory>
+#include <sstream>
+#include <string>
#include "parquet/exception.h"
#include "parquet/parquet_types.h"
@@ -701,9 +703,15 @@
return result;
}
-const schema::NodePtr& SchemaDescriptor::GetColumnRoot(int i) const {
+const schema::Node* SchemaDescriptor::GetColumnRoot(int i) const {
DCHECK(i >= 0 && i < static_cast<int>(leaves_.size()));
- return leaf_to_base_.find(i)->second;
+ return leaf_to_base_.find(i)->second.get();
+}
+
+std::string SchemaDescriptor::ToString() const {
+ std::ostringstream ss;
+ PrintSchema(schema_.get(), ss);
+ return ss.str();
}
int ColumnDescriptor::type_scale() const {
diff --git a/src/parquet/schema.h b/src/parquet/schema.h
index c6b7fbe..f93f0db 100644
--- a/src/parquet/schema.h
+++ b/src/parquet/schema.h
@@ -178,6 +178,9 @@
bool EqualsInternal(const Node* other) const;
void SetParent(const Node* p_parent);
+
+ private:
+ DISALLOW_COPY_AND_ASSIGN(Node);
};
// Save our breath all over the place with these typedefs
@@ -255,7 +258,7 @@
bool Equals(const Node* other) const override;
- const NodePtr& field(int i) const { return fields_[i]; }
+ NodePtr field(int i) const { return fields_[i]; }
int FieldIndex(const std::string& name) const;
int FieldIndex(const Node& node) const;
@@ -398,10 +401,12 @@
const schema::GroupNode* group_node() const { return group_node_; }
// Returns the root (child of the schema root) node of the leaf(column) node
- const schema::NodePtr& GetColumnRoot(int i) const;
+ const schema::Node* GetColumnRoot(int i) const;
const std::string& name() const { return group_node_->name(); }
+ std::string ToString() const;
+
private:
friend class ColumnDescriptor;
diff --git a/src/parquet/statistics-test.cc b/src/parquet/statistics-test.cc
index d3ec942..1521cbd 100644
--- a/src/parquet/statistics-test.cc
+++ b/src/parquet/statistics-test.cc
@@ -194,8 +194,9 @@
}
template <typename TestType>
-typename std::vector<typename TestType::c_type> TestRowGroupStatistics<
- TestType>::GetDeepCopy(const std::vector<typename TestType::c_type>& values) {
+typename std::vector<typename TestType::c_type>
+TestRowGroupStatistics<TestType>::GetDeepCopy(
+ const std::vector<typename TestType::c_type>& values) {
return values;
}
diff --git a/src/parquet/test-util.h b/src/parquet/test-util.h
index 3fd72f2..0698652 100644
--- a/src/parquet/test-util.h
+++ b/src/parquet/test-util.h
@@ -377,7 +377,7 @@
}
shared_ptr<DataPage> page = MakeDataPage<Type>(
d, slice(values, value_start, value_start + values_per_page[i]),
- values_per_page[i], encoding, NULL, 0,
+ values_per_page[i], encoding, nullptr, 0,
slice(def_levels, def_level_start, def_level_end), max_def_level,
slice(rep_levels, rep_level_start, rep_level_end), max_rep_level);
pages.push_back(page);
diff --git a/src/parquet/util/memory-test.cc b/src/parquet/util/memory-test.cc
index cdb6d21..16617a7 100644
--- a/src/parquet/util/memory-test.cc
+++ b/src/parquet/util/memory-test.cc
@@ -214,7 +214,7 @@
for (int i = 0; i < num_allocs; ++i) {
uint8_t* mem = p.Allocate(alloc_size);
- ASSERT_TRUE(mem != NULL);
+ ASSERT_TRUE(mem != nullptr);
total_allocated += alloc_size;
int64_t wasted_memory = p.GetTotalChunkSizes() - total_allocated;
@@ -239,7 +239,7 @@
for (int i = 0; i < num_allocs; ++i) {
int alloc_size = i % 2 == 0 ? 1 : ChunkedAllocatorTest::MAX_CHUNK_SIZE;
uint8_t* mem = p.Allocate(alloc_size);
- ASSERT_TRUE(mem != NULL);
+ ASSERT_TRUE(mem != nullptr);
total_allocated += alloc_size;
int64_t wasted_memory = p.GetTotalChunkSizes() - total_allocated;
diff --git a/src/parquet/util/memory.cc b/src/parquet/util/memory.cc
index d3a5226..3aa2570 100644
--- a/src/parquet/util/memory.cc
+++ b/src/parquet/util/memory.cc
@@ -121,14 +121,18 @@
template <bool CHECK_LIMIT_FIRST>
uint8_t* ChunkedAllocator::Allocate(int size) {
- if (size == 0) return NULL;
+ if (size == 0) {
+ return nullptr;
+ }
int64_t num_bytes = ::arrow::BitUtil::RoundUp(size, 8);
if (current_chunk_idx_ == -1 ||
num_bytes + chunks_[current_chunk_idx_].allocated_bytes >
chunks_[current_chunk_idx_].size) {
- // If we couldn't allocate a new chunk, return NULL.
- if (ARROW_PREDICT_FALSE(!FindChunk(num_bytes))) return NULL;
+ // If we couldn't allocate a new chunk, return nullptr.
+ if (ARROW_PREDICT_FALSE(!FindChunk(num_bytes))) {
+ return nullptr;
+ }
}
ChunkInfo& info = chunks_[current_chunk_idx_];
uint8_t* result = info.data + info.allocated_bytes;
@@ -195,7 +199,7 @@
// Allocate a new chunk. Return early if malloc fails.
uint8_t* buf = nullptr;
PARQUET_THROW_NOT_OK(pool_->Allocate(chunk_size, &buf));
- if (ARROW_PREDICT_FALSE(buf == NULL)) {
+ if (ARROW_PREDICT_FALSE(buf == nullptr)) {
DCHECK_EQ(current_chunk_idx_, static_cast<int>(chunks_.size()));
current_chunk_idx_ = static_cast<int>(chunks_.size()) - 1;
return false;
diff --git a/src/parquet/util/memory.h b/src/parquet/util/memory.h
index 04dcca4..94b86c1 100644
--- a/src/parquet/util/memory.h
+++ b/src/parquet/util/memory.h
@@ -188,7 +188,7 @@
explicit ChunkInfo(int64_t size, uint8_t* buf);
- ChunkInfo() : data(NULL), size(0), allocated_bytes(0) {}
+ ChunkInfo() : data(nullptr), size(0), allocated_bytes(0) {}
};
/// chunk from which we served the last Allocate() call;
diff --git a/src/parquet/util/schema-util.h b/src/parquet/util/schema-util.h
index 9187962..ef9087b 100644
--- a/src/parquet/util/schema-util.h
+++ b/src/parquet/util/schema-util.h
@@ -25,7 +25,6 @@
#include "parquet/exception.h"
#include "parquet/schema.h"
#include "parquet/types.h"
-#include "parquet/util/logging.h"
using parquet::ParquetException;
using parquet::SchemaDescriptor;
@@ -49,14 +48,14 @@
}
// TODO(itaiin): This aux. function is to be deleted once repeated structs are supported
-inline bool IsSimpleStruct(const NodePtr& node) {
+inline bool IsSimpleStruct(const Node* node) {
if (!node->is_group()) return false;
if (node->is_repeated()) return false;
if (node->logical_type() == LogicalType::LIST) return false;
// Special case mentioned in the format spec:
// If the name is array or ends in _tuple, this should be a list of struct
// even for single child elements.
- auto group = static_cast<const GroupNode*>(node.get());
+ auto group = static_cast<const GroupNode*>(node);
if (group->field_count() == 1 && HasStructListName(*group)) return false;
return true;