PARQUET-1090: Add max row group length option, fix int32 overflow
Also fixed a race condition in the googletest_ep external-project build step.
Author: Wes McKinney <wes.mckinney@twosigma.com>
Closes #389 from wesm/int32-overflow and squashes the following commits:
f1d1d44 [Wes McKinney] Remove print statement
7a17404 [Wes McKinney] Add max row group length option, fix int32 overflow
diff --git a/benchmarks/CMakeLists.txt b/benchmarks/CMakeLists.txt
index 43af746..741523b 100644
--- a/benchmarks/CMakeLists.txt
+++ b/benchmarks/CMakeLists.txt
@@ -22,8 +22,10 @@
if (PARQUET_BUILD_BENCHMARKS)
add_executable(decode_benchmark decode_benchmark.cc)
+ add_dependencies(decode_benchmark gtest)
+
# This uses private APIs
target_link_libraries(decode_benchmark
- ${LINK_LIBS}
+ ${LINK_LIBS}
parquet_static)
endif()
diff --git a/src/parquet/arrow/writer.cc b/src/parquet/arrow/writer.cc
index 076a4b7..c5fe368 100644
--- a/src/parquet/arrow/writer.cc
+++ b/src/parquet/arrow/writer.cc
@@ -111,7 +111,7 @@
std::shared_ptr<Array>* values_array) {
// Work downwards to extract bitmaps and offsets
min_offset_idx_ = 0;
- max_offset_idx_ = static_cast<int32_t>(array.length());
+ max_offset_idx_ = array.length();
RETURN_NOT_OK(VisitInline(array));
*num_values = max_offset_idx_ - min_offset_idx_;
*values_offset = min_offset_idx_;
@@ -244,8 +244,8 @@
std::vector<int32_t> array_offsets_;
std::vector<bool> nullable_;
- int32_t min_offset_idx_;
- int32_t max_offset_idx_;
+ int64_t min_offset_idx_;
+ int64_t max_offset_idx_;
::arrow::Type::type values_type_;
std::shared_ptr<Array> values_array_;
};
@@ -291,6 +291,8 @@
Status WriteColumnChunk(const Array& data);
Status Close();
+ const WriterProperties& properties() const { return *writer_->properties(); }
+
virtual ~Impl() {}
private:
@@ -941,11 +943,14 @@
if (chunk_size <= 0) {
return Status::Invalid("chunk size per row_group must be greater than 0");
+ } else if (chunk_size > impl_->properties().max_row_group_length()) {
+ chunk_size = impl_->properties().max_row_group_length();
}
for (int chunk = 0; chunk * chunk_size < table.num_rows(); chunk++) {
int64_t offset = chunk * chunk_size;
int64_t size = std::min(chunk_size, table.num_rows() - offset);
+
RETURN_NOT_OK_ELSE(NewRowGroup(size), PARQUET_IGNORE_NOT_OK(Close()));
for (int i = 0; i < table.num_columns(); i++) {
std::shared_ptr<Array> array = table.column(i)->data()->chunk(0);
diff --git a/src/parquet/properties.h b/src/parquet/properties.h
index 77b0305..a331aae 100644
--- a/src/parquet/properties.h
+++ b/src/parquet/properties.h
@@ -82,6 +82,7 @@
static constexpr bool DEFAULT_IS_DICTIONARY_ENABLED = true;
static constexpr int64_t DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT = DEFAULT_PAGE_SIZE;
static constexpr int64_t DEFAULT_WRITE_BATCH_SIZE = 1024;
+static constexpr int64_t DEFAULT_MAX_ROW_GROUP_LENGTH = 64 * 1024 * 1024;
static constexpr bool DEFAULT_ARE_STATISTICS_ENABLED = true;
static constexpr Encoding::type DEFAULT_ENCODING = Encoding::PLAIN;
static constexpr ParquetVersion::type DEFAULT_WRITER_VERSION =
@@ -114,6 +115,7 @@
: pool_(::arrow::default_memory_pool()),
dictionary_pagesize_limit_(DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT),
write_batch_size_(DEFAULT_WRITE_BATCH_SIZE),
+ max_row_group_length_(DEFAULT_MAX_ROW_GROUP_LENGTH),
pagesize_(DEFAULT_PAGE_SIZE),
version_(DEFAULT_WRITER_VERSION),
created_by_(DEFAULT_CREATED_BY) {}
@@ -162,6 +164,11 @@
return this;
}
+ Builder* max_row_group_length(int64_t max_row_group_length) {
+ max_row_group_length_ = max_row_group_length;
+ return this;
+ }
+
Builder* data_pagesize(int64_t pg_size) {
pagesize_ = pg_size;
return this;
@@ -280,15 +287,17 @@
for (const auto& item : statistics_enabled_)
get(item.first).statistics_enabled = item.second;
- return std::shared_ptr<WriterProperties>(new WriterProperties(
- pool_, dictionary_pagesize_limit_, write_batch_size_, pagesize_, version_,
- created_by_, default_column_properties_, column_properties));
+ return std::shared_ptr<WriterProperties>(
+ new WriterProperties(pool_, dictionary_pagesize_limit_, write_batch_size_,
+ max_row_group_length_, pagesize_, version_, created_by_,
+ default_column_properties_, column_properties));
}
private:
::arrow::MemoryPool* pool_;
int64_t dictionary_pagesize_limit_;
int64_t write_batch_size_;
+ int64_t max_row_group_length_;
int64_t pagesize_;
ParquetVersion::type version_;
std::string created_by_;
@@ -307,6 +316,8 @@
inline int64_t write_batch_size() const { return write_batch_size_; }
+ inline int64_t max_row_group_length() const { return max_row_group_length_; }
+
inline int64_t data_pagesize() const { return pagesize_; }
inline ParquetVersion::type version() const { return parquet_version_; }
@@ -355,12 +366,14 @@
private:
explicit WriterProperties(
::arrow::MemoryPool* pool, int64_t dictionary_pagesize_limit,
- int64_t write_batch_size, int64_t pagesize, ParquetVersion::type version,
- const std::string& created_by, const ColumnProperties& default_column_properties,
+ int64_t write_batch_size, int64_t max_row_group_length, int64_t pagesize,
+ ParquetVersion::type version, const std::string& created_by,
+ const ColumnProperties& default_column_properties,
const std::unordered_map<std::string, ColumnProperties>& column_properties)
: pool_(pool),
dictionary_pagesize_limit_(dictionary_pagesize_limit),
write_batch_size_(write_batch_size),
+ max_row_group_length_(max_row_group_length),
pagesize_(pagesize),
parquet_version_(version),
parquet_created_by_(created_by),
@@ -370,6 +383,7 @@
::arrow::MemoryPool* pool_;
int64_t dictionary_pagesize_limit_;
int64_t write_batch_size_;
+ int64_t max_row_group_length_;
int64_t pagesize_;
ParquetVersion::type parquet_version_;
std::string parquet_created_by_;