// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "parquet/file_writer.h"
#include <cstdint>
#include <memory>
#include <sstream>
#include <utility>
#include <vector>
#include "parquet/column_writer.h"
#include "parquet/schema-internal.h"
#include "parquet/schema.h"
#include "parquet/thrift.h"
#include "parquet/util/memory.h"
using arrow::MemoryPool;
using parquet::schema::GroupNode;
using parquet::schema::SchemaFlattener;
namespace parquet {
// FIXME: copied from reader-internal.cc
static constexpr uint8_t PARQUET_MAGIC[4] = {'P', 'A', 'R', '1'};
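// For orientation, the overall layout this writer produces (a sketch pieced
// together from StartFile(), RowGroupSerializer, and WriteFileMetaData()
// below):
//
//   "PAR1" | row group 0 | ... | row group N-1 |
//   FileMetaData (Thrift compact protocol) |
//   4-byte metadata length | "PAR1"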
// ----------------------------------------------------------------------
// RowGroupWriter public API
RowGroupWriter::RowGroupWriter(std::unique_ptr<Contents> contents)
: contents_(std::move(contents)) {}
void RowGroupWriter::Close() {
if (contents_) {
contents_->Close();
}
}
ColumnWriter* RowGroupWriter::NextColumn() { return contents_->NextColumn(); }
ColumnWriter* RowGroupWriter::column(int i) { return contents_->column(i); }
int64_t RowGroupWriter::total_compressed_bytes() const {
return contents_->total_compressed_bytes();
}
int64_t RowGroupWriter::total_bytes_written() const {
return contents_->total_bytes_written();
}
int RowGroupWriter::current_column() { return contents_->current_column(); }
int RowGroupWriter::num_columns() const { return contents_->num_columns(); }
int64_t RowGroupWriter::num_rows() const { return contents_->num_rows(); }
inline void ThrowRowsMisMatchError(int col, int64_t curr, int64_t prev) {
  std::stringstream ss;
  ss << "Column " << col << " had " << curr << " rows while previous column had "
     << prev << " rows";
  throw ParquetException(ss.str());
}
// ----------------------------------------------------------------------
// RowGroupSerializer
// RowGroupWriter::Contents implementation for the Parquet file specification
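//
// The serializer has two modes, selected by the buffered_row_group flag: in
// the default unbuffered mode, column chunks are written strictly one at a
// time through NextColumn(); in buffered mode, all column writers are created
// up front (InitColumns()) and can be filled in any order through column(i).
//
// A minimal unbuffered usage sketch; Int64Writer and WriteBatch() come from
// column_writer.h, and the variable names are illustrative only:
//
//   RowGroupWriter* rg = file_writer->AppendRowGroup();
//   auto* id_writer = static_cast<Int64Writer*>(rg->NextColumn());
//   id_writer->WriteBatch(num_values, nullptr, nullptr, values);
//   rg->Close();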
class RowGroupSerializer : public RowGroupWriter::Contents {
public:
RowGroupSerializer(OutputStream* sink, RowGroupMetaDataBuilder* metadata,
const WriterProperties* properties, bool buffered_row_group = false)
: sink_(sink),
metadata_(metadata),
properties_(properties),
total_bytes_written_(0),
closed_(false),
current_column_index_(0),
num_rows_(0),
buffered_row_group_(buffered_row_group) {
if (buffered_row_group) {
InitColumns();
} else {
column_writers_.push_back(nullptr);
}
}
int num_columns() const override { return metadata_->num_columns(); }
int64_t num_rows() const override {
CheckRowsWritten();
// CheckRowsWritten ensures num_rows_ is set correctly
return num_rows_;
}
ColumnWriter* NextColumn() override {
if (buffered_row_group_) {
      throw ParquetException(
          "NextColumn() is not supported when a BufferedRowGroup is being written");
}
if (column_writers_[0]) {
CheckRowsWritten();
}
    // metadata_->NextColumnChunk() throws if more columns are requested than
    // the schema contains
auto col_meta = metadata_->NextColumnChunk();
if (column_writers_[0]) {
total_bytes_written_ += column_writers_[0]->Close();
}
++current_column_index_;
const ColumnDescriptor* column_descr = col_meta->descr();
std::unique_ptr<PageWriter> pager =
PageWriter::Open(sink_, properties_->compression(column_descr->path()), col_meta,
properties_->memory_pool());
column_writers_[0] = ColumnWriter::Make(col_meta, std::move(pager), properties_);
return column_writers_[0].get();
}
ColumnWriter* column(int i) override {
if (!buffered_row_group_) {
throw ParquetException(
"column() is only supported when a BufferedRowGroup is being written");
}
if (i >= 0 && i < static_cast<int>(column_writers_.size())) {
return column_writers_[i].get();
}
return nullptr;
}
int current_column() const override { return metadata_->current_column(); }
int64_t total_compressed_bytes() const override {
int64_t total_compressed_bytes = 0;
for (size_t i = 0; i < column_writers_.size(); i++) {
if (column_writers_[i]) {
total_compressed_bytes += column_writers_[i]->total_compressed_bytes();
}
}
return total_compressed_bytes;
}
int64_t total_bytes_written() const override {
int64_t total_bytes_written = 0;
for (size_t i = 0; i < column_writers_.size(); i++) {
if (column_writers_[i]) {
total_bytes_written += column_writers_[i]->total_bytes_written();
}
}
return total_bytes_written;
}
void Close() override {
if (!closed_) {
closed_ = true;
CheckRowsWritten();
for (size_t i = 0; i < column_writers_.size(); i++) {
if (column_writers_[i]) {
total_bytes_written_ += column_writers_[i]->Close();
column_writers_[i].reset();
}
}
column_writers_.clear();
      // metadata_->Finish() verifies that a chunk was written for every column
metadata_->set_num_rows(num_rows_);
metadata_->Finish(total_bytes_written_);
}
}
private:
OutputStream* sink_;
  RowGroupMetaDataBuilder* metadata_;
const WriterProperties* properties_;
int64_t total_bytes_written_;
bool closed_;
int current_column_index_;
mutable int64_t num_rows_;
bool buffered_row_group_;
void CheckRowsWritten() const {
    // In unbuffered mode only one column is written at a time; verify the
    // active column against the row count of the previously closed columns
    if (!buffered_row_group_ && column_writers_.size() > 0 && column_writers_[0]) {
int64_t current_col_rows = column_writers_[0]->rows_written();
if (num_rows_ == 0) {
num_rows_ = current_col_rows;
} else if (num_rows_ != current_col_rows) {
ThrowRowsMisMatchError(current_column_index_, current_col_rows, num_rows_);
}
    } else if (buffered_row_group_ && column_writers_.size() > 0) {
      // In buffered mode all columns are written concurrently; verify that
      // every column has the same number of rows as the first
      int64_t current_col_rows = column_writers_[0]->rows_written();
for (int i = 1; i < static_cast<int>(column_writers_.size()); i++) {
int64_t current_col_rows_i = column_writers_[i]->rows_written();
if (current_col_rows != current_col_rows_i) {
ThrowRowsMisMatchError(i, current_col_rows_i, current_col_rows);
}
}
num_rows_ = current_col_rows;
}
}
void InitColumns() {
for (int i = 0; i < num_columns(); i++) {
auto col_meta = metadata_->NextColumnChunk();
const ColumnDescriptor* column_descr = col_meta->descr();
std::unique_ptr<PageWriter> pager =
PageWriter::Open(sink_, properties_->compression(column_descr->path()),
col_meta, properties_->memory_pool(), buffered_row_group_);
column_writers_.push_back(
ColumnWriter::Make(col_meta, std::move(pager), properties_));
}
}
std::vector<std::shared_ptr<ColumnWriter>> column_writers_;
};
// ----------------------------------------------------------------------
// FileSerializer
// An implementation of ParquetFileWriter::Contents that deals with the Parquet
// file structure, Thrift serialization, and other internal matters
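//
// Lifecycle, as implemented below: the constructor writes the leading "PAR1"
// magic (StartFile()); each AppendRowGroup()/AppendBufferedRowGroup() call
// closes any active row group and starts a new RowGroupSerializer; Close()
// finishes the last row group and appends the footer via WriteFileMetaData().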
class FileSerializer : public ParquetFileWriter::Contents {
public:
static std::unique_ptr<ParquetFileWriter::Contents> Open(
const std::shared_ptr<OutputStream>& sink, const std::shared_ptr<GroupNode>& schema,
const std::shared_ptr<WriterProperties>& properties,
const std::shared_ptr<const KeyValueMetadata>& key_value_metadata) {
std::unique_ptr<ParquetFileWriter::Contents> result(
new FileSerializer(sink, schema, properties, key_value_metadata));
return result;
}
void Close() override {
if (is_open_) {
if (row_group_writer_) {
num_rows_ += row_group_writer_->num_rows();
row_group_writer_->Close();
}
row_group_writer_.reset();
// Write magic bytes and metadata
auto metadata = metadata_->Finish();
WriteFileMetaData(*metadata, sink_.get());
sink_->Close();
is_open_ = false;
}
}
int num_columns() const override { return schema_.num_columns(); }
int num_row_groups() const override { return num_row_groups_; }
int64_t num_rows() const override { return num_rows_; }
const std::shared_ptr<WriterProperties>& properties() const override {
return properties_;
}
RowGroupWriter* AppendRowGroup(bool buffered_row_group) {
if (row_group_writer_) {
row_group_writer_->Close();
}
num_row_groups_++;
auto rg_metadata = metadata_->AppendRowGroup();
std::unique_ptr<RowGroupWriter::Contents> contents(new RowGroupSerializer(
sink_.get(), rg_metadata, properties_.get(), buffered_row_group));
row_group_writer_.reset(new RowGroupWriter(std::move(contents)));
return row_group_writer_.get();
}
RowGroupWriter* AppendRowGroup() override { return AppendRowGroup(false); }
RowGroupWriter* AppendBufferedRowGroup() override { return AppendRowGroup(true); }
~FileSerializer() override {
try {
Close();
} catch (...) {
}
}
private:
FileSerializer(const std::shared_ptr<OutputStream>& sink,
const std::shared_ptr<GroupNode>& schema,
const std::shared_ptr<WriterProperties>& properties,
const std::shared_ptr<const KeyValueMetadata>& key_value_metadata)
: ParquetFileWriter::Contents(schema, key_value_metadata),
sink_(sink),
is_open_(true),
properties_(properties),
num_row_groups_(0),
num_rows_(0),
metadata_(FileMetaDataBuilder::Make(&schema_, properties, key_value_metadata)) {
StartFile();
}
std::shared_ptr<OutputStream> sink_;
bool is_open_;
const std::shared_ptr<WriterProperties> properties_;
int num_row_groups_;
int64_t num_rows_;
std::unique_ptr<FileMetaDataBuilder> metadata_;
// Only one of the row group writers is active at a time
std::unique_ptr<RowGroupWriter> row_group_writer_;
void StartFile() {
// Parquet files always start with PAR1
sink_->Write(PARQUET_MAGIC, 4);
}
};
// ----------------------------------------------------------------------
// ParquetFileWriter public API
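//
// A hedged end-to-end sketch of this API; the field name, sink setup, and
// value-writing step are illustrative placeholders, not taken from this file:
//
//   schema::NodeVector fields;
//   fields.push_back(schema::PrimitiveNode::Make(
//       "id", Repetition::REQUIRED, Type::INT64));
//   auto root = std::static_pointer_cast<GroupNode>(
//       GroupNode::Make("schema", Repetition::REQUIRED, fields));
//   auto writer = ParquetFileWriter::Open(sink, root, properties);
//   RowGroupWriter* rg = writer->AppendRowGroup();
//   // ... write each column via rg->NextColumn() ...
//   writer->Close();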
ParquetFileWriter::ParquetFileWriter() {}
ParquetFileWriter::~ParquetFileWriter() {
try {
Close();
} catch (...) {
}
}
std::unique_ptr<ParquetFileWriter> ParquetFileWriter::Open(
const std::shared_ptr<::arrow::io::OutputStream>& sink,
const std::shared_ptr<GroupNode>& schema,
const std::shared_ptr<WriterProperties>& properties,
const std::shared_ptr<const KeyValueMetadata>& key_value_metadata) {
return Open(std::make_shared<ArrowOutputStream>(sink), schema, properties,
key_value_metadata);
}
std::unique_ptr<ParquetFileWriter> ParquetFileWriter::Open(
const std::shared_ptr<OutputStream>& sink,
const std::shared_ptr<schema::GroupNode>& schema,
const std::shared_ptr<WriterProperties>& properties,
const std::shared_ptr<const KeyValueMetadata>& key_value_metadata) {
auto contents = FileSerializer::Open(sink, schema, properties, key_value_metadata);
std::unique_ptr<ParquetFileWriter> result(new ParquetFileWriter());
result->Open(std::move(contents));
return result;
}
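// Appends the footer to the stream, as seen from the end of the file:
//
//   [FileMetaData, Thrift compact protocol][uint32 metadata_len]["PAR1"]
//
// A reader recovers the metadata by reading the last 8 bytes first;
// metadata_len counts only the serialized FileMetaData bytes, hence the
// difference of the two Tell() positions below.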
void WriteFileMetaData(const FileMetaData& file_metadata, OutputStream* sink) {
  // Write the serialized metadata; its length is the difference between the
  // stream positions before and after the write
  uint32_t metadata_len = static_cast<uint32_t>(sink->Tell());
  file_metadata.WriteTo(sink);
  metadata_len = static_cast<uint32_t>(sink->Tell()) - metadata_len;
  // Write the footer: the metadata length (the format expects little-endian;
  // the raw bytes of metadata_len are written as-is, so this assumes a
  // little-endian host) followed by the trailing magic
  sink->Write(reinterpret_cast<uint8_t*>(&metadata_len), 4);
  sink->Write(PARQUET_MAGIC, 4);
}
const SchemaDescriptor* ParquetFileWriter::schema() const { return contents_->schema(); }
const ColumnDescriptor* ParquetFileWriter::descr(int i) const {
return contents_->schema()->Column(i);
}
int ParquetFileWriter::num_columns() const { return contents_->num_columns(); }
int64_t ParquetFileWriter::num_rows() const { return contents_->num_rows(); }
int ParquetFileWriter::num_row_groups() const { return contents_->num_row_groups(); }
const std::shared_ptr<const KeyValueMetadata>& ParquetFileWriter::key_value_metadata()
const {
return contents_->key_value_metadata();
}
void ParquetFileWriter::Open(std::unique_ptr<ParquetFileWriter::Contents> contents) {
contents_ = std::move(contents);
}
void ParquetFileWriter::Close() {
if (contents_) {
contents_->Close();
contents_.reset();
}
}
RowGroupWriter* ParquetFileWriter::AppendRowGroup() {
return contents_->AppendRowGroup();
}
RowGroupWriter* ParquetFileWriter::AppendBufferedRowGroup() {
return contents_->AppendBufferedRowGroup();
}
RowGroupWriter* ParquetFileWriter::AppendRowGroup(int64_t num_rows) {
  // num_rows is ignored; this overload exists only for backwards API
  // compatibility
  return AppendRowGroup();
}
const std::shared_ptr<WriterProperties>& ParquetFileWriter::properties() const {
return contents_->properties();
}
} // namespace parquet