diff --git a/cpp/src/parquet/arrow/schema.cc b/cpp/src/parquet/arrow/schema.cc
index ec3890a41f..943f69bb6c 100644
--- a/cpp/src/parquet/arrow/schema.cc
+++ b/cpp/src/parquet/arrow/schema.cc
@@ -178,7 +178,8 @@ static Status GetTimestampMetadata(const ::arrow::TimestampType& type,
 
   // The user is explicitly asking for Impala int96 encoding, there is no
   // logical type.
-  if (arrow_properties.support_deprecated_int96_timestamps()) {
+  if (arrow_properties.support_deprecated_int96_timestamps() &&
+      target_unit == ::arrow::TimeUnit::NANO) {
     *physical_type = ParquetType::INT96;
     return Status::OK();
   }
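Note (not part of the patch): a minimal sketch of the writer-property combination this hunk gates on; the helper name WriteWithInt96 is hypothetical. With the change, only nanosecond-unit timestamp columns are still written as INT96; timestamps in other units now fall through to a regular TIMESTAMP logical-type annotation.

#include <arrow/api.h>
#include <arrow/io/api.h>
#include <parquet/arrow/writer.h>

arrow::Status WriteWithInt96(const std::shared_ptr<arrow::Table>& table,
                             std::shared_ptr<arrow::io::OutputStream> sink) {
  // Request the deprecated Impala-style INT96 timestamp encoding.
  auto arrow_props = parquet::ArrowWriterProperties::Builder()
                         .enable_deprecated_int96_timestamps()
                         ->build();
  // Columns of type timestamp(TimeUnit::NANO) are encoded as INT96; e.g. a
  // timestamp(TimeUnit::MILLI) column gets a TIMESTAMP logical type instead.
  return parquet::arrow::WriteTable(*table, arrow::default_memory_pool(), sink,
                                    /*chunk_size=*/64 * 1024,
                                    parquet::default_writer_properties(),
                                    arrow_props);
}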
diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc
index 285e2a5973..aa6f92f077 100644
--- a/cpp/src/parquet/arrow/reader.cc
+++ b/cpp/src/parquet/arrow/reader.cc
@@ -1013,25 +1013,33 @@ Status FileReaderImpl::GetRecordBatchReader(const std::vector<int>& row_groups,
     return Status::OK();
   }
 
-  int64_t num_rows = 0;
+  std::vector<int64_t> num_rows;
   for (int row_group : row_groups) {
-    num_rows += parquet_reader()->metadata()->RowGroup(row_group)->num_rows();
+    num_rows.push_back(parquet_reader()->metadata()->RowGroup(row_group)->num_rows());
   }
 
   using ::arrow::RecordBatchIterator;
+  int row_group_idx = 0;
 
   // NB: This lambda will be invoked outside the scope of this call to
   // `GetRecordBatchReader()`, so it must capture `readers` and `batch_schema` by value.
   // `this` is a non-owning pointer so we are relying on the parent FileReader outliving
   // this RecordBatchReader.
   ::arrow::Iterator<RecordBatchIterator> batches = ::arrow::MakeFunctionIterator(
-      [readers, batch_schema, num_rows,
+      [readers, batch_schema, num_rows, row_group_idx,
        this]() mutable -> ::arrow::Result<RecordBatchIterator> {
         ::arrow::ChunkedArrayVector columns(readers.size());
 
-        // don't reserve more rows than necessary
-        int64_t batch_size = std::min(properties().batch_size(), num_rows);
-        num_rows -= batch_size;
+        int64_t batch_size = 0;
+        if (!num_rows.empty()) {
+          // don't reserve more rows than necessary
+          batch_size = std::min(properties().batch_size(), num_rows[row_group_idx]);
+          num_rows[row_group_idx] -= batch_size;
+          if (num_rows[row_group_idx] == 0 &&
+              row_group_idx + 1 < static_cast<int>(num_rows.size())) {
+            row_group_idx++;
+          }
+        }
 
         RETURN_NOT_OK(::arrow::internal::OptionalParallelFor(
             reader_properties_.use_threads(), static_cast<int>(readers.size()),
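Note (illustration only): a standalone mirror of the lambda's new bookkeeping, with a hypothetical helper name, showing that each batch is now capped at the current row group's remaining rows rather than at the file-wide total.

#include <algorithm>
#include <cstdint>
#include <vector>

// Compute the size of the next batch without crossing a row-group boundary.
int64_t NextBatchSize(std::vector<int64_t>& num_rows, int& row_group_idx,
                      int64_t max_batch_size) {
  if (num_rows.empty()) return 0;
  int64_t batch_size = std::min(max_batch_size, num_rows[row_group_idx]);
  num_rows[row_group_idx] -= batch_size;
  // Move to the next row group once the current one is exhausted.
  if (num_rows[row_group_idx] == 0 &&
      row_group_idx + 1 < static_cast<int>(num_rows.size())) {
    ++row_group_idx;
  }
  return batch_size;
}

With row groups of {5, 3} rows and a batch size of 4, this yields batches of 4, 1, and 3 rows; the old single-counter version would have produced 4 and 4, letting a batch span the row-group boundary.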
diff --git a/cpp/src/parquet/arrow/writer.cc b/cpp/src/parquet/arrow/writer.cc
index 4fd7ef1b47..87326a54f1 100644
--- a/cpp/src/parquet/arrow/writer.cc
+++ b/cpp/src/parquet/arrow/writer.cc
@@ -314,6 +314,14 @@ class FileWriterImpl : public FileWriter {
     return Status::OK();
   }
 
+  int64_t GetBufferedSize() override {
+    if (row_group_writer_ == nullptr) {
+      return 0;
+    }
+    return row_group_writer_->total_compressed_bytes() +
+           row_group_writer_->total_compressed_bytes_written();
+  }
+
   Status Close() override {
     if (!closed_) {
       // Make idempotent
@@ -418,10 +426,14 @@ class FileWriterImpl : public FileWriter {
 
     // Max number of rows allowed in a row group.
    const int64_t max_row_group_length = this->properties().max_row_group_length();
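+    // Max number of compressed bytes allowed in a row group.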
+    const int64_t max_row_group_size = this->properties().max_row_group_size();
 
     // Initialize a new buffered row group writer if necessary.
     if (row_group_writer_ == nullptr || !row_group_writer_->buffered() ||
-        row_group_writer_->num_rows() >= max_row_group_length) {
+        row_group_writer_->num_rows() >= max_row_group_length ||
+        (row_group_writer_->total_compressed_bytes_written() +
+         row_group_writer_->total_compressed_bytes() >= max_row_group_size)) {
       RETURN_NOT_OK(NewBufferedRowGroup());
     }
 
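Note (illustration only, hypothetical helper name): the new rollover in action. The max_row_group_size() builder method exists only with this patch applied; with a small byte cap, successive WriteRecordBatch() calls are split into multiple row groups even though max_row_group_length (a row count) is never reached.

#include <arrow/api.h>
#include <arrow/io/api.h>
#include <parquet/arrow/writer.h>

arrow::Status WriteSizeCapped(
    const std::shared_ptr<arrow::Schema>& schema,
    const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches,
    std::shared_ptr<arrow::io::OutputStream> sink) {
  // Roll over to a new row group after ~1 MB of compressed data.
  auto props = parquet::WriterProperties::Builder()
                   .max_row_group_size(1024 * 1024)
                   ->build();
  ARROW_ASSIGN_OR_RAISE(auto writer, parquet::arrow::FileWriter::Open(
                                         *schema, arrow::default_memory_pool(),
                                         sink, props));
  for (const auto& batch : batches) {
    ARROW_RETURN_NOT_OK(writer->WriteRecordBatch(*batch));
  }
  return writer->Close();
}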
diff --git a/cpp/src/parquet/arrow/writer.h b/cpp/src/parquet/arrow/writer.h
index 4a1a033a7b..0f13d05e44 100644
--- a/cpp/src/parquet/arrow/writer.h
+++ b/cpp/src/parquet/arrow/writer.h
@@ -138,6 +138,10 @@ class PARQUET_EXPORT FileWriter {
   /// option in this case.
   virtual ::arrow::Status WriteRecordBatch(const ::arrow::RecordBatch& batch) = 0;
 
+  /// \brief Return the number of compressed bytes written or buffered
+  /// for the current row group.
+  virtual int64_t GetBufferedSize() = 0;
+
   /// \brief Write the footer and close the file.
   virtual ::arrow::Status Close() = 0;
   virtual ~FileWriter();
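Note (illustration only): GetBufferedSize() is introduced by this patch; a caller can poll it to flush row groups at a custom threshold instead of relying solely on the writer-properties limits. The helper name is hypothetical.

#include <arrow/api.h>
#include <parquet/arrow/writer.h>

arrow::Status WriteWithManualFlush(parquet::arrow::FileWriter* writer,
                                   const arrow::RecordBatch& batch) {
  ARROW_RETURN_NOT_OK(writer->WriteRecordBatch(batch));
  // GetBufferedSize() reports the compressed bytes already written for the
  // current row group plus the compressed bytes still buffered in it.
  if (writer->GetBufferedSize() >= 64 * 1024 * 1024) {  // ~64 MB threshold
    return writer->NewBufferedRowGroup();
  }
  return arrow::Status::OK();
}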
diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h
index 4d3acb491e..3906ff3c59 100644
--- a/cpp/src/parquet/properties.h
+++ b/cpp/src/parquet/properties.h
@@ -139,6 +139,7 @@ static constexpr bool DEFAULT_IS_DICTIONARY_ENABLED = true;
 static constexpr int64_t DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT = kDefaultDataPageSize;
 static constexpr int64_t DEFAULT_WRITE_BATCH_SIZE = 1024;
 static constexpr int64_t DEFAULT_MAX_ROW_GROUP_LENGTH = 1024 * 1024;
+static constexpr int64_t DEFAULT_MAX_ROW_GROUP_SIZE = 128 * 1024 * 1024;
 static constexpr bool DEFAULT_ARE_STATISTICS_ENABLED = true;
 static constexpr int64_t DEFAULT_MAX_STATISTICS_SIZE = 4096;
 static constexpr Encoding::type DEFAULT_ENCODING = Encoding::UNKNOWN;
@@ -232,6 +233,7 @@ class PARQUET_EXPORT WriterProperties {
           dictionary_pagesize_limit_(DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT),
           write_batch_size_(DEFAULT_WRITE_BATCH_SIZE),
           max_row_group_length_(DEFAULT_MAX_ROW_GROUP_LENGTH),
+          max_row_group_size_(DEFAULT_MAX_ROW_GROUP_SIZE),
           pagesize_(kDefaultDataPageSize),
           version_(ParquetVersion::PARQUET_2_6),
           data_page_version_(ParquetDataPageVersion::V1),
@@ -244,6 +246,7 @@ class PARQUET_EXPORT WriterProperties {
           dictionary_pagesize_limit_(properties.dictionary_pagesize_limit()),
           write_batch_size_(properties.write_batch_size()),
           max_row_group_length_(properties.max_row_group_length()),
+          max_row_group_size_(properties.max_row_group_size()),
           pagesize_(properties.data_pagesize()),
           version_(properties.version()),
           data_page_version_(properties.data_page_version()),
@@ -321,6 +324,13 @@ class PARQUET_EXPORT WriterProperties {
       return this;
     }
 
+    /// Specify the maximum compressed size, in bytes, of a single row group.
+    /// Default 128 MB.
+    Builder* max_row_group_size(int64_t max_row_group_size) {
+      max_row_group_size_ = max_row_group_size;
+      return this;
+    }
+
     /// Specify the data page size.
     /// Default 1MB.
     Builder* data_pagesize(int64_t pg_size) {
@@ -664,7 +674,7 @@ class PARQUET_EXPORT WriterProperties {
 
       return std::shared_ptr<WriterProperties>(new WriterProperties(
           pool_, dictionary_pagesize_limit_, write_batch_size_, max_row_group_length_,
-          pagesize_, version_, created_by_, page_checksum_enabled_,
+          max_row_group_size_, pagesize_, version_, created_by_, page_checksum_enabled_,
           std::move(file_encryption_properties_), default_column_properties_,
           column_properties, data_page_version_, store_decimal_as_integer_,
           std::move(sorting_columns_)));
@@ -675,6 +685,7 @@ class PARQUET_EXPORT WriterProperties {
     int64_t dictionary_pagesize_limit_;
     int64_t write_batch_size_;
     int64_t max_row_group_length_;
+    int64_t max_row_group_size_;
     int64_t pagesize_;
     ParquetVersion::type version_;
     ParquetDataPageVersion data_page_version_;
@@ -705,6 +716,8 @@ class PARQUET_EXPORT WriterProperties {
 
   inline int64_t max_row_group_length() const { return max_row_group_length_; }
 
+  inline int64_t max_row_group_size() const { return max_row_group_size_; }
+
   inline int64_t data_pagesize() const { return pagesize_; }
 
   inline ParquetDataPageVersion data_page_version() const {
@@ -810,7 +823,8 @@ class PARQUET_EXPORT WriterProperties {
  private:
   explicit WriterProperties(
       MemoryPool* pool, int64_t dictionary_pagesize_limit, int64_t write_batch_size,
-      int64_t max_row_group_length, int64_t pagesize, ParquetVersion::type version,
+      int64_t max_row_group_length, int64_t max_row_group_size, int64_t pagesize,
+      ParquetVersion::type version,
       const std::string& created_by, bool page_write_checksum_enabled,
       std::shared_ptr<FileEncryptionProperties> file_encryption_properties,
       const ColumnProperties& default_column_properties,
@@ -821,6 +835,7 @@ class PARQUET_EXPORT WriterProperties {
         dictionary_pagesize_limit_(dictionary_pagesize_limit),
         write_batch_size_(write_batch_size),
         max_row_group_length_(max_row_group_length),
+        max_row_group_size_(max_row_group_size),
         pagesize_(pagesize),
         parquet_data_page_version_(data_page_version),
         parquet_version_(version),
@@ -836,6 +851,7 @@ class PARQUET_EXPORT WriterProperties {
   int64_t dictionary_pagesize_limit_;
   int64_t write_batch_size_;
   int64_t max_row_group_length_;
+  int64_t max_row_group_size_;
   int64_t pagesize_;
   ParquetDataPageVersion parquet_data_page_version_;
   ParquetVersion::type parquet_version_;
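Note (illustration only): combining the existing row-count cap with the new byte cap; max_row_group_size() is only available with this patch. The two limits use different units: max_row_group_length counts rows, max_row_group_size counts compressed bytes.

#include <memory>

#include <parquet/properties.h>

std::shared_ptr<parquet::WriterProperties> MakeProps() {
  return parquet::WriterProperties::Builder()
      .max_row_group_length(1024 * 1024)       // rows
      ->max_row_group_size(128 * 1024 * 1024)  // compressed bytes
      ->build();
}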