blob: 91a20848bd020007bf0e4a4b59e3456ed4db5396 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "exec/hdfs-parquet-table-writer.h"
#include <boost/unordered_set.hpp>
#include "common/version.h"
#include "exec/hdfs-table-sink.h"
#include "exec/parquet-column-stats.inline.h"
#include "exprs/scalar-expr.h"
#include "exprs/scalar-expr-evaluator.h"
#include "rpc/thrift-util.h"
#include "runtime/decimal-value.h"
#include "runtime/mem-tracker.h"
#include "runtime/raw-value.h"
#include "runtime/row-batch.h"
#include "runtime/runtime-state.h"
#include "runtime/string-value.inline.h"
#include "util/bit-stream-utils.h"
#include "util/bit-util.h"
#include "util/buffer-builder.h"
#include "util/compress.h"
#include "util/debug-util.h"
#include "util/dict-encoding.h"
#include "util/hdfs-util.h"
#include "util/string-util.h"
#include "util/rle-encoding.h"
#include <sstream>
#include "gen-cpp/ImpalaService_types.h"
#include "common/names.h"
using namespace impala;
using namespace apache::thrift;
// Managing file sizes: We need to estimate how big the files being buffered
// are in order to split them correctly in HDFS. Having a file that is too big
// will cause remote reads (parquet files are non-splittable).
// It's too expensive to compute the exact file sizes as the rows are buffered
// since the values in the current pages are only encoded/compressed when the page
// is full. Once the page is full, we encode and compress it, at which point we know
// the exact on file size.
// The current buffered pages (one for each column) can have a very poor estimate.
// To adjust for this, we aim for a slightly smaller file size than the ideal.
//
// Class that encapsulates all the state for writing a single column. This contains
// all the buffered pages as well as the metadata (e.g. byte sizes, num values, etc).
// This is intended to be created once per writer per column and reused across
// row groups.
// We currently accumulate all the data pages for an entire row group per column
// before flushing them. This can be pretty large (hundreds of MB) but we can't
// fix this without collocated files in HDFS. With collocated files, the minimum
// we'd need to buffer is 1 page per column so on the order of 1MB (although we might
// decide to buffer a few pages for better HDFS write performance).
// Pages are reused between flushes. They are created on demand as necessary and
// recycled after a flush.
// As rows come in, we accumulate the encoded values into the values_ and def_levels_
// buffers. When we've accumulated a page worth's of data, we combine values_ and
// def_levels_ into a single buffer that would be the exact bytes (with no gaps) in
// the file. The combined buffer is compressed if compression is enabled and we
// keep the combined/compressed buffer until we need to flush the file. The
// values_ and def_levels_ are then reused for the next page.
//
// TODO: For codegen, we would codegen the AppendRow() function for each column.
// This codegen is specific to the column expr (and type) and encoding. The
// parent writer object would combine all the generated AppendRow from all
// the columns and run that function over row batches.
// TODO: we need to pass in the compression from the FE/metadata
namespace impala {
// Base class for column writers. This contains most of the logic except for
// the type specific functions which are implemented in the subclasses.
class HdfsParquetTableWriter::BaseColumnWriter {
public:
// expr - the expression to generate output values for this column.
BaseColumnWriter(HdfsParquetTableWriter* parent, ScalarExprEvaluator* expr_eval,
const THdfsCompression::type& codec)
: parent_(parent),
expr_eval_(expr_eval),
codec_(codec),
page_size_(DEFAULT_DATA_PAGE_SIZE),
current_page_(nullptr),
num_values_(0),
total_compressed_byte_size_(0),
total_uncompressed_byte_size_(0),
dict_encoder_base_(nullptr),
def_levels_(nullptr),
values_buffer_len_(DEFAULT_DATA_PAGE_SIZE),
page_stats_base_(nullptr),
row_group_stats_base_(nullptr),
table_sink_mem_tracker_(parent_->parent_->mem_tracker()) {
static_assert(std::is_same<decltype(parent_->parent_), HdfsTableSink*>::value,
"'table_sink_mem_tracker_' must point to the mem tracker of an HdfsTableSink");
def_levels_ = parent_->state_->obj_pool()->Add(
new RleEncoder(parent_->reusable_col_mem_pool_->Allocate(DEFAULT_DATA_PAGE_SIZE),
DEFAULT_DATA_PAGE_SIZE, 1));
values_buffer_ = parent_->reusable_col_mem_pool_->Allocate(values_buffer_len_);
}
virtual ~BaseColumnWriter() {}
// Called after the constructor to initialize the column writer.
Status Init() WARN_UNUSED_RESULT {
Reset();
RETURN_IF_ERROR(Codec::CreateCompressor(nullptr, false, codec_, &compressor_));
return Status::OK();
}
// Appends the row to this column. This buffers the value into a data page. Returns
// error if the space needed for the encoded value is larger than the data page size.
// TODO: this needs to be batch based, instead of row based for better performance.
// This is a bit trickier to handle the case where only a partial row batch can be
// output to the current file because it reaches the max file size. Enabling codegen
// would also solve this problem.
Status AppendRow(TupleRow* row) WARN_UNUSED_RESULT;
// Flushes all buffered data pages to the file.
// *file_pos is an output parameter and will be incremented by
// the number of bytes needed to write all the data pages for this column.
// first_data_page and first_dictionary_page are also out parameters and
// will contain the byte offset for the data page and dictionary page. They
// will be set to -1 if the column does not contain that type of page.
Status Flush(int64_t* file_pos, int64_t* first_data_page,
int64_t* first_dictionary_page) WARN_UNUSED_RESULT;
// Materializes the column statistics to the per-file MemPool so they are available
// after their row batch buffer has been freed.
Status MaterializeStatsValues() WARN_UNUSED_RESULT {
RETURN_IF_ERROR(row_group_stats_base_->MaterializeStringValuesToInternalBuffers());
RETURN_IF_ERROR(page_stats_base_->MaterializeStringValuesToInternalBuffers());
return Status::OK();
}
// Encodes the row group statistics into a parquet::Statistics object and attaches it to
// 'meta_data'.
void EncodeRowGroupStats(parquet::ColumnMetaData* meta_data) {
DCHECK(row_group_stats_base_ != nullptr);
if (row_group_stats_base_->BytesNeeded() <= MAX_COLUMN_STATS_SIZE) {
row_group_stats_base_->EncodeToThrift(&meta_data->statistics);
meta_data->__isset.statistics = true;
}
}
// Resets all the data accumulated for this column. Memory can now be reused for
// the next row group.
// Any data for previous row groups must be reset (e.g. dictionaries).
// Subclasses must call this if they override this function.
virtual void Reset() {
num_data_pages_ = 0;
current_page_ = nullptr;
num_values_ = 0;
total_compressed_byte_size_ = 0;
current_encoding_ = parquet::Encoding::PLAIN;
next_page_encoding_ = parquet::Encoding::PLAIN;
column_encodings_.clear();
dict_encoding_stats_.clear();
data_encoding_stats_.clear();
// Repetition/definition level encodings are constant. Incorporate them here.
column_encodings_.insert(parquet::Encoding::RLE);
offset_index_.page_locations.clear();
column_index_.null_pages.clear();
column_index_.min_values.clear();
column_index_.max_values.clear();
table_sink_mem_tracker_->Release(page_index_memory_consumption_);
page_index_memory_consumption_ = 0;
column_index_.null_counts.clear();
valid_column_index_ = true;
}
// Close this writer. This is only called after Flush() and no more rows will
// be added.
void Close() {
if (compressor_.get() != nullptr) compressor_->Close();
if (dict_encoder_base_ != nullptr) dict_encoder_base_->Close();
// We must release the memory consumption of this column writer.
table_sink_mem_tracker_->Release(page_index_memory_consumption_);
page_index_memory_consumption_ = 0;
}
const ColumnType& type() const { return expr_eval_->root().type(); }
uint64_t num_values() const { return num_values_; }
uint64_t total_compressed_size() const { return total_compressed_byte_size_; }
uint64_t total_uncompressed_size() const { return total_uncompressed_byte_size_; }
parquet::CompressionCodec::type GetParquetCodec() const {
return ConvertImpalaToParquetCodec(codec_);
}
protected:
friend class HdfsParquetTableWriter;
// Encodes value into the current page output buffer and updates the column statistics
// aggregates. Returns true if the value was appended successfully to the current page.
// Returns false if the value was not appended to the current page and the caller can
// create a new page and try again with the same value. May change
// 'next_page_encoding_' if the encoding for the next page should be different - e.g.
// if a dictionary overflowed and dictionary encoding is no longer viable.
// *bytes_needed will contain the (estimated) number of bytes needed to successfully
// encode the value in the page.
// Implemented in the subclass.
virtual bool ProcessValue(void* value, int64_t* bytes_needed) WARN_UNUSED_RESULT = 0;
// Encodes out all data for the current page and updates the metadata.
virtual Status FinalizeCurrentPage() WARN_UNUSED_RESULT;
// Update current_page_ to a new page, reusing pages allocated if possible.
void NewPage();
// Writes out the dictionary encoded data buffered in dict_encoder_.
void WriteDictDataPage();
struct DataPage {
// Page header. This is a union of all page types.
parquet::PageHeader header;
// Number of bytes needed to store definition levels.
int num_def_bytes;
// This is the payload for the data page. This includes the definition/repetition
// levels data and the encoded values. If compression is enabled, this is the
// compressed data.
uint8_t* data;
// If true, this data page has been finalized. All sizes are computed, header is
// fully populated and any compression is done.
bool finalized;
// Number of non-null values
int num_non_null;
};
HdfsParquetTableWriter* parent_;
ScalarExprEvaluator* expr_eval_;
THdfsCompression::type codec_;
// Compression codec for this column. If nullptr, this column is will not be
// compressed.
scoped_ptr<Codec> compressor_;
vector<DataPage> pages_;
// Number of pages in 'pages_' that are used. 'pages_' is reused between flushes
// so this number can be less than pages_.size()
int num_data_pages_;
// Size of newly created pages. Defaults to DEFAULT_DATA_PAGE_SIZE and is increased
// when pages are not big enough. This only happens when there are enough unique values
// such that we switch from PLAIN_DICTIONARY to PLAIN encoding and then have very
// large values (i.e. greater than DEFAULT_DATA_PAGE_SIZE).
// TODO: Consider removing and only creating a single large page as necessary.
int64_t page_size_;
// Pointer to the current page in 'pages_'. Not owned.
DataPage* current_page_;
// Total number of values across all pages, including NULL.
int64_t num_values_;
int64_t total_compressed_byte_size_;
int64_t total_uncompressed_byte_size_;
// Encoding of the current page.
parquet::Encoding::type current_encoding_;
// Encoding to use for the next page. By default, the same as 'current_encoding_'.
// Used by the column writer to switch encoding while writing a column, e.g. if the
// dictionary overflows.
parquet::Encoding::type next_page_encoding_;
// Set of all encodings used in the column chunk
unordered_set<parquet::Encoding::type> column_encodings_;
// Map from the encoding to the number of pages in the column chunk with this encoding
// These are used to construct the PageEncodingStats, which provide information
// about encoding usage for each different page type. Currently, only dictionary
// and data pages are used.
unordered_map<parquet::Encoding::type, int> dict_encoding_stats_;
unordered_map<parquet::Encoding::type, int> data_encoding_stats_;
// Created, owned, and set by the derived class.
DictEncoderBase* dict_encoder_base_;
// Rle encoder object for storing definition levels, owned by instances of this class.
// For non-nested schemas, this always uses 1 bit per row. This is reused across pages
// since the underlying buffer is copied out when the page is finalized.
RleEncoder* def_levels_;
// Data for buffered values. This is owned by instances of this class and gets reused
// across pages.
uint8_t* values_buffer_;
// The size of values_buffer_.
int values_buffer_len_;
// Pointers to statistics, created, owned, and set by the derived class.
ColumnStatsBase* page_stats_base_;
ColumnStatsBase* row_group_stats_base_;
// OffsetIndex stores the locations of the pages.
parquet::OffsetIndex offset_index_;
// ColumnIndex stores the statistics of the pages.
parquet::ColumnIndex column_index_;
// Pointer to the HdfsTableSink's MemTracker.
MemTracker* table_sink_mem_tracker_;
// Memory consumption of the min/max values in the page index.
int64_t page_index_memory_consumption_ = 0;
// Only write ColumnIndex when 'valid_column_index_' is true. We always need to write
// the OffsetIndex though.
bool valid_column_index_ = true;
};
// Per type column writer.
template<typename T>
class HdfsParquetTableWriter::ColumnWriter :
public HdfsParquetTableWriter::BaseColumnWriter {
public:
ColumnWriter(HdfsParquetTableWriter* parent, ScalarExprEvaluator* eval,
const THdfsCompression::type& codec)
: BaseColumnWriter(parent, eval, codec),
num_values_since_dict_size_check_(0),
plain_encoded_value_size_(
ParquetPlainEncoder::EncodedByteSize(eval->root().type())) {
DCHECK_NE(eval->root().type().type, TYPE_BOOLEAN);
}
virtual void Reset() {
BaseColumnWriter::Reset();
// Default to dictionary encoding. If the cardinality ends up being too high,
// it will fall back to plain.
current_encoding_ = parquet::Encoding::PLAIN_DICTIONARY;
next_page_encoding_ = parquet::Encoding::PLAIN_DICTIONARY;
dict_encoder_.reset(
new DictEncoder<T>(parent_->per_file_mem_pool_.get(), plain_encoded_value_size_,
parent_->parent_->mem_tracker()));
dict_encoder_base_ = dict_encoder_.get();
page_stats_.reset(
new ColumnStats<T>(parent_->per_file_mem_pool_.get(), plain_encoded_value_size_));
page_stats_base_ = page_stats_.get();
row_group_stats_.reset(
new ColumnStats<T>(parent_->per_file_mem_pool_.get(), plain_encoded_value_size_));
row_group_stats_base_ = row_group_stats_.get();
}
protected:
virtual bool ProcessValue(void* value, int64_t* bytes_needed) {
if (current_encoding_ == parquet::Encoding::PLAIN_DICTIONARY) {
if (UNLIKELY(num_values_since_dict_size_check_ >=
DICTIONARY_DATA_PAGE_SIZE_CHECK_PERIOD)) {
num_values_since_dict_size_check_ = 0;
if (dict_encoder_->EstimatedDataEncodedSize() >= page_size_) return false;
}
++num_values_since_dict_size_check_;
*bytes_needed = dict_encoder_->Put(*CastValue(value));
// If the dictionary contains the maximum number of values, switch to plain
// encoding for the next page. The current page is full and must be written out.
if (UNLIKELY(*bytes_needed < 0)) {
next_page_encoding_ = parquet::Encoding::PLAIN;
return false;
}
parent_->file_size_estimate_ += *bytes_needed;
} else if (current_encoding_ == parquet::Encoding::PLAIN) {
T* v = CastValue(value);
*bytes_needed = plain_encoded_value_size_ < 0 ?
ParquetPlainEncoder::ByteSize<T>(*v) :
plain_encoded_value_size_;
if (current_page_->header.uncompressed_page_size + *bytes_needed > page_size_) {
return false;
}
uint8_t* dst_ptr = values_buffer_ + current_page_->header.uncompressed_page_size;
int64_t written_len =
ParquetPlainEncoder::Encode(*v, plain_encoded_value_size_, dst_ptr);
DCHECK_EQ(*bytes_needed, written_len);
current_page_->header.uncompressed_page_size += written_len;
} else {
// TODO: support other encodings here
DCHECK(false);
}
page_stats_->Update(*CastValue(value));
return true;
}
private:
// The period, in # of rows, to check the estimated dictionary page size against
// the data page size. We want to start a new data page when the estimated size
// is at least that big. The estimated size computation is not very cheap and
// we can tolerate going over the data page size by some amount.
// The expected byte size per dictionary value is < 1B and at most 2 bytes so the
// error is pretty low.
// TODO: is there a better way?
static const int DICTIONARY_DATA_PAGE_SIZE_CHECK_PERIOD = 100;
// Encoder for dictionary encoding for different columns. Only one is set.
scoped_ptr<DictEncoder<T>> dict_encoder_;
// The number of values added since we last checked the dictionary.
int num_values_since_dict_size_check_;
// Size of each encoded value in plain encoding. -1 if the type is variable-length.
int64_t plain_encoded_value_size_;
// Temporary string value to hold CHAR(N)
StringValue temp_;
// Tracks statistics per page. These are written out to the page index.
scoped_ptr<ColumnStats<T>> page_stats_;
// Tracks statistics per row group. This gets reset when starting a new row group.
scoped_ptr<ColumnStats<T>> row_group_stats_;
// Converts a slot pointer to a raw value suitable for encoding
inline T* CastValue(void* value) {
return reinterpret_cast<T*>(value);
}
};
template<>
inline StringValue* HdfsParquetTableWriter::ColumnWriter<StringValue>::CastValue(
void* value) {
if (type().type == TYPE_CHAR) {
temp_.ptr = reinterpret_cast<char*>(value);
temp_.len = StringValue::UnpaddedCharLength(temp_.ptr, type().len);
return &temp_;
}
return reinterpret_cast<StringValue*>(value);
}
// Bools are encoded a bit differently so subclass it explicitly.
class HdfsParquetTableWriter::BoolColumnWriter :
public HdfsParquetTableWriter::BaseColumnWriter {
public:
BoolColumnWriter(HdfsParquetTableWriter* parent, ScalarExprEvaluator* eval,
const THdfsCompression::type& codec)
: BaseColumnWriter(parent, eval, codec),
page_stats_(parent_->reusable_col_mem_pool_.get(), -1),
row_group_stats_(parent_->reusable_col_mem_pool_.get(), -1) {
DCHECK_EQ(eval->root().type().type, TYPE_BOOLEAN);
bool_values_ = parent_->state_->obj_pool()->Add(
new BitWriter(values_buffer_, values_buffer_len_));
// Dictionary encoding doesn't make sense for bools and is not allowed by
// the format.
current_encoding_ = parquet::Encoding::PLAIN;
dict_encoder_base_ = nullptr;
page_stats_base_ = &page_stats_;
row_group_stats_base_ = &row_group_stats_;
}
protected:
virtual bool ProcessValue(void* value, int64_t* bytes_needed) {
bool v = *reinterpret_cast<bool*>(value);
if (!bool_values_->PutValue(v, 1)) return false;
page_stats_.Update(v);
return true;
}
virtual Status FinalizeCurrentPage() {
DCHECK(current_page_ != nullptr);
if (current_page_->finalized) return Status::OK();
bool_values_->Flush();
int num_bytes = bool_values_->bytes_written();
current_page_->header.uncompressed_page_size += num_bytes;
// Call into superclass to handle the rest.
RETURN_IF_ERROR(BaseColumnWriter::FinalizeCurrentPage());
bool_values_->Clear();
return Status::OK();
}
private:
// Used to encode bools as single bit values. This is reused across pages.
BitWriter* bool_values_;
// Tracks statistics per page. These are written out to the page index.
ColumnStats<bool> page_stats_;
// Tracks statistics per row group. This gets reset when starting a new file.
ColumnStats<bool> row_group_stats_;
};
}
inline Status HdfsParquetTableWriter::BaseColumnWriter::AppendRow(TupleRow* row) {
++num_values_;
void* value = expr_eval_->GetValue(row);
if (current_page_ == nullptr) NewPage();
// Ensure that we have enough space for the definition level, but don't write it yet in
// case we don't have enough space for the value.
if (def_levels_->buffer_full()) {
RETURN_IF_ERROR(FinalizeCurrentPage());
NewPage();
}
// Encoding may fail for several reasons - because the current page is not big enough,
// because we've encoded the maximum number of unique dictionary values and need to
// switch to plain encoding, etc. so we may need to try again more than once.
// TODO: Have a clearer set of state transitions here, to make it easier to see that
// this won't loop forever.
while (true) {
// Nulls don't get encoded. Increment the null count of the parquet statistics.
if (value == nullptr) {
DCHECK(page_stats_base_ != nullptr);
page_stats_base_->IncrementNullCount(1);
break;
}
int64_t bytes_needed = 0;
if (ProcessValue(value, &bytes_needed)) {
++current_page_->num_non_null;
break; // Succesfully appended, don't need to retry.
}
// Value didn't fit on page, try again on a new page.
RETURN_IF_ERROR(FinalizeCurrentPage());
// Check how much space is needed to write this value. If that is larger than the
// page size then increase page size and try again.
if (UNLIKELY(bytes_needed > page_size_)) {
if (bytes_needed > MAX_DATA_PAGE_SIZE) {
stringstream ss;
ss << "Cannot write value of size "
<< PrettyPrinter::Print(bytes_needed, TUnit::BYTES) << " bytes to a Parquet "
<< "data page that exceeds the max page limit "
<< PrettyPrinter::Print(MAX_DATA_PAGE_SIZE , TUnit::BYTES) << ".";
return Status(ss.str());
}
page_size_ = bytes_needed;
values_buffer_len_ = page_size_;
values_buffer_ = parent_->reusable_col_mem_pool_->Allocate(values_buffer_len_);
}
NewPage();
}
// Now that the value has been successfully written, write the definition level.
bool ret = def_levels_->Put(value != nullptr);
// Writing the def level will succeed because we ensured there was enough space for it
// above, and new pages will always have space for at least a single def level.
DCHECK(ret);
++current_page_->header.data_page_header.num_values;
return Status::OK();
}
inline void HdfsParquetTableWriter::BaseColumnWriter::WriteDictDataPage() {
DCHECK(dict_encoder_base_ != nullptr);
DCHECK_EQ(current_page_->header.uncompressed_page_size, 0);
if (current_page_->num_non_null == 0) return;
int len = dict_encoder_base_->WriteData(values_buffer_, values_buffer_len_);
while (UNLIKELY(len < 0)) {
// len < 0 indicates the data doesn't fit into a data page. Allocate a larger data
// page.
values_buffer_len_ *= 2;
values_buffer_ = parent_->reusable_col_mem_pool_->Allocate(values_buffer_len_);
len = dict_encoder_base_->WriteData(values_buffer_, values_buffer_len_);
}
dict_encoder_base_->ClearIndices();
current_page_->header.uncompressed_page_size = len;
}
Status HdfsParquetTableWriter::BaseColumnWriter::Flush(int64_t* file_pos,
int64_t* first_data_page, int64_t* first_dictionary_page) {
if (current_page_ == nullptr) {
// This column/file is empty
*first_data_page = *file_pos;
*first_dictionary_page = -1;
return Status::OK();
}
RETURN_IF_ERROR(FinalizeCurrentPage());
*first_dictionary_page = -1;
// First write the dictionary page before any of the data pages.
if (dict_encoder_base_ != nullptr) {
*first_dictionary_page = *file_pos;
// Write dictionary page header
parquet::DictionaryPageHeader dict_header;
dict_header.num_values = dict_encoder_base_->num_entries();
dict_header.encoding = parquet::Encoding::PLAIN_DICTIONARY;
++dict_encoding_stats_[dict_header.encoding];
parquet::PageHeader header;
header.type = parquet::PageType::DICTIONARY_PAGE;
header.uncompressed_page_size = dict_encoder_base_->dict_encoded_size();
header.__set_dictionary_page_header(dict_header);
// Write the dictionary page data, compressing it if necessary.
uint8_t* dict_buffer = parent_->per_file_mem_pool_->Allocate(
header.uncompressed_page_size);
dict_encoder_base_->WriteDict(dict_buffer);
if (compressor_.get() != nullptr) {
SCOPED_TIMER(parent_->parent_->compress_timer());
int64_t max_compressed_size =
compressor_->MaxOutputLen(header.uncompressed_page_size);
DCHECK_GT(max_compressed_size, 0);
uint8_t* compressed_data =
parent_->per_file_mem_pool_->Allocate(max_compressed_size);
header.compressed_page_size = max_compressed_size;
RETURN_IF_ERROR(compressor_->ProcessBlock32(true, header.uncompressed_page_size,
dict_buffer, &header.compressed_page_size, &compressed_data));
dict_buffer = compressed_data;
// We allocated the output based on the guessed size, return the extra allocated
// bytes back to the mem pool.
parent_->per_file_mem_pool_->ReturnPartialAllocation(
max_compressed_size - header.compressed_page_size);
} else {
header.compressed_page_size = header.uncompressed_page_size;
}
uint8_t* header_buffer;
uint32_t header_len;
RETURN_IF_ERROR(parent_->thrift_serializer_->Serialize(
&header, &header_len, &header_buffer));
RETURN_IF_ERROR(parent_->Write(header_buffer, header_len));
*file_pos += header_len;
total_compressed_byte_size_ += header_len;
total_uncompressed_byte_size_ += header_len;
RETURN_IF_ERROR(parent_->Write(dict_buffer, header.compressed_page_size));
*file_pos += header.compressed_page_size;
total_compressed_byte_size_ += header.compressed_page_size;
total_uncompressed_byte_size_ += header.uncompressed_page_size;
}
*first_data_page = *file_pos;
int64_t current_row_group_index = 0;
offset_index_.page_locations.resize(num_data_pages_);
// Write data pages
for (int i = 0; i < num_data_pages_; ++i) {
DataPage& page = pages_[i];
parquet::PageLocation location;
if (page.header.data_page_header.num_values == 0) {
// Skip empty pages
location.offset = -1;
location.compressed_page_size = 0;
location.first_row_index = -1;
offset_index_.page_locations[i] = location;
continue;
}
location.offset = *file_pos;
location.first_row_index = current_row_group_index;
// Write data page header
uint8_t* buffer = nullptr;
uint32_t len = 0;
RETURN_IF_ERROR(
parent_->thrift_serializer_->Serialize(&page.header, &len, &buffer));
RETURN_IF_ERROR(parent_->Write(buffer, len));
*file_pos += len;
// Note that the namings are confusing here:
// parquet::PageHeader::compressed_page_size is the compressed page size in bytes, as
// its name suggests. On the other hand, parquet::PageLocation::compressed_page_size
// also includes the size of the page header.
location.compressed_page_size = page.header.compressed_page_size + len;
offset_index_.page_locations[i] = location;
// Write the page data
RETURN_IF_ERROR(parent_->Write(page.data, page.header.compressed_page_size));
*file_pos += page.header.compressed_page_size;
current_row_group_index += page.header.data_page_header.num_values;
}
return Status::OK();
}
Status HdfsParquetTableWriter::BaseColumnWriter::FinalizeCurrentPage() {
DCHECK(current_page_ != nullptr);
if (current_page_->finalized) return Status::OK();
// If the entire page was NULL, encode it as PLAIN since there is no
// data anyway. We don't output a useless dictionary page and it works
// around a parquet MR bug (see IMPALA-759 for more details).
if (current_page_->num_non_null == 0) current_encoding_ = parquet::Encoding::PLAIN;
if (current_encoding_ == parquet::Encoding::PLAIN_DICTIONARY) WriteDictDataPage();
parquet::PageHeader& header = current_page_->header;
header.data_page_header.encoding = current_encoding_;
// Accumulate encoding statistics
column_encodings_.insert(header.data_page_header.encoding);
++data_encoding_stats_[header.data_page_header.encoding];
// Compute size of definition bits
def_levels_->Flush();
current_page_->num_def_bytes = sizeof(int32_t) + def_levels_->len();
header.uncompressed_page_size += current_page_->num_def_bytes;
// At this point we know all the data for the data page. Combine them into one buffer.
uint8_t* uncompressed_data = nullptr;
if (compressor_.get() == nullptr) {
uncompressed_data =
parent_->per_file_mem_pool_->Allocate(header.uncompressed_page_size);
} else {
// We have compression. Combine into the staging buffer.
parent_->compression_staging_buffer_.resize(
header.uncompressed_page_size);
uncompressed_data = &parent_->compression_staging_buffer_[0];
}
BufferBuilder buffer(uncompressed_data, header.uncompressed_page_size);
// Copy the definition (null) data
int num_def_level_bytes = def_levels_->len();
buffer.Append(num_def_level_bytes);
buffer.Append(def_levels_->buffer(), num_def_level_bytes);
// TODO: copy repetition data when we support nested types.
buffer.Append(values_buffer_, buffer.capacity() - buffer.size());
// Apply compression if necessary
if (compressor_.get() == nullptr) {
current_page_->data = uncompressed_data;
header.compressed_page_size = header.uncompressed_page_size;
} else {
SCOPED_TIMER(parent_->parent_->compress_timer());
int64_t max_compressed_size =
compressor_->MaxOutputLen(header.uncompressed_page_size);
DCHECK_GT(max_compressed_size, 0);
uint8_t* compressed_data = parent_->per_file_mem_pool_->Allocate(max_compressed_size);
header.compressed_page_size = max_compressed_size;
RETURN_IF_ERROR(compressor_->ProcessBlock32(true, header.uncompressed_page_size,
uncompressed_data, &header.compressed_page_size, &compressed_data));
current_page_->data = compressed_data;
// We allocated the output based on the guessed size, return the extra allocated
// bytes back to the mem pool.
parent_->per_file_mem_pool_->ReturnPartialAllocation(
max_compressed_size - header.compressed_page_size);
}
DCHECK(page_stats_base_ != nullptr);
parquet::Statistics page_stats;
page_stats_base_->EncodeToThrift(&page_stats);
{
// If pages_stats contains min_value and max_value, then append them to min_values_
// and max_values_ and also mark the page as not null. In case min and max values are
// not set, push empty strings to maintain the consistency of the index and mark the
// page as null. Always push the null_count.
string min_val;
string max_val;
if ((page_stats.__isset.min_value) && (page_stats.__isset.max_value)) {
Status s_min = TruncateDown(page_stats.min_value, PAGE_INDEX_MAX_STRING_LENGTH,
&min_val);
Status s_max = TruncateUp(page_stats.max_value, PAGE_INDEX_MAX_STRING_LENGTH,
&max_val);
if (!s_min.ok() || !s_max.ok()) valid_column_index_ = false;
column_index_.null_pages.push_back(false);
} else {
DCHECK(!page_stats.__isset.min_value && !page_stats.__isset.max_value);
column_index_.null_pages.push_back(true);
DCHECK_EQ(page_stats.null_count, num_values_);
}
int64_t new_memory_allocation = min_val.capacity() + max_val.capacity();
if (UNLIKELY(!table_sink_mem_tracker_->TryConsume(new_memory_allocation))) {
return table_sink_mem_tracker_->MemLimitExceeded(parent_->state_,
"Failed to allocate memory for Parquet page index.", new_memory_allocation);
}
page_index_memory_consumption_ += new_memory_allocation;
column_index_.min_values.emplace_back(std::move(min_val));
column_index_.max_values.emplace_back(std::move(max_val));
column_index_.null_counts.push_back(page_stats.null_count);
}
// Update row group statistics from page statistics.
DCHECK(row_group_stats_base_ != nullptr);
row_group_stats_base_->Merge(*page_stats_base_);
// Add the size of the data page header
uint8_t* header_buffer;
uint32_t header_len = 0;
RETURN_IF_ERROR(parent_->thrift_serializer_->Serialize(
&current_page_->header, &header_len, &header_buffer));
current_page_->finalized = true;
total_compressed_byte_size_ += header_len + header.compressed_page_size;
total_uncompressed_byte_size_ += header_len + header.uncompressed_page_size;
parent_->file_size_estimate_ += header_len + header.compressed_page_size;
def_levels_->Clear();
return Status::OK();
}
void HdfsParquetTableWriter::BaseColumnWriter::NewPage() {
if (num_data_pages_ < pages_.size()) {
// Reuse an existing page
current_page_ = &pages_[num_data_pages_++];
current_page_->header.data_page_header.num_values = 0;
current_page_->header.compressed_page_size = 0;
current_page_->header.uncompressed_page_size = 0;
} else {
pages_.push_back(DataPage());
current_page_ = &pages_[num_data_pages_++];
parquet::DataPageHeader header;
header.num_values = 0;
// The code that populates the column chunk metadata's encodings field
// relies on these specific values for the definition/repetition level
// encodings.
header.definition_level_encoding = parquet::Encoding::RLE;
header.repetition_level_encoding = parquet::Encoding::RLE;
current_page_->header.__set_data_page_header(header);
}
current_encoding_ = next_page_encoding_;
current_page_->finalized = false;
current_page_->num_non_null = 0;
page_stats_base_->Reset();
}
HdfsParquetTableWriter::HdfsParquetTableWriter(HdfsTableSink* parent, RuntimeState* state,
OutputPartition* output, const HdfsPartitionDescriptor* part_desc,
const HdfsTableDescriptor* table_desc)
: HdfsTableWriter(parent, state, output, part_desc, table_desc),
thrift_serializer_(new ThriftSerializer(true)),
current_row_group_(nullptr),
row_count_(0),
file_size_limit_(0),
reusable_col_mem_pool_(new MemPool(parent_->mem_tracker())),
per_file_mem_pool_(new MemPool(parent_->mem_tracker())),
row_idx_(0) {}
HdfsParquetTableWriter::~HdfsParquetTableWriter() {
}
Status HdfsParquetTableWriter::Init() {
// Initialize file metadata
file_metadata_.version = PARQUET_CURRENT_VERSION;
stringstream created_by;
created_by << "impala version " << GetDaemonBuildVersion()
<< " (build " << GetDaemonBuildHash() << ")";
file_metadata_.__set_created_by(created_by.str());
// Default to snappy compressed
THdfsCompression::type codec = THdfsCompression::SNAPPY;
const TQueryOptions& query_options = state_->query_options();
if (query_options.__isset.compression_codec) {
codec = query_options.compression_codec;
}
if (!(codec == THdfsCompression::NONE ||
codec == THdfsCompression::GZIP ||
codec == THdfsCompression::SNAPPY)) {
stringstream ss;
ss << "Invalid parquet compression codec " << Codec::GetCodecName(codec);
return Status(ss.str());
}
VLOG_FILE << "Using compression codec: " << codec;
int num_cols = table_desc_->num_cols() - table_desc_->num_clustering_cols();
// When opening files using the hdfsOpenFile() API, the maximum block size is limited to
// 2GB.
int64_t min_block_size = MinBlockSize(num_cols);
if (min_block_size >= numeric_limits<int32_t>::max()) {
stringstream ss;
return Status(Substitute("Minimum required block size must be less than 2GB "
"(currently $0), try reducing the number of non-partitioning columns in the "
"target table (currently $1).",
PrettyPrinter::Print(min_block_size, TUnit::BYTES), num_cols));
}
columns_.resize(num_cols);
// Initialize each column structure.
for (int i = 0; i < columns_.size(); ++i) {
BaseColumnWriter* writer = nullptr;
const ColumnType& type = output_expr_evals_[i]->root().type();
switch (type.type) {
case TYPE_BOOLEAN:
writer = new BoolColumnWriter(this, output_expr_evals_[i], codec);
break;
case TYPE_TINYINT:
writer = new ColumnWriter<int8_t>(this, output_expr_evals_[i], codec);
break;
case TYPE_SMALLINT:
writer = new ColumnWriter<int16_t>(this, output_expr_evals_[i], codec);
break;
case TYPE_INT:
writer = new ColumnWriter<int32_t>(this, output_expr_evals_[i], codec);
break;
case TYPE_BIGINT:
writer = new ColumnWriter<int64_t>(this, output_expr_evals_[i], codec);
break;
case TYPE_FLOAT:
writer = new ColumnWriter<float>(this, output_expr_evals_[i], codec);
break;
case TYPE_DOUBLE:
writer = new ColumnWriter<double>(this, output_expr_evals_[i], codec);
break;
case TYPE_TIMESTAMP:
writer = new ColumnWriter<TimestampValue>(
this, output_expr_evals_[i], codec);
break;
case TYPE_VARCHAR:
case TYPE_STRING:
case TYPE_CHAR:
writer = new ColumnWriter<StringValue>(this, output_expr_evals_[i], codec);
break;
case TYPE_DECIMAL:
switch (output_expr_evals_[i]->root().type().GetByteSize()) {
case 4:
writer = new ColumnWriter<Decimal4Value>(
this, output_expr_evals_[i], codec);
break;
case 8:
writer = new ColumnWriter<Decimal8Value>(
this, output_expr_evals_[i], codec);
break;
case 16:
writer = new ColumnWriter<Decimal16Value>(
this, output_expr_evals_[i], codec);
break;
default:
DCHECK(false);
}
break;
default:
DCHECK(false);
}
columns_[i].reset(writer);
RETURN_IF_ERROR(columns_[i]->Init());
}
RETURN_IF_ERROR(CreateSchema());
return Status::OK();
}
Status HdfsParquetTableWriter::CreateSchema() {
int num_clustering_cols = table_desc_->num_clustering_cols();
// Create flattened tree with a single root.
file_metadata_.schema.resize(columns_.size() + 1);
file_metadata_.schema[0].__set_num_children(columns_.size());
file_metadata_.schema[0].name = "schema";
for (int i = 0; i < columns_.size(); ++i) {
parquet::SchemaElement& node = file_metadata_.schema[i + 1];
const ColumnType& type = output_expr_evals_[i]->root().type();
node.name = table_desc_->col_descs()[i + num_clustering_cols].name();
node.__set_type(ConvertInternalToParquetType(type.type));
node.__set_repetition_type(parquet::FieldRepetitionType::OPTIONAL);
if (type.type == TYPE_DECIMAL) {
// This column is type decimal. Update the file metadata to include the
// additional fields:
// 1) converted_type: indicate this is really a decimal column.
// 2) type_length: the number of bytes used per decimal value in the data
// 3) precision/scale
node.__set_converted_type(parquet::ConvertedType::DECIMAL);
node.__set_type_length(
ParquetPlainEncoder::DecimalSize(output_expr_evals_[i]->root().type()));
node.__set_scale(output_expr_evals_[i]->root().type().scale);
node.__set_precision(output_expr_evals_[i]->root().type().precision);
} else if (type.type == TYPE_VARCHAR || type.type == TYPE_CHAR ||
(type.type == TYPE_STRING &&
state_->query_options().parquet_annotate_strings_utf8)) {
node.__set_converted_type(parquet::ConvertedType::UTF8);
} else if (type.type == TYPE_TINYINT) {
node.__set_converted_type(parquet::ConvertedType::INT_8);
} else if (type.type == TYPE_SMALLINT) {
node.__set_converted_type(parquet::ConvertedType::INT_16);
} else if (type.type == TYPE_INT) {
node.__set_converted_type(parquet::ConvertedType::INT_32);
} else if (type.type == TYPE_BIGINT) {
node.__set_converted_type(parquet::ConvertedType::INT_64);
}
}
return Status::OK();
}
Status HdfsParquetTableWriter::AddRowGroup() {
if (current_row_group_ != nullptr) RETURN_IF_ERROR(FlushCurrentRowGroup());
file_metadata_.row_groups.push_back(parquet::RowGroup());
current_row_group_ = &file_metadata_.row_groups[file_metadata_.row_groups.size() - 1];
// Initialize new row group metadata.
int num_clustering_cols = table_desc_->num_clustering_cols();
current_row_group_->columns.resize(columns_.size());
for (int i = 0; i < columns_.size(); ++i) {
parquet::ColumnMetaData metadata;
metadata.type = ConvertInternalToParquetType(columns_[i]->type().type);
metadata.path_in_schema.push_back(
table_desc_->col_descs()[i + num_clustering_cols].name());
metadata.codec = columns_[i]->GetParquetCodec();
current_row_group_->columns[i].__set_meta_data(metadata);
}
return Status::OK();
}
int64_t HdfsParquetTableWriter::MinBlockSize(int64_t num_file_cols) const {
// See file_size_limit_ calculation in InitNewFile().
return 3 * DEFAULT_DATA_PAGE_SIZE * num_file_cols;
}
uint64_t HdfsParquetTableWriter::default_block_size() const {
int64_t block_size;
if (state_->query_options().__isset.parquet_file_size &&
state_->query_options().parquet_file_size > 0) {
// If the user specified a value explicitly, use it. InitNewFile() will verify that
// the actual file's block size is sufficient.
block_size = state_->query_options().parquet_file_size;
} else {
block_size = HDFS_BLOCK_SIZE;
// Blocks are usually HDFS_BLOCK_SIZE bytes, unless there are many columns, in
// which case a per-column minimum kicks in.
block_size = max(block_size, MinBlockSize(columns_.size()));
}
// HDFS does not like block sizes that are not aligned
return BitUtil::RoundUp(block_size, HDFS_BLOCK_ALIGNMENT);
}
Status HdfsParquetTableWriter::InitNewFile() {
DCHECK(current_row_group_ == nullptr);
per_file_mem_pool_->Clear();
// Get the file limit
file_size_limit_ = output_->block_size;
if (file_size_limit_ < HDFS_MIN_FILE_SIZE) {
stringstream ss;
ss << "Hdfs file size (" << file_size_limit_ << ") is too small.";
return Status(ss.str());
}
// We want to output HDFS files that are no more than file_size_limit_. If we
// go over the limit, HDFS will split the file into multiple blocks which
// is undesirable. If we are under the limit, we potentially end up with more
// files than necessary. Either way, it is not going to generate a invalid
// file.
// With arbitrary encoding schemes, it is not possible to know if appending
// a new row will push us over the limit until after encoding it. Rolling back
// a row can be tricky as well so instead we will stop the file when it is
// 2 * DEFAULT_DATA_PAGE_SIZE * num_cols short of the limit. e.g. 50 cols with 8K data
// pages, means we stop 800KB shy of the limit.
// Data pages calculate their size precisely when they are complete so having
// a two page buffer guarantees we will never go over (unless there are huge values
// that require increasing the page size).
// TODO: this should be made dynamic based on the size of rows seen so far.
// This would for example, let us account for very long string columns.
const int64_t num_cols = columns_.size();
if (file_size_limit_ < MinBlockSize(num_cols)) {
stringstream ss;
ss << "Parquet file size " << file_size_limit_ << " bytes is too small for "
<< "a table with " << num_cols << " non-partitioning columns. Set query option "
<< "PARQUET_FILE_SIZE to at least " << MinBlockSize(num_cols) << ".";
return Status(ss.str());
}
file_size_limit_ -= 2 * DEFAULT_DATA_PAGE_SIZE * columns_.size();
DCHECK_GE(file_size_limit_,
static_cast<int64_t>(DEFAULT_DATA_PAGE_SIZE * columns_.size()));
file_pos_ = 0;
row_count_ = 0;
file_size_estimate_ = 0;
file_metadata_.row_groups.clear();
RETURN_IF_ERROR(AddRowGroup());
RETURN_IF_ERROR(WriteFileHeader());
return Status::OK();
}
Status HdfsParquetTableWriter::AppendRows(
RowBatch* batch, const vector<int32_t>& row_group_indices, bool* new_file) {
SCOPED_TIMER(parent_->encode_timer());
*new_file = false;
int limit;
if (row_group_indices.empty()) {
limit = batch->num_rows();
} else {
limit = row_group_indices.size();
}
bool all_rows = row_group_indices.empty();
for (; row_idx_ < limit;) {
TupleRow* current_row = all_rows ?
batch->GetRow(row_idx_) : batch->GetRow(row_group_indices[row_idx_]);
for (int j = 0; j < columns_.size(); ++j) {
RETURN_IF_ERROR(columns_[j]->AppendRow(current_row));
}
++row_idx_;
++row_count_;
++output_->num_rows;
if (file_size_estimate_ > file_size_limit_) {
// This file is full. We need a new file.
*new_file = true;
return Status::OK();
}
}
// We exhausted the batch, so we materialize the statistics before releasing the memory.
for (unique_ptr<BaseColumnWriter>& column : columns_) {
RETURN_IF_ERROR(column->MaterializeStatsValues());
}
// Reset the row_idx_ when we exhaust the batch. We can exit before exhausting
// the batch if we run out of file space and will continue from the last index.
row_idx_ = 0;
return Status::OK();
}
Status HdfsParquetTableWriter::Finalize() {
SCOPED_TIMER(parent_->hdfs_write_timer());
// At this point we write out the rest of the file. We first update the file
// metadata, now that all the values have been seen.
file_metadata_.num_rows = row_count_;
// Set the ordering used to write parquet statistics for columns in the file.
parquet::ColumnOrder col_order = parquet::ColumnOrder();
col_order.__set_TYPE_ORDER(parquet::TypeDefinedOrder());
file_metadata_.column_orders.assign(columns_.size(), col_order);
file_metadata_.__isset.column_orders = true;
RETURN_IF_ERROR(FlushCurrentRowGroup());
RETURN_IF_ERROR(WritePageIndex());
RETURN_IF_ERROR(WriteFileFooter());
stats_.__set_parquet_stats(parquet_insert_stats_);
COUNTER_ADD(parent_->rows_inserted_counter(), row_count_);
return Status::OK();
}
void HdfsParquetTableWriter::Close() {
// Release all accumulated memory
for (int i = 0; i < columns_.size(); ++i) {
columns_[i]->Close();
}
reusable_col_mem_pool_->FreeAll();
per_file_mem_pool_->FreeAll();
compression_staging_buffer_.clear();
}
Status HdfsParquetTableWriter::WriteFileHeader() {
DCHECK_EQ(file_pos_, 0);
RETURN_IF_ERROR(Write(PARQUET_VERSION_NUMBER, sizeof(PARQUET_VERSION_NUMBER)));
file_pos_ += sizeof(PARQUET_VERSION_NUMBER);
file_size_estimate_ += sizeof(PARQUET_VERSION_NUMBER);
return Status::OK();
}
Status HdfsParquetTableWriter::FlushCurrentRowGroup() {
if (current_row_group_ == nullptr) return Status::OK();
int num_clustering_cols = table_desc_->num_clustering_cols();
for (int i = 0; i < columns_.size(); ++i) {
int64_t data_page_offset, dict_page_offset;
// Flush this column. This updates the final metadata sizes for this column.
RETURN_IF_ERROR(columns_[i]->Flush(&file_pos_, &data_page_offset, &dict_page_offset));
DCHECK_GT(data_page_offset, 0);
parquet::ColumnChunk& col_chunk = current_row_group_->columns[i];
parquet::ColumnMetaData& col_metadata = col_chunk.meta_data;
col_metadata.data_page_offset = data_page_offset;
if (dict_page_offset >= 0) {
col_metadata.__set_dictionary_page_offset(dict_page_offset);
}
BaseColumnWriter* col_writer = columns_[i].get();
col_metadata.num_values = col_writer->num_values();
col_metadata.total_uncompressed_size = col_writer->total_uncompressed_size();
col_metadata.total_compressed_size = col_writer->total_compressed_size();
current_row_group_->total_byte_size += col_writer->total_compressed_size();
current_row_group_->num_rows = col_writer->num_values();
current_row_group_->columns[i].file_offset = file_pos_;
const string& col_name = table_desc_->col_descs()[i + num_clustering_cols].name();
parquet_insert_stats_.per_column_size[col_name] +=
col_writer->total_compressed_size();
// Write encodings and encoding stats for this column
col_metadata.encodings.clear();
for (parquet::Encoding::type encoding : col_writer->column_encodings_) {
col_metadata.encodings.push_back(encoding);
}
vector<parquet::PageEncodingStats> encoding_stats;
// Add dictionary page encoding stats
for (const auto& entry: col_writer->dict_encoding_stats_) {
parquet::PageEncodingStats dict_enc_stat;
dict_enc_stat.page_type = parquet::PageType::DICTIONARY_PAGE;
dict_enc_stat.encoding = entry.first;
dict_enc_stat.count = entry.second;
encoding_stats.push_back(dict_enc_stat);
}
// Add data page encoding stats
for (const auto& entry: col_writer->data_encoding_stats_) {
parquet::PageEncodingStats data_enc_stat;
data_enc_stat.page_type = parquet::PageType::DATA_PAGE;
data_enc_stat.encoding = entry.first;
data_enc_stat.count = entry.second;
encoding_stats.push_back(data_enc_stat);
}
col_metadata.__set_encoding_stats(encoding_stats);
// Build column statistics and add them to the header.
col_writer->EncodeRowGroupStats(&current_row_group_->columns[i].meta_data);
// Since we don't supported complex schemas, all columns should have the same
// number of values.
DCHECK_EQ(current_row_group_->columns[0].meta_data.num_values,
col_writer->num_values());
// Metadata for this column is complete, write it out to file. The column metadata
// goes at the end so that when we have collocated files, the column data can be
// written without buffering.
uint8_t* buffer = nullptr;
uint32_t len = 0;
RETURN_IF_ERROR(
thrift_serializer_->Serialize(&current_row_group_->columns[i], &len, &buffer));
RETURN_IF_ERROR(Write(buffer, len));
file_pos_ += len;
}
// Populate RowGroup::sorting_columns with all columns specified by the Frontend.
for (int col_idx : parent_->sort_columns()) {
current_row_group_->sorting_columns.push_back(parquet::SortingColumn());
parquet::SortingColumn& sorting_column = current_row_group_->sorting_columns.back();
sorting_column.column_idx = col_idx;
sorting_column.descending = false;
sorting_column.nulls_first = false;
}
current_row_group_->__isset.sorting_columns =
!current_row_group_->sorting_columns.empty();
current_row_group_ = nullptr;
return Status::OK();
}
Status HdfsParquetTableWriter::WritePageIndex() {
// Currently Impala only write Parquet files with a single row group. The current
// page index logic depends on this behavior as it only keeps one row group's
// statistics in memory.
DCHECK_EQ(file_metadata_.row_groups.size(), 1);
parquet::RowGroup* row_group = &(file_metadata_.row_groups[0]);
// Write out the column indexes.
for (int i = 0; i < columns_.size(); ++i) {
auto& column = *columns_[i];
if (!column.valid_column_index_) continue;
column.column_index_.__set_boundary_order(
column.row_group_stats_base_->GetBoundaryOrder());
// We always set null_counts.
column.column_index_.__isset.null_counts = true;
uint8_t* buffer = nullptr;
uint32_t len = 0;
RETURN_IF_ERROR(thrift_serializer_->Serialize(&column.column_index_, &len, &buffer));
RETURN_IF_ERROR(Write(buffer, len));
// Update the column_index_offset and column_index_length of the ColumnChunk
row_group->columns[i].__set_column_index_offset(file_pos_);
row_group->columns[i].__set_column_index_length(len);
file_pos_ += len;
}
// Write out the offset indexes.
for (int i = 0; i < columns_.size(); ++i) {
auto& column = *columns_[i];
uint8_t* buffer = nullptr;
uint32_t len = 0;
RETURN_IF_ERROR(thrift_serializer_->Serialize(&column.offset_index_, &len, &buffer));
RETURN_IF_ERROR(Write(buffer, len));
// Update the offset_index_offset and offset_index_length of the ColumnChunk
row_group->columns[i].__set_offset_index_offset(file_pos_);
row_group->columns[i].__set_offset_index_length(len);
file_pos_ += len;
}
// Reset column writers.
for (auto& column : columns_) column->Reset();
return Status::OK();
}
Status HdfsParquetTableWriter::WriteFileFooter() {
// Write file_meta_data
uint32_t file_metadata_len = 0;
uint8_t* buffer = nullptr;
RETURN_IF_ERROR(
thrift_serializer_->Serialize(&file_metadata_, &file_metadata_len, &buffer));
RETURN_IF_ERROR(Write(buffer, file_metadata_len));
// Write footer
RETURN_IF_ERROR(Write<uint32_t>(file_metadata_len));
RETURN_IF_ERROR(Write(PARQUET_VERSION_NUMBER, sizeof(PARQUET_VERSION_NUMBER)));
return Status::OK();
}