PARQUET-1037: allow arbitrary size row-groups
The main change is that the size of a row group no longer has to be specified upfront when writing it. This is signalled through a "row_count_determined" flag which is threaded through the relevant classes.
The AppendRowGroup(int64_t num_rows) method should behave identically to before.
The AppendRowGroup() method should fix its row-group size after completing the first column, and enforce that all
subsequent columns match that row count. Empty row groups are still not allowed.
(You'll definitely want to squash these commits. Apologies for the poor git usage.)
Author: Toby Shaw <labuser@LABWKS10>
Author: Wes McKinney <wes.mckinney@twosigma.com>
Closes #378 from TobyShaw/master and squashes the following commits:
5ba1b75 [Wes McKinney] * Remove number of expected rows from ColumnWriter in a backwards compatible way, refactoring * Fix issue where column length was being checked twice after failed write
14bc5e8 [Toby Shaw] Allow arbitrary size row groups
diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc
index 56b4770..1786ca5 100644
--- a/src/parquet/arrow/arrow-reader-writer-test.cc
+++ b/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -795,7 +795,7 @@
// to write an Int96 file.
this->sink_ = std::make_shared<InMemoryOutputStream>();
auto writer = ParquetFileWriter::Open(this->sink_, schema);
- RowGroupWriter* rg_writer = writer->AppendRowGroup(1);
+ RowGroupWriter* rg_writer = writer->AppendRowGroup();
ColumnWriter* c_writer = rg_writer->NextColumn();
auto typed_writer = dynamic_cast<TypedColumnWriter<Int96Type>*>(c_writer);
ASSERT_NE(typed_writer, nullptr);
@@ -954,7 +954,7 @@
reinterpret_cast<ParquetCDataType<TestType>*>(values_buffer.data());
std::copy(values.cbegin(), values.cend(), values_parquet);
for (int i = 0; i < num_chunks; i++) {
- auto row_group_writer = file_writer->AppendRowGroup(chunk_size);
+ auto row_group_writer = file_writer->AppendRowGroup();
auto column_writer =
static_cast<ParquetWriter<TestType>*>(row_group_writer->NextColumn());
ParquetCDataType<TestType>* data = values_parquet + i * chunk_size;
@@ -1496,7 +1496,7 @@
writer_ = parquet::ParquetFileWriter::Open(nested_parquet_, schema,
default_writer_properties());
- row_group_writer_ = writer_->AppendRowGroup(num_rows);
+ row_group_writer_ = writer_->AppendRowGroup();
}
void FinalizeParquetFile() {
diff --git a/src/parquet/arrow/writer.cc b/src/parquet/arrow/writer.cc
index 55af292..7f1c45c 100644
--- a/src/parquet/arrow/writer.cc
+++ b/src/parquet/arrow/writer.cc
@@ -310,6 +310,7 @@
std::unique_ptr<ParquetFileWriter> writer_;
RowGroupWriter* row_group_writer_;
std::shared_ptr<ArrowWriterProperties> arrow_properties_;
+ bool closed_;
};
FileWriter::Impl::Impl(MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer,
@@ -318,13 +319,14 @@
data_buffer_(pool),
writer_(std::move(writer)),
row_group_writer_(nullptr),
- arrow_properties_(arrow_properties) {}
+ arrow_properties_(arrow_properties),
+ closed_(false) {}
Status FileWriter::Impl::NewRowGroup(int64_t chunk_size) {
if (row_group_writer_ != nullptr) {
PARQUET_CATCH_NOT_OK(row_group_writer_->Close());
}
- PARQUET_CATCH_NOT_OK(row_group_writer_ = writer_->AppendRowGroup(chunk_size));
+ PARQUET_CATCH_NOT_OK(row_group_writer_ = writer_->AppendRowGroup());
return Status::OK();
}
@@ -791,10 +793,14 @@
// ----------------------------------------------------------------------
Status FileWriter::Impl::Close() {
- if (row_group_writer_ != nullptr) {
- PARQUET_CATCH_NOT_OK(row_group_writer_->Close());
+ if (!closed_) {
+ // Make idempotent
+ closed_ = true;
+ if (row_group_writer_ != nullptr) {
+ PARQUET_CATCH_NOT_OK(row_group_writer_->Close());
+ }
+ PARQUET_CATCH_NOT_OK(writer_->Close());
}
- PARQUET_CATCH_NOT_OK(writer_->Close());
return Status::OK();
}
diff --git a/src/parquet/column-io-benchmark.cc b/src/parquet/column-io-benchmark.cc
index 2abf6fa..c20d6e2 100644
--- a/src/parquet/column-io-benchmark.cc
+++ b/src/parquet/column-io-benchmark.cc
@@ -35,8 +35,8 @@
const WriterProperties* properties) {
std::unique_ptr<SerializedPageWriter> pager(
new SerializedPageWriter(dst, Compression::UNCOMPRESSED, metadata));
- return std::unique_ptr<Int64Writer>(new Int64Writer(
- metadata, std::move(pager), output_size, Encoding::PLAIN, properties));
+ return std::unique_ptr<Int64Writer>(
+ new Int64Writer(metadata, std::move(pager), Encoding::PLAIN, properties));
}
std::shared_ptr<ColumnDescriptor> Int64Schema(Repetition::type repetition) {
diff --git a/src/parquet/column_writer-test.cc b/src/parquet/column_writer-test.cc
index 47b86e3..3e4c04f 100644
--- a/src/parquet/column_writer-test.cc
+++ b/src/parquet/column_writer-test.cc
@@ -85,8 +85,8 @@
wp_builder.encoding(column_properties.encoding);
}
writer_properties_ = wp_builder.build();
- std::shared_ptr<ColumnWriter> writer = ColumnWriter::Make(
- metadata_.get(), std::move(pager), output_size, writer_properties_.get());
+ std::shared_ptr<ColumnWriter> writer =
+ ColumnWriter::Make(metadata_.get(), std::move(pager), writer_properties_.get());
return std::static_pointer_cast<TypedColumnWriter<TestType>>(writer);
}
@@ -402,39 +402,6 @@
ASSERT_EQ(this->values_, this->values_out_);
}
-TYPED_TEST(TestPrimitiveWriter, RequiredTooFewRows) {
- this->GenerateData(SMALL_SIZE - 1);
-
- auto writer = this->BuildWriter();
- writer->WriteBatch(this->values_.size(), nullptr, nullptr, this->values_ptr_);
- ASSERT_THROW(writer->Close(), ParquetException);
-}
-
-TYPED_TEST(TestPrimitiveWriter, RequiredTooMany) {
- this->GenerateData(2 * SMALL_SIZE);
-
- auto writer = this->BuildWriter();
- ASSERT_THROW(
- writer->WriteBatch(this->values_.size(), nullptr, nullptr, this->values_ptr_),
- ParquetException);
-}
-
-TYPED_TEST(TestPrimitiveWriter, RepeatedTooFewRows) {
- // Optional and repeated, so definition and repetition levels
- this->SetUpSchema(Repetition::REPEATED);
-
- this->GenerateData(SMALL_SIZE);
- std::vector<int16_t> definition_levels(SMALL_SIZE, 1);
- definition_levels[1] = 0;
- std::vector<int16_t> repetition_levels(SMALL_SIZE, 0);
- repetition_levels[3] = 1;
-
- auto writer = this->BuildWriter();
- writer->WriteBatch(this->values_.size(), definition_levels.data(),
- repetition_levels.data(), this->values_ptr_);
- ASSERT_THROW(writer->Close(), ParquetException);
-}
-
TYPED_TEST(TestPrimitiveWriter, RequiredLargeChunk) {
this->GenerateData(LARGE_SIZE);
diff --git a/src/parquet/column_writer.cc b/src/parquet/column_writer.cc
index ac7e2ba..4aadf2b 100644
--- a/src/parquet/column_writer.cc
+++ b/src/parquet/column_writer.cc
@@ -113,13 +113,11 @@
}
ColumnWriter::ColumnWriter(ColumnChunkMetaDataBuilder* metadata,
- std::unique_ptr<PageWriter> pager, int64_t expected_rows,
- bool has_dictionary, Encoding::type encoding,
- const WriterProperties* properties)
+ std::unique_ptr<PageWriter> pager, bool has_dictionary,
+ Encoding::type encoding, const WriterProperties* properties)
: metadata_(metadata),
descr_(metadata->descr()),
pager_(std::move(pager)),
- expected_rows_(expected_rows),
has_dictionary_(has_dictionary),
encoding_(encoding),
properties_(properties),
@@ -127,7 +125,7 @@
pool_(properties->memory_pool()),
num_buffered_values_(0),
num_buffered_encoded_values_(0),
- num_rows_(0),
+ rows_written_(0),
total_bytes_written_(0),
closed_(false),
fallback_(false) {
@@ -274,13 +272,6 @@
pager_->Close(has_dictionary_, fallback_);
}
- if (num_rows_ != expected_rows_) {
- std::stringstream ss;
- ss << "Written rows: " << num_rows_ << " != expected rows: " << expected_rows_
- << "in the current column chunk";
- throw ParquetException(ss.str());
- }
-
return total_bytes_written_;
}
@@ -301,11 +292,10 @@
template <typename Type>
TypedColumnWriter<Type>::TypedColumnWriter(ColumnChunkMetaDataBuilder* metadata,
std::unique_ptr<PageWriter> pager,
- int64_t expected_rows, Encoding::type encoding,
+ Encoding::type encoding,
const WriterProperties* properties)
- : ColumnWriter(metadata, std::move(pager), expected_rows,
- (encoding == Encoding::PLAIN_DICTIONARY ||
- encoding == Encoding::RLE_DICTIONARY),
+ : ColumnWriter(metadata, std::move(pager), (encoding == Encoding::PLAIN_DICTIONARY ||
+ encoding == Encoding::RLE_DICTIONARY),
encoding, properties) {
switch (encoding) {
case Encoding::PLAIN:
@@ -384,7 +374,6 @@
std::shared_ptr<ColumnWriter> ColumnWriter::Make(ColumnChunkMetaDataBuilder* metadata,
std::unique_ptr<PageWriter> pager,
- int64_t expected_rows,
const WriterProperties* properties) {
const ColumnDescriptor* descr = metadata->descr();
Encoding::type encoding = properties->encoding(descr->path());
@@ -394,29 +383,29 @@
}
switch (descr->physical_type()) {
case Type::BOOLEAN:
- return std::make_shared<BoolWriter>(metadata, std::move(pager), expected_rows,
- encoding, properties);
+ return std::make_shared<BoolWriter>(metadata, std::move(pager), encoding,
+ properties);
case Type::INT32:
- return std::make_shared<Int32Writer>(metadata, std::move(pager), expected_rows,
- encoding, properties);
+ return std::make_shared<Int32Writer>(metadata, std::move(pager), encoding,
+ properties);
case Type::INT64:
- return std::make_shared<Int64Writer>(metadata, std::move(pager), expected_rows,
- encoding, properties);
+ return std::make_shared<Int64Writer>(metadata, std::move(pager), encoding,
+ properties);
case Type::INT96:
- return std::make_shared<Int96Writer>(metadata, std::move(pager), expected_rows,
- encoding, properties);
+ return std::make_shared<Int96Writer>(metadata, std::move(pager), encoding,
+ properties);
case Type::FLOAT:
- return std::make_shared<FloatWriter>(metadata, std::move(pager), expected_rows,
- encoding, properties);
+ return std::make_shared<FloatWriter>(metadata, std::move(pager), encoding,
+ properties);
case Type::DOUBLE:
- return std::make_shared<DoubleWriter>(metadata, std::move(pager), expected_rows,
- encoding, properties);
+ return std::make_shared<DoubleWriter>(metadata, std::move(pager), encoding,
+ properties);
case Type::BYTE_ARRAY:
- return std::make_shared<ByteArrayWriter>(metadata, std::move(pager), expected_rows,
- encoding, properties);
+ return std::make_shared<ByteArrayWriter>(metadata, std::move(pager), encoding,
+ properties);
case Type::FIXED_LEN_BYTE_ARRAY:
- return std::make_shared<FixedLenByteArrayWriter>(
- metadata, std::move(pager), expected_rows, encoding, properties);
+ return std::make_shared<FixedLenByteArrayWriter>(metadata, std::move(pager),
+ encoding, properties);
default:
ParquetException::NYI("type reader not implemented");
}
@@ -453,18 +442,14 @@
// Count the occasions where we start a new row
for (int64_t i = 0; i < num_values; ++i) {
if (rep_levels[i] == 0) {
- num_rows_++;
+ rows_written_++;
}
}
WriteRepetitionLevels(num_values, rep_levels);
} else {
// Each value is exactly one row
- num_rows_ += static_cast<int>(num_values);
- }
-
- if (num_rows_ > expected_rows_) {
- throw ParquetException("More rows were written in the column chunk than expected");
+ rows_written_ += static_cast<int>(num_values);
}
// PARQUET-780
@@ -527,18 +512,14 @@
// Count the occasions where we start a new row
for (int64_t i = 0; i < num_values; ++i) {
if (rep_levels[i] == 0) {
- num_rows_++;
+ rows_written_++;
}
}
WriteRepetitionLevels(num_values, rep_levels);
} else {
// Each value is exactly one row
- num_rows_ += static_cast<int>(num_values);
- }
-
- if (num_rows_ > expected_rows_) {
- throw ParquetException("More rows were written in the column chunk than expected");
+ rows_written_ += static_cast<int>(num_values);
}
if (descr_->schema_node()->is_optional()) {
diff --git a/src/parquet/column_writer.h b/src/parquet/column_writer.h
index 837d2d0..0408747 100644
--- a/src/parquet/column_writer.h
+++ b/src/parquet/column_writer.h
@@ -73,12 +73,11 @@
class PARQUET_EXPORT ColumnWriter {
public:
ColumnWriter(ColumnChunkMetaDataBuilder*, std::unique_ptr<PageWriter>,
- int64_t expected_rows, bool has_dictionary, Encoding::type encoding,
+ bool has_dictionary, Encoding::type encoding,
const WriterProperties* properties);
static std::shared_ptr<ColumnWriter> Make(ColumnChunkMetaDataBuilder*,
std::unique_ptr<PageWriter>,
- int64_t expected_rows,
const WriterProperties* properties);
Type::type type() const { return descr_->physical_type(); }
@@ -92,6 +91,8 @@
*/
int64_t Close();
+ int64_t rows_written() const { return rows_written_; }
+
protected:
virtual std::shared_ptr<Buffer> GetValuesBuffer() = 0;
@@ -138,8 +139,6 @@
std::unique_ptr<PageWriter> pager_;
- // The number of rows that should be written in this column chunk.
- int64_t expected_rows_;
bool has_dictionary_;
Encoding::type encoding_;
const WriterProperties* properties_;
@@ -162,7 +161,7 @@
int64_t num_buffered_encoded_values_;
// Total number of rows written with this ColumnWriter
- int num_rows_;
+ int rows_written_;
// Records the total number of bytes written by the serializer
int64_t total_bytes_written_;
@@ -195,8 +194,8 @@
typedef typename DType::c_type T;
TypedColumnWriter(ColumnChunkMetaDataBuilder* metadata,
- std::unique_ptr<PageWriter> pager, int64_t expected_rows,
- Encoding::type encoding, const WriterProperties* properties);
+ std::unique_ptr<PageWriter> pager, Encoding::type encoding,
+ const WriterProperties* properties);
// Write a batch of repetition levels, definition levels, and values to the
// column.
diff --git a/src/parquet/file/file-metadata-test.cc b/src/parquet/file/file-metadata-test.cc
index 5bd8419..49cba3a 100644
--- a/src/parquet/file/file-metadata-test.cc
+++ b/src/parquet/file/file-metadata-test.cc
@@ -55,8 +55,8 @@
.set_max(std::string(reinterpret_cast<const char*>(&float_max), 4));
auto f_builder = FileMetaDataBuilder::Make(&schema, props);
- auto rg1_builder = f_builder->AppendRowGroup(nrows / 2);
- auto rg2_builder = f_builder->AppendRowGroup(nrows / 2);
+ auto rg1_builder = f_builder->AppendRowGroup();
+ auto rg2_builder = f_builder->AppendRowGroup();
// Write the metadata
// rowgroup1 metadata
@@ -67,6 +67,8 @@
col2_builder->SetStatistics(true, stats_float);
col1_builder->Finish(nrows / 2, 4, 0, 10, 512, 600, true, false);
col2_builder->Finish(nrows / 2, 24, 0, 30, 512, 600, true, false);
+
+ rg1_builder->set_num_rows(nrows / 2);
rg1_builder->Finish(1024);
// rowgroup2 metadata
@@ -77,6 +79,8 @@
col2_builder->SetStatistics(true, stats_float);
col1_builder->Finish(nrows / 2, 6, 0, 10, 512, 600, true, false);
col2_builder->Finish(nrows / 2, 16, 0, 26, 512, 600, true, false);
+
+ rg2_builder->set_num_rows(nrows / 2);
rg2_builder->Finish(1024);
// Read the metadata
diff --git a/src/parquet/file/file-serialize-test.cc b/src/parquet/file/file-serialize-test.cc
index 059df0b..75f3fbd 100644
--- a/src/parquet/file/file-serialize-test.cc
+++ b/src/parquet/file/file-serialize-test.cc
@@ -41,11 +41,15 @@
void SetUp() {
num_columns_ = 4;
+ num_rowgroups_ = 2;
+ rows_per_rowgroup_ = 50;
this->SetUpSchema(Repetition::OPTIONAL, num_columns_);
}
protected:
int num_columns_;
+ int num_rowgroups_;
+ int rows_per_rowgroup_;
void FileSerializeTest(Compression::type codec_type) {
std::shared_ptr<InMemoryOutputStream> sink(new InMemoryOutputStream());
@@ -59,48 +63,116 @@
std::shared_ptr<WriterProperties> writer_properties = prop_builder.build();
auto file_writer = ParquetFileWriter::Open(sink, gnode, writer_properties);
- auto row_group_writer = file_writer->AppendRowGroup(100);
+ for (int rg = 0; rg < num_rowgroups_; ++rg) {
+ RowGroupWriter* row_group_writer;
+ row_group_writer = file_writer->AppendRowGroup();
+ this->GenerateData(rows_per_rowgroup_);
+ for (int col = 0; col < num_columns_; ++col) {
+ auto column_writer =
+ static_cast<TypedColumnWriter<TestType>*>(row_group_writer->NextColumn());
+ column_writer->WriteBatch(rows_per_rowgroup_, this->def_levels_.data(), nullptr,
+ this->values_ptr_);
+ column_writer->Close();
+ }
- this->GenerateData(100);
- for (int i = 0; i < num_columns_; ++i) {
- auto column_writer =
- static_cast<TypedColumnWriter<TestType>*>(row_group_writer->NextColumn());
- column_writer->WriteBatch(100, this->def_levels_.data(), nullptr,
- this->values_ptr_);
- column_writer->Close();
+ row_group_writer->Close();
}
-
- row_group_writer->Close();
file_writer->Close();
auto buffer = sink->GetBuffer();
+ int num_rows_ = num_rowgroups_ * rows_per_rowgroup_;
auto source = std::make_shared<::arrow::io::BufferReader>(buffer);
auto file_reader = ParquetFileReader::Open(source);
ASSERT_EQ(num_columns_, file_reader->metadata()->num_columns());
- ASSERT_EQ(1, file_reader->metadata()->num_row_groups());
- ASSERT_EQ(100, file_reader->metadata()->num_rows());
+ ASSERT_EQ(num_rowgroups_, file_reader->metadata()->num_row_groups());
+ ASSERT_EQ(num_rows_, file_reader->metadata()->num_rows());
- auto rg_reader = file_reader->RowGroup(0);
- ASSERT_EQ(num_columns_, rg_reader->metadata()->num_columns());
- ASSERT_EQ(100, rg_reader->metadata()->num_rows());
- // Check that the specified compression was actually used.
- ASSERT_EQ(codec_type, rg_reader->metadata()->ColumnChunk(0)->compression());
+ for (int rg = 0; rg < num_rowgroups_; ++rg) {
+ auto rg_reader = file_reader->RowGroup(rg);
+ ASSERT_EQ(num_columns_, rg_reader->metadata()->num_columns());
+ ASSERT_EQ(rows_per_rowgroup_, rg_reader->metadata()->num_rows());
+ // Check that the specified compression was actually used.
+ ASSERT_EQ(codec_type, rg_reader->metadata()->ColumnChunk(0)->compression());
- int64_t values_read;
+ int64_t values_read;
- for (int i = 0; i < num_columns_; ++i) {
- std::vector<int16_t> def_levels_out(100);
- std::vector<int16_t> rep_levels_out(100);
- auto col_reader =
- std::static_pointer_cast<TypedColumnReader<TestType>>(rg_reader->Column(i));
- this->SetupValuesOut(100);
- col_reader->ReadBatch(100, def_levels_out.data(), rep_levels_out.data(),
- this->values_out_ptr_, &values_read);
- this->SyncValuesOut();
- ASSERT_EQ(100, values_read);
- ASSERT_EQ(this->values_, this->values_out_);
- ASSERT_EQ(this->def_levels_, def_levels_out);
+ for (int i = 0; i < num_columns_; ++i) {
+ std::vector<int16_t> def_levels_out(rows_per_rowgroup_);
+ std::vector<int16_t> rep_levels_out(rows_per_rowgroup_);
+ auto col_reader =
+ std::static_pointer_cast<TypedColumnReader<TestType>>(rg_reader->Column(i));
+ this->SetupValuesOut(rows_per_rowgroup_);
+ col_reader->ReadBatch(rows_per_rowgroup_, def_levels_out.data(),
+ rep_levels_out.data(), this->values_out_ptr_, &values_read);
+ this->SyncValuesOut();
+ ASSERT_EQ(rows_per_rowgroup_, values_read);
+ ASSERT_EQ(this->values_, this->values_out_);
+ ASSERT_EQ(this->def_levels_, def_levels_out);
+ }
+ }
+ }
+
+ void UnequalNumRows(int64_t max_rows, const std::vector<int64_t> rows_per_column) {
+ std::shared_ptr<InMemoryOutputStream> sink(new InMemoryOutputStream());
+ auto gnode = std::static_pointer_cast<GroupNode>(this->node_);
+
+ std::shared_ptr<WriterProperties> props = WriterProperties::Builder().build();
+
+ auto file_writer = ParquetFileWriter::Open(sink, gnode, props);
+
+ RowGroupWriter* row_group_writer;
+ row_group_writer = file_writer->AppendRowGroup();
+
+ this->GenerateData(max_rows);
+ for (int col = 0; col < num_columns_; ++col) {
+ auto column_writer =
+ static_cast<TypedColumnWriter<TestType>*>(row_group_writer->NextColumn());
+ column_writer->WriteBatch(rows_per_column[col], this->def_levels_.data(), nullptr,
+ this->values_ptr_);
+ column_writer->Close();
+ }
+ row_group_writer->Close();
+ file_writer->Close();
+ }
+
+ void RepeatedUnequalRows() {
+ // Optional and repeated, so definition and repetition levels
+ this->SetUpSchema(Repetition::REPEATED);
+
+ const int kNumRows = 100;
+ this->GenerateData(kNumRows);
+
+ std::shared_ptr<InMemoryOutputStream> sink(new InMemoryOutputStream());
+ auto gnode = std::static_pointer_cast<GroupNode>(this->node_);
+ std::shared_ptr<WriterProperties> props = WriterProperties::Builder().build();
+ auto file_writer = ParquetFileWriter::Open(sink, gnode, props);
+
+ RowGroupWriter* row_group_writer;
+ row_group_writer = file_writer->AppendRowGroup();
+
+ this->GenerateData(kNumRows);
+
+ std::vector<int16_t> definition_levels(kNumRows, 1);
+ std::vector<int16_t> repetition_levels(kNumRows, 0);
+
+ {
+ auto column_writer =
+ static_cast<TypedColumnWriter<TestType>*>(row_group_writer->NextColumn());
+ column_writer->WriteBatch(kNumRows, definition_levels.data(),
+ repetition_levels.data(), this->values_ptr_);
+ column_writer->Close();
+ }
+
+ definition_levels[1] = 0;
+ repetition_levels[3] = 1;
+
+ {
+ auto column_writer =
+ static_cast<TypedColumnWriter<TestType>*>(row_group_writer->NextColumn());
+ column_writer->WriteBatch(kNumRows, definition_levels.data(),
+ repetition_levels.data(), this->values_ptr_);
+ column_writer->Close();
}
}
};
@@ -115,6 +187,20 @@
this->FileSerializeTest(Compression::UNCOMPRESSED);
}
+TYPED_TEST(TestSerialize, TooFewRows) {
+ std::vector<int64_t> num_rows = {100, 100, 100, 99};
+ ASSERT_THROW(this->UnequalNumRows(100, num_rows), ParquetException);
+}
+
+TYPED_TEST(TestSerialize, TooManyRows) {
+ std::vector<int64_t> num_rows = {100, 100, 100, 101};
+ ASSERT_THROW(this->UnequalNumRows(101, num_rows), ParquetException);
+}
+
+TYPED_TEST(TestSerialize, RepeatedTooFewRows) {
+ ASSERT_THROW(this->RepeatedUnequalRows(), ParquetException);
+}
+
TYPED_TEST(TestSerialize, SmallFileSnappy) {
this->FileSerializeTest(Compression::SNAPPY);
}
diff --git a/src/parquet/file/metadata.cc b/src/parquet/file/metadata.cc
index 6e7fc3b..9f1cdd7 100644
--- a/src/parquet/file/metadata.cc
+++ b/src/parquet/file/metadata.cc
@@ -651,13 +651,11 @@
class RowGroupMetaDataBuilder::RowGroupMetaDataBuilderImpl {
public:
- explicit RowGroupMetaDataBuilderImpl(int64_t num_rows,
- const std::shared_ptr<WriterProperties>& props,
+ explicit RowGroupMetaDataBuilderImpl(const std::shared_ptr<WriterProperties>& props,
const SchemaDescriptor* schema, uint8_t* contents)
: properties_(props), schema_(schema), current_column_(0) {
row_group_ = reinterpret_cast<format::RowGroup*>(contents);
InitializeColumns(schema->num_columns());
- row_group_->__set_num_rows(num_rows);
}
~RowGroupMetaDataBuilderImpl() {}
@@ -702,8 +700,12 @@
row_group_->__set_total_byte_size(total_byte_size);
}
+ void set_num_rows(int64_t num_rows) { row_group_->num_rows = num_rows; }
+
int num_columns() { return static_cast<int>(row_group_->columns.size()); }
+ int64_t num_rows() { return row_group_->num_rows; }
+
private:
void InitializeColumns(int ncols) { row_group_->columns.resize(ncols); }
@@ -715,17 +717,17 @@
};
std::unique_ptr<RowGroupMetaDataBuilder> RowGroupMetaDataBuilder::Make(
- int64_t num_rows, const std::shared_ptr<WriterProperties>& props,
- const SchemaDescriptor* schema_, uint8_t* contents) {
+ const std::shared_ptr<WriterProperties>& props, const SchemaDescriptor* schema_,
+ uint8_t* contents) {
return std::unique_ptr<RowGroupMetaDataBuilder>(
- new RowGroupMetaDataBuilder(num_rows, props, schema_, contents));
+ new RowGroupMetaDataBuilder(props, schema_, contents));
}
RowGroupMetaDataBuilder::RowGroupMetaDataBuilder(
- int64_t num_rows, const std::shared_ptr<WriterProperties>& props,
- const SchemaDescriptor* schema_, uint8_t* contents)
+ const std::shared_ptr<WriterProperties>& props, const SchemaDescriptor* schema_,
+ uint8_t* contents)
: impl_{std::unique_ptr<RowGroupMetaDataBuilderImpl>(
- new RowGroupMetaDataBuilderImpl(num_rows, props, schema_, contents))} {}
+ new RowGroupMetaDataBuilderImpl(props, schema_, contents))} {}
RowGroupMetaDataBuilder::~RowGroupMetaDataBuilder() {}
@@ -737,6 +739,12 @@
int RowGroupMetaDataBuilder::num_columns() { return impl_->num_columns(); }
+int64_t RowGroupMetaDataBuilder::num_rows() { return impl_->num_rows(); }
+
+void RowGroupMetaDataBuilder::set_num_rows(int64_t num_rows) {
+ impl_->set_num_rows(num_rows);
+}
+
void RowGroupMetaDataBuilder::Finish(int64_t total_bytes_written) {
impl_->Finish(total_bytes_written);
}
@@ -753,10 +761,10 @@
}
~FileMetaDataBuilderImpl() {}
- RowGroupMetaDataBuilder* AppendRowGroup(int64_t num_rows) {
+ RowGroupMetaDataBuilder* AppendRowGroup() {
auto row_group = std::unique_ptr<format::RowGroup>(new format::RowGroup());
auto row_group_builder = RowGroupMetaDataBuilder::Make(
- num_rows, properties_, schema_, reinterpret_cast<uint8_t*>(row_group.get()));
+ properties_, schema_, reinterpret_cast<uint8_t*>(row_group.get()));
RowGroupMetaDataBuilder* row_group_ptr = row_group_builder.get();
row_group_builders_.push_back(std::move(row_group_builder));
row_groups_.push_back(std::move(row_group));
@@ -836,8 +844,8 @@
FileMetaDataBuilder::~FileMetaDataBuilder() {}
-RowGroupMetaDataBuilder* FileMetaDataBuilder::AppendRowGroup(int64_t num_rows) {
- return impl_->AppendRowGroup(num_rows);
+RowGroupMetaDataBuilder* FileMetaDataBuilder::AppendRowGroup() {
+ return impl_->AppendRowGroup();
}
std::unique_ptr<FileMetaData> FileMetaDataBuilder::Finish() { return impl_->Finish(); }
diff --git a/src/parquet/file/metadata.h b/src/parquet/file/metadata.h
index 7f990e0..7850358 100644
--- a/src/parquet/file/metadata.h
+++ b/src/parquet/file/metadata.h
@@ -224,21 +224,23 @@
public:
// API convenience to get a MetaData reader
static std::unique_ptr<RowGroupMetaDataBuilder> Make(
- int64_t num_rows, const std::shared_ptr<WriterProperties>& props,
- const SchemaDescriptor* schema_, uint8_t* contents);
+ const std::shared_ptr<WriterProperties>& props, const SchemaDescriptor* schema_,
+ uint8_t* contents);
~RowGroupMetaDataBuilder();
ColumnChunkMetaDataBuilder* NextColumnChunk();
int num_columns();
+ int64_t num_rows();
int current_column() const;
+ void set_num_rows(int64_t num_rows);
+
// commit the metadata
void Finish(int64_t total_bytes_written);
private:
- explicit RowGroupMetaDataBuilder(int64_t num_rows,
- const std::shared_ptr<WriterProperties>& props,
+ explicit RowGroupMetaDataBuilder(const std::shared_ptr<WriterProperties>& props,
const SchemaDescriptor* schema_, uint8_t* contents);
// PIMPL Idiom
class RowGroupMetaDataBuilderImpl;
@@ -254,7 +256,7 @@
~FileMetaDataBuilder();
- RowGroupMetaDataBuilder* AppendRowGroup(int64_t num_rows);
+ RowGroupMetaDataBuilder* AppendRowGroup();
// commit the metadata
std::unique_ptr<FileMetaData> Finish();
diff --git a/src/parquet/file/writer-internal.cc b/src/parquet/file/writer-internal.cc
index 5702d2c..712289b 100644
--- a/src/parquet/file/writer-internal.cc
+++ b/src/parquet/file/writer-internal.cc
@@ -174,9 +174,31 @@
int RowGroupSerializer::num_columns() const { return metadata_->num_columns(); }
-int64_t RowGroupSerializer::num_rows() const { return num_rows_; }
+int64_t RowGroupSerializer::num_rows() const {
+ if (current_column_writer_) {
+ CheckRowsWritten();
+ }
+ return num_rows_ < 0 ? 0 : num_rows_;
+}
+
+void RowGroupSerializer::CheckRowsWritten() const {
+ int64_t current_rows = current_column_writer_->rows_written();
+ if (num_rows_ < 0) {
+ num_rows_ = current_rows;
+ metadata_->set_num_rows(current_rows);
+ } else if (num_rows_ != current_rows) {
+ std::stringstream ss;
+ ss << "Column " << current_column_index_ << " had " << current_rows
+ << " while previous column had " << num_rows_;
+ throw ParquetException(ss.str());
+ }
+}
ColumnWriter* RowGroupSerializer::NextColumn() {
+ if (current_column_writer_) {
+ CheckRowsWritten();
+ }
+
// Throws an error if more columns are being written
auto col_meta = metadata_->NextColumnChunk();
@@ -184,12 +206,13 @@
total_bytes_written_ += current_column_writer_->Close();
}
+ ++current_column_index_;
+
const ColumnDescriptor* column_descr = col_meta->descr();
std::unique_ptr<PageWriter> pager(
new SerializedPageWriter(sink_, properties_->compression(column_descr->path()),
col_meta, properties_->memory_pool()));
- current_column_writer_ =
- ColumnWriter::Make(col_meta, std::move(pager), num_rows_, properties_);
+ current_column_writer_ = ColumnWriter::Make(col_meta, std::move(pager), properties_);
return current_column_writer_.get();
}
@@ -200,9 +223,11 @@
closed_ = true;
if (current_column_writer_) {
+ CheckRowsWritten();
total_bytes_written_ += current_column_writer_->Close();
current_column_writer_.reset();
}
+
// Ensures all columns have been written
metadata_->Finish(total_bytes_written_);
}
@@ -224,6 +249,7 @@
void FileSerializer::Close() {
if (is_open_) {
if (row_group_writer_) {
+ num_rows_ += row_group_writer_->num_rows();
row_group_writer_->Close();
}
row_group_writer_.reset();
@@ -246,15 +272,14 @@
return properties_;
}
-RowGroupWriter* FileSerializer::AppendRowGroup(int64_t num_rows) {
+RowGroupWriter* FileSerializer::AppendRowGroup() {
if (row_group_writer_) {
row_group_writer_->Close();
}
- num_rows_ += num_rows;
num_row_groups_++;
- auto rg_metadata = metadata_->AppendRowGroup(num_rows);
+ auto rg_metadata = metadata_->AppendRowGroup();
std::unique_ptr<RowGroupWriter::Contents> contents(
- new RowGroupSerializer(num_rows, sink_.get(), rg_metadata, properties_.get()));
+ new RowGroupSerializer(sink_.get(), rg_metadata, properties_.get()));
row_group_writer_.reset(new RowGroupWriter(std::move(contents)));
return row_group_writer_.get();
}
diff --git a/src/parquet/file/writer-internal.h b/src/parquet/file/writer-internal.h
index 5aba994..3cd73fe 100644
--- a/src/parquet/file/writer-internal.h
+++ b/src/parquet/file/writer-internal.h
@@ -75,15 +75,15 @@
// RowGroupWriter::Contents implementation for the Parquet file specification
class RowGroupSerializer : public RowGroupWriter::Contents {
public:
- RowGroupSerializer(int64_t num_rows, OutputStream* sink,
- RowGroupMetaDataBuilder* metadata,
+ RowGroupSerializer(OutputStream* sink, RowGroupMetaDataBuilder* metadata,
const WriterProperties* properties)
- : num_rows_(num_rows),
- sink_(sink),
+ : sink_(sink),
metadata_(metadata),
properties_(properties),
total_bytes_written_(0),
- closed_(false) {}
+ closed_(false),
+ current_column_index_(0),
+ num_rows_(-1) {}
int num_columns() const override;
int64_t num_rows() const override;
@@ -93,12 +93,15 @@
void Close() override;
private:
- int64_t num_rows_;
OutputStream* sink_;
- RowGroupMetaDataBuilder* metadata_;
+ mutable RowGroupMetaDataBuilder* metadata_;
const WriterProperties* properties_;
int64_t total_bytes_written_;
bool closed_;
+ int current_column_index_;
+ mutable int64_t num_rows_;
+
+ void CheckRowsWritten() const;
std::shared_ptr<ColumnWriter> current_column_writer_;
};
@@ -116,7 +119,7 @@
void Close() override;
- RowGroupWriter* AppendRowGroup(int64_t num_rows) override;
+ RowGroupWriter* AppendRowGroup() override;
const std::shared_ptr<WriterProperties>& properties() const override;
diff --git a/src/parquet/file/writer.cc b/src/parquet/file/writer.cc
index a1b9227..a91553b 100644
--- a/src/parquet/file/writer.cc
+++ b/src/parquet/file/writer.cc
@@ -33,7 +33,6 @@
void RowGroupWriter::Close() {
if (contents_) {
contents_->Close();
- contents_.reset();
}
}
@@ -105,8 +104,12 @@
}
}
+RowGroupWriter* ParquetFileWriter::AppendRowGroup() {
+ return contents_->AppendRowGroup();
+}
+
RowGroupWriter* ParquetFileWriter::AppendRowGroup(int64_t num_rows) {
- return contents_->AppendRowGroup(num_rows);
+ return AppendRowGroup();
}
const std::shared_ptr<WriterProperties>& ParquetFileWriter::properties() const {
diff --git a/src/parquet/file/writer.h b/src/parquet/file/writer.h
index c2b3f91..844aa16 100644
--- a/src/parquet/file/writer.h
+++ b/src/parquet/file/writer.h
@@ -56,14 +56,12 @@
explicit RowGroupWriter(std::unique_ptr<Contents> contents);
- /**
- * Construct a ColumnWriter for the indicated row group-relative column.
- *
- * Ownership is solely within the RowGroupWriter. The ColumnWriter is only valid
- * until the next call to NextColumn or Close. As the contents are directly written to
- * the sink, once a new column is started, the contents of the previous one cannot be
- * modified anymore.
- */
+ /// Construct a ColumnWriter for the indicated row group-relative column.
+ ///
+ /// Ownership is solely within the RowGroupWriter. The ColumnWriter is only
+ /// valid until the next call to NextColumn or Close. As the contents are
+ /// directly written to the sink, once a new column is started, the contents
+ /// of the previous one cannot be modified anymore.
ColumnWriter* NextColumn();
/// Index of currently written column
int current_column();
@@ -96,7 +94,10 @@
// Perform any cleanup associated with the file contents
virtual void Close() = 0;
- virtual RowGroupWriter* AppendRowGroup(int64_t num_rows) = 0;
+ /// \deprecated Since 1.3.0
+ RowGroupWriter* AppendRowGroup(int64_t num_rows);
+
+ virtual RowGroupWriter* AppendRowGroup() = 0;
virtual int64_t num_rows() const = 0;
virtual int num_columns() const = 0;
@@ -135,54 +136,45 @@
void Open(std::unique_ptr<Contents> contents);
void Close();
- /**
- * Construct a RowGroupWriter for the indicated number of rows.
- *
- * Ownership is solely within the ParquetFileWriter. The RowGroupWriter is only valid
- * until the next call to AppendRowGroup or Close.
- *
- * @param num_rows The number of rows that are stored in the new RowGroup
- */
+ // Construct a RowGroupWriter for the indicated number of rows.
+ //
+ // Ownership is solely within the ParquetFileWriter. The RowGroupWriter is only valid
+ // until the next call to AppendRowGroup or Close.
+ // @param num_rows The number of rows that are stored in the new RowGroup
+ //
+ // \deprecated Since 1.3.0
RowGroupWriter* AppendRowGroup(int64_t num_rows);
- /**
- * Number of columns.
- *
- * This number is fixed during the lifetime of the writer as it is determined via
- * the schema.
- */
+ /// Construct a RowGroupWriter with an arbitrary number of rows.
+ ///
+ /// Ownership is solely within the ParquetFileWriter. The RowGroupWriter is only valid
+ /// until the next call to AppendRowGroup or Close.
+ RowGroupWriter* AppendRowGroup();
+
+ /// Number of columns.
+ ///
+ /// This number is fixed during the lifetime of the writer as it is determined via
+ /// the schema.
int num_columns() const;
- /**
- * Number of rows in the yet started RowGroups.
- *
- * Changes on the addition of a new RowGroup.
- */
+ /// Number of rows in the yet started RowGroups.
+ ///
+ /// Changes on the addition of a new RowGroup.
int64_t num_rows() const;
- /**
- * Number of started RowGroups.
- */
+ /// Number of started RowGroups.
int num_row_groups() const;
- /**
- * Configuration passed to the writer, e.g. the used Parquet format version.
- */
+ /// Configuration passed to the writer, e.g. the used Parquet format version.
const std::shared_ptr<WriterProperties>& properties() const;
- /**
- * Returns the file schema descriptor
- */
+ /// Returns the file schema descriptor
const SchemaDescriptor* schema() const;
- /**
- * Returns a column descriptor in schema
- */
+ /// Returns a column descriptor in schema
const ColumnDescriptor* descr(int i) const;
- /**
- * Returns the file custom metadata
- */
+ /// Returns the file custom metadata
const std::shared_ptr<const KeyValueMetadata>& key_value_metadata() const;
private:
diff --git a/src/parquet/statistics-test.cc b/src/parquet/statistics-test.cc
index 1521cbd..26d53d3 100644
--- a/src/parquet/statistics-test.cc
+++ b/src/parquet/statistics-test.cc
@@ -138,7 +138,7 @@
std::shared_ptr<WriterProperties> writer_properties =
WriterProperties::Builder().enable_statistics("column")->build();
auto file_writer = ParquetFileWriter::Open(sink, gnode, writer_properties);
- auto row_group_writer = file_writer->AppendRowGroup(num_values);
+ auto row_group_writer = file_writer->AppendRowGroup();
auto column_writer =
static_cast<TypedColumnWriter<TestType>*>(row_group_writer->NextColumn());
@@ -438,7 +438,7 @@
auto file_writer = parquet::ParquetFileWriter::Open(parquet_sink_, schema_, props);
// Append a RowGroup with a specific number of rows.
- auto rg_writer = file_writer->AppendRowGroup(NUM_VALUES);
+ auto rg_writer = file_writer->AppendRowGroup();
this->SetValues();