diff --git a/cpp/src/parquet/arrow/schema.cc b/cpp/src/parquet/arrow/schema.cc
index ec3890a41f..943f69bb6c 100644
--- a/cpp/src/parquet/arrow/schema.cc
+++ b/cpp/src/parquet/arrow/schema.cc
@@ -178,7 +178,7 @@ static Status GetTimestampMetadata(const ::arrow::TimestampType& type,
// The user is explicitly asking for Impala int96 encoding; there is no
// logical type.
- if (arrow_properties.support_deprecated_int96_timestamps()) {
+ if (arrow_properties.support_deprecated_int96_timestamps() &&
+     target_unit == ::arrow::TimeUnit::NANO) {
*physical_type = ParquetType::INT96;
return Status::OK();
}
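
A writer-side sketch of the behavior this hunk pins down, assuming a table
whose timestamp columns target nanoseconds (the helper name WriteWithInt96
and the chunk size are illustrative): with the added target_unit check,
enabling the deprecated option selects INT96 physical storage only for
nanosecond timestamps, and other units fall through to the INT64 path.

    #include "arrow/io/interfaces.h"
    #include "arrow/table.h"
    #include "parquet/arrow/writer.h"
    #include "parquet/properties.h"

    arrow::Status WriteWithInt96(const std::shared_ptr<arrow::Table>& table,
                                 std::shared_ptr<arrow::io::OutputStream> sink) {
      // Request Impala-compatible INT96 timestamps; with this change the
      // request is honored only for nanosecond-unit columns.
      std::shared_ptr<parquet::ArrowWriterProperties> arrow_props =
          parquet::ArrowWriterProperties::Builder()
              .enable_deprecated_int96_timestamps()
              ->build();
      return parquet::arrow::WriteTable(*table, arrow::default_memory_pool(),
                                        std::move(sink), /*chunk_size=*/65536,
                                        parquet::default_writer_properties(),
                                        arrow_props);
    }
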
diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc
index 285e2a5973..aa6f92f077 100644
--- a/cpp/src/parquet/arrow/reader.cc
+++ b/cpp/src/parquet/arrow/reader.cc
@@ -1013,25 +1013,32 @@ Status FileReaderImpl::GetRecordBatchReader(const std::vector<int>& row_groups,
return Status::OK();
}
- int64_t num_rows = 0;
+ std::vector<int64_t> num_rows;
for (int row_group : row_groups) {
- num_rows += parquet_reader()->metadata()->RowGroup(row_group)->num_rows();
+ num_rows.push_back(parquet_reader()->metadata()->RowGroup(row_group)->num_rows());
}
using ::arrow::RecordBatchIterator;
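+ // Index into num_rows for the row group currently being consumed.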
+ int row_group_idx = 0;
// NB: This lambda will be invoked outside the scope of this call to
// `GetRecordBatchReader()`, so it must capture `readers` and `batch_schema` by value.
// `this` is a non-owning pointer so we are relying on the parent FileReader outliving
// this RecordBatchReader.
::arrow::Iterator<RecordBatchIterator> batches = ::arrow::MakeFunctionIterator(
- [readers, batch_schema, num_rows,
+ [readers, batch_schema, num_rows, row_group_idx,
this]() mutable -> ::arrow::Result<RecordBatchIterator> {
::arrow::ChunkedArrayVector columns(readers.size());
- // don't reserve more rows than necessary
- int64_t batch_size = std::min(properties().batch_size(), num_rows);
- num_rows -= batch_size;
+ int64_t batch_size = 0;
+ if (!num_rows.empty()) {
+ // don't reserve more rows than necessary
+ batch_size = std::min(properties().batch_size(), num_rows[row_group_idx]);
+ num_rows[row_group_idx] -= batch_size;
+ if (num_rows[row_group_idx] == 0 &&
+     row_group_idx + 1 < static_cast<int>(num_rows.size())) {
+ row_group_idx++;
+ }
+ }
RETURN_NOT_OK(::arrow::internal::OptionalParallelFor(
reader_properties_.use_threads(), static_cast<int>(readers.size()),
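
For reference, the lambda above backs the RecordBatchReader returned by
GetRecordBatchReader(); tracking remaining rows per row group (rather than a
single running total) caps each emitted batch at the current row group's
boundary. A consumption sketch, assuming an already-open
parquet::arrow::FileReader (the helper name ConsumeRowGroups is illustrative):

    #include "arrow/record_batch.h"
    #include "parquet/arrow/reader.h"

    arrow::Status ConsumeRowGroups(parquet::arrow::FileReader* file_reader) {
      // Select row groups 0 and 2; every batch the reader yields stays
      // within the bounds of the row group currently being read.
      std::unique_ptr<arrow::RecordBatchReader> rb_reader;
      ARROW_RETURN_NOT_OK(file_reader->GetRecordBatchReader({0, 2}, &rb_reader));
      std::shared_ptr<arrow::RecordBatch> batch;
      while (true) {
        ARROW_RETURN_NOT_OK(rb_reader->ReadNext(&batch));
        if (batch == nullptr) break;  // end of stream
        // ... process batch ...
      }
      return arrow::Status::OK();
    }
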
diff --git a/cpp/src/parquet/arrow/writer.cc b/cpp/src/parquet/arrow/writer.cc
index 4fd7ef1b47..87326a54f1 100644
--- a/cpp/src/parquet/arrow/writer.cc
+++ b/cpp/src/parquet/arrow/writer.cc
@@ -314,6 +314,14 @@ class FileWriterImpl : public FileWriter {
return Status::OK();
}
+ int64_t GetBufferedSize() override {
+ if (row_group_writer_ == nullptr) {
+ return 0;
+ }
+ return row_group_writer_->total_compressed_bytes() +
+ row_group_writer_->total_compressed_bytes_written();
+ }
+
Status Close() override {
if (!closed_) {
// Make idempotent
@@ -418,10 +426,13 @@ class FileWriterImpl : public FileWriter {
// Max number of rows allowed in a row group.
const int64_t max_row_group_length = this->properties().max_row_group_length();
+ const int64_t max_row_group_size = this->properties().max_row_group_size();
// Initialize a new buffered row group writer if necessary.
if (row_group_writer_ == nullptr || !row_group_writer_->buffered() ||
- row_group_writer_->num_rows() >= max_row_group_length) {
+ row_group_writer_->num_rows() >= max_row_group_length ||
+ (row_group_writer_->total_compressed_bytes_written() +
+ row_group_writer_->total_compressed_bytes() >= max_row_group_size)) {
RETURN_NOT_OK(NewBufferedRowGroup());
}
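
With both caps in place, WriteRecordBatch() closes a row group once it holds
max_row_group_length rows or, with this patch, max_row_group_size compressed
bytes. Callers can also watch the new GetBufferedSize() accessor and break
row groups on their own schedule; a sketch, where kFlushThreshold and the
helper name WriteBatches are illustrative:

    #include <vector>

    #include "arrow/record_batch.h"
    #include "parquet/arrow/writer.h"

    // Force a row-group break once ~32 MB is buffered, ahead of the
    // writer's own limits (threshold chosen for illustration).
    constexpr int64_t kFlushThreshold = 32 * 1024 * 1024;

    arrow::Status WriteBatches(
        parquet::arrow::FileWriter* writer,
        const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches) {
      for (const auto& batch : batches) {
        ARROW_RETURN_NOT_OK(writer->WriteRecordBatch(*batch));
        if (writer->GetBufferedSize() >= kFlushThreshold) {
          ARROW_RETURN_NOT_OK(writer->NewBufferedRowGroup());
        }
      }
      return writer->Close();
    }
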
diff --git a/cpp/src/parquet/arrow/writer.h b/cpp/src/parquet/arrow/writer.h
index 4a1a033a7b..0f13d05e44 100644
--- a/cpp/src/parquet/arrow/writer.h
+++ b/cpp/src/parquet/arrow/writer.h
@@ -138,6 +138,9 @@ class PARQUET_EXPORT FileWriter {
/// option in this case.
virtual ::arrow::Status WriteRecordBatch(const ::arrow::RecordBatch& batch) = 0;
+ /// \brief Return the compressed size in bytes of the current buffered row
+ /// group (bytes already flushed plus bytes still buffered), or 0 if no
+ /// row group is open.
+ virtual int64_t GetBufferedSize() = 0;
+
/// \brief Write the footer and close the file.
virtual ::arrow::Status Close() = 0;
virtual ~FileWriter();
diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h
index 4d3acb491e..3906ff3c59 100644
--- a/cpp/src/parquet/properties.h
+++ b/cpp/src/parquet/properties.h
@@ -139,6 +139,7 @@ static constexpr bool DEFAULT_IS_DICTIONARY_ENABLED = true;
static constexpr int64_t DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT = kDefaultDataPageSize;
static constexpr int64_t DEFAULT_WRITE_BATCH_SIZE = 1024;
static constexpr int64_t DEFAULT_MAX_ROW_GROUP_LENGTH = 1024 * 1024;
+static constexpr int64_t DEFAULT_MAX_ROW_GROUP_SIZE = 128 * 1024 * 1024;
static constexpr bool DEFAULT_ARE_STATISTICS_ENABLED = true;
static constexpr int64_t DEFAULT_MAX_STATISTICS_SIZE = 4096;
static constexpr Encoding::type DEFAULT_ENCODING = Encoding::UNKNOWN;
@@ -232,6 +233,7 @@ class PARQUET_EXPORT WriterProperties {
dictionary_pagesize_limit_(DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT),
write_batch_size_(DEFAULT_WRITE_BATCH_SIZE),
max_row_group_length_(DEFAULT_MAX_ROW_GROUP_LENGTH),
+ max_row_group_size_(DEFAULT_MAX_ROW_GROUP_SIZE),
pagesize_(kDefaultDataPageSize),
version_(ParquetVersion::PARQUET_2_6),
data_page_version_(ParquetDataPageVersion::V1),
@@ -244,6 +246,7 @@ class PARQUET_EXPORT WriterProperties {
dictionary_pagesize_limit_(properties.dictionary_pagesize_limit()),
write_batch_size_(properties.write_batch_size()),
max_row_group_length_(properties.max_row_group_length()),
+ max_row_group_size_(properties.max_row_group_size()),
pagesize_(properties.data_pagesize()),
version_(properties.version()),
data_page_version_(properties.data_page_version()),
@@ -321,6 +324,13 @@ class PARQUET_EXPORT WriterProperties {
return this;
}
+ /// Specify the maximum compressed size in bytes of a single row group.
+ /// Default 128 MB.
+ Builder* max_row_group_size(int64_t max_row_group_size) {
+ max_row_group_size_ = max_row_group_size;
+ return this;
+ }
+
/// Specify the data page size.
/// Default 1MB.
Builder* data_pagesize(int64_t pg_size) {
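
The new knob is configured through the builder like the existing ones; a
minimal sketch (the 64 MB value is illustrative):

    #include "parquet/properties.h"

    // Cap row groups at 1Mi rows or 64 MB compressed, whichever is reached
    // first; the byte-based cap is the option added by this patch.
    std::shared_ptr<parquet::WriterProperties> props =
        parquet::WriterProperties::Builder()
            .max_row_group_length(1024 * 1024)
            ->max_row_group_size(64 * 1024 * 1024)
            ->build();
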
@@ -664,7 +674,7 @@ class PARQUET_EXPORT WriterProperties {
return std::shared_ptr<WriterProperties>(new WriterProperties(
pool_, dictionary_pagesize_limit_, write_batch_size_, max_row_group_length_,
- pagesize_, version_, created_by_, page_checksum_enabled_,
+ max_row_group_size_, pagesize_, version_, created_by_, page_checksum_enabled_,
std::move(file_encryption_properties_), default_column_properties_,
column_properties, data_page_version_, store_decimal_as_integer_,
std::move(sorting_columns_)));
@@ -675,6 +685,7 @@ class PARQUET_EXPORT WriterProperties {
int64_t dictionary_pagesize_limit_;
int64_t write_batch_size_;
int64_t max_row_group_length_;
+ int64_t max_row_group_size_;
int64_t pagesize_;
ParquetVersion::type version_;
ParquetDataPageVersion data_page_version_;
@@ -705,6 +716,8 @@ class PARQUET_EXPORT WriterProperties {
inline int64_t max_row_group_length() const { return max_row_group_length_; }
+ inline int64_t max_row_group_size() const { return max_row_group_size_; }
+
inline int64_t data_pagesize() const { return pagesize_; }
inline ParquetDataPageVersion data_page_version() const {
@@ -810,7 +823,7 @@ class PARQUET_EXPORT WriterProperties {
private:
explicit WriterProperties(
MemoryPool* pool, int64_t dictionary_pagesize_limit, int64_t write_batch_size,
- int64_t max_row_group_length, int64_t pagesize, ParquetVersion::type version,
+ int64_t max_row_group_length, int64_t max_row_group_size, int64_t pagesize,
+ ParquetVersion::type version,
const std::string& created_by, bool page_write_checksum_enabled,
std::shared_ptr<FileEncryptionProperties> file_encryption_properties,
const ColumnProperties& default_column_properties,
@@ -821,6 +834,7 @@ class PARQUET_EXPORT WriterProperties {
dictionary_pagesize_limit_(dictionary_pagesize_limit),
write_batch_size_(write_batch_size),
max_row_group_length_(max_row_group_length),
+ max_row_group_size_(max_row_group_size),
pagesize_(pagesize),
parquet_data_page_version_(data_page_version),
parquet_version_(version),
@@ -836,6 +850,7 @@ class PARQUET_EXPORT WriterProperties {
int64_t dictionary_pagesize_limit_;
int64_t write_batch_size_;
int64_t max_row_group_length_;
+ int64_t max_row_group_size_;
int64_t pagesize_;
ParquetDataPageVersion parquet_data_page_version_;
ParquetVersion::type parquet_version_;