blob: 1aeebfd050cf1f9762bb230457113fd2651a080c [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "exec/parquet/parquet-column-chunk-reader.h"
#include <string>
#include "exec/parquet/parquet-level-decoder.h"
#include "runtime/mem-pool.h"
#include "runtime/runtime-state.h"
#include "runtime/scoped-buffer.h"
#include "util/codec.h"
#include "common/names.h"
using namespace impala::io;
using parquet::Encoding;
namespace impala {
const string PARQUET_PAGE_MEM_LIMIT_EXCEEDED =
"ParquetColumnChunkReader::$0() failed to allocate $1 bytes for $2.";
// In 1.1, we had a bug where the dictionary page metadata was not set. Returns true
// if this matches those versions and compatibility workarounds need to be used.
static bool RequiresSkippedDictionaryHeaderCheck(
const ParquetFileVersion& v) {
if (v.application != "impala") return false;
return v.VersionEq(1,1,0) || (v.VersionEq(1,2,0) && v.is_impala_internal);
}
ParquetColumnChunkReader::ParquetColumnChunkReader(HdfsParquetScanner* parent,
string schema_name, int slot_id, ValueMemoryType value_mem_type, bool has_rep_level,
bool has_def_level)
: parent_(parent),
schema_name_(move(schema_name)),
page_reader_(parent, schema_name_),
slot_id_(slot_id),
data_page_pool_(new MemPool(parent->scan_node_->mem_tracker())),
value_mem_type_(value_mem_type),
has_rep_level_(has_rep_level),
has_def_level_(has_def_level) {}
ParquetColumnChunkReader::~ParquetColumnChunkReader() {}
Status ParquetColumnChunkReader::InitColumnChunk(const HdfsFileDesc& file_desc,
const parquet::ColumnChunk& col_chunk, int row_group_idx,
std::vector<io::ScanRange::SubRange>&& sub_ranges) {
if (col_chunk.meta_data.codec != parquet::CompressionCodec::UNCOMPRESSED) {
RETURN_IF_ERROR(Codec::CreateDecompressor(nullptr, false,
ConvertParquetToImpalaCodec(col_chunk.meta_data.codec), &decompressor_));
}
RETURN_IF_ERROR(page_reader_.InitColumnChunk(file_desc, col_chunk,
row_group_idx, move(sub_ranges)));
return Status::OK();
}
void ParquetColumnChunkReader::Close(MemPool* mem_pool) {
if (mem_pool != nullptr && value_mem_type_ == ValueMemoryType::VAR_LEN_STR) {
mem_pool->AcquireData(data_page_pool_.get(), false);
} else {
data_page_pool_->FreeAll();
}
if (decompressor_ != nullptr) decompressor_->Close();
}
void ParquetColumnChunkReader::ReleaseResourcesOfLastPage(MemPool& mem_pool) {
if (value_mem_type_ == ValueMemoryType::VAR_LEN_STR) {
mem_pool.AcquireData(data_page_pool_.get(), false);
} else {
data_page_pool_->FreeAll();
}
// We don't hold any pointers to earlier pages in the stream - we can safely free
// any I/O or boundary buffer.
// TODO: is this really needed? The read in ReadPageHeader() will release these
// resources anyway.
stream()->ReleaseCompletedResources(false);
}
Status ParquetColumnChunkReader::StartScan() {
DCHECK_GT(io_reservation_, 0);
RETURN_IF_ERROR(page_reader_.StartScan(io_reservation_));
return Status::OK();
}
Status ParquetColumnChunkReader::SkipPageData() {
return page_reader_.SkipPageData();
}
Status ParquetColumnChunkReader::TryReadDictionaryPage(bool* is_dictionary_page,
bool* eos, bool skip_data, ScopedBuffer* uncompressed_buffer, uint8_t** dict_values,
int64_t* data_size, int* num_entries) {
RETURN_IF_ERROR(page_reader_.ReadPageHeader(eos));
*is_dictionary_page = CurrentPageHeader().__isset.dictionary_page_header;
if (*eos || !(*is_dictionary_page)) return Status::OK();
if (skip_data) return SkipPageData();
RETURN_IF_ERROR(ReadDictionaryData(uncompressed_buffer, dict_values,
data_size, num_entries));
return Status::OK();
}
Status ParquetColumnChunkReader::ReadDictionaryData(ScopedBuffer* uncompressed_buffer,
uint8_t** dict_values, int64_t* data_size, int* num_entries) {
const parquet::PageHeader& current_page_header = CurrentPageHeader();
const parquet::DictionaryPageHeader* dict_header = nullptr;
if (current_page_header.__isset.dictionary_page_header) {
dict_header = &current_page_header.dictionary_page_header;
} else {
if (!RequiresSkippedDictionaryHeaderCheck(parent_->file_version_)) {
return Status("Dictionary page does not have dictionary header set.");
}
}
// Check that the dictionary page is PLAIN encoded. PLAIN_DICTIONARY in the context of
// a dictionary page means the same thing.
if (dict_header != nullptr &&
dict_header->encoding != Encoding::PLAIN &&
dict_header->encoding != Encoding::PLAIN_DICTIONARY) {
return Status("Only PLAIN and PLAIN_DICTIONARY encodings are supported "
"for dictionary pages.");
}
*data_size = current_page_header.compressed_page_size;
uint8_t* data = nullptr;
RETURN_IF_ERROR(page_reader_.ReadPageData(&data));
if (current_page_header.uncompressed_page_size == 0) {
// The size of dictionary can be 0, if every value is null.
*data_size = 0;
*num_entries = 0;
*dict_values = nullptr;
return Status::OK();
}
// There are 3 different cases from the aspect of memory management:
// 1. If the column type is string, the dictionary will contain pointers to a buffer,
// so the buffer's lifetime must be as long as any row batch that references it.
// 2. If the column type is not string, and the dictionary page is compressed, then a
// temporary buffer is needed for the uncompressed values.
// 3. If the column type is not string, and the dictionary page is not compressed,
// then no buffer is necessary.
const bool copy_buffer = value_mem_type_ == ValueMemoryType::FIXED_LEN_STR
|| value_mem_type_ == ValueMemoryType::VAR_LEN_STR;
if (decompressor_.get() != nullptr || copy_buffer) {
int buffer_size = current_page_header.uncompressed_page_size;
if (copy_buffer) {
*dict_values = parent_->dictionary_pool_->TryAllocate(buffer_size); // case 1.
} else if (uncompressed_buffer->TryAllocate(buffer_size)) {
*dict_values = uncompressed_buffer->buffer(); // case 2
}
if (UNLIKELY(*dict_values == nullptr)) {
string details = Substitute(PARQUET_PAGE_MEM_LIMIT_EXCEEDED, "InitDictionary",
buffer_size, "dictionary");
return parent_->dictionary_pool_->mem_tracker()->MemLimitExceeded(
parent_->state_, details, buffer_size);
}
} else {
*dict_values = data; // case 3.
}
if (decompressor_.get() != nullptr) {
int uncompressed_size = current_page_header.uncompressed_page_size;
RETURN_IF_ERROR(decompressor_->ProcessBlock32(true, *data_size, data,
&uncompressed_size, dict_values));
VLOG_FILE << "Decompressed " << *data_size << " to " << uncompressed_size;
if (current_page_header.uncompressed_page_size != uncompressed_size) {
return Status(Substitute("Error decompressing dictionary page in file '$0'. "
"Expected $1 uncompressed bytes but got $2", filename(),
current_page_header.uncompressed_page_size, uncompressed_size));
}
*data_size = uncompressed_size;
} else {
if (current_page_header.uncompressed_page_size != *data_size) {
return Status(Substitute("Error reading dictionary page in file '$0'. "
"Expected $1 bytes but got $2", filename(),
current_page_header.uncompressed_page_size, *data_size));
}
if (copy_buffer) memcpy(*dict_values, data, *data_size);
}
*num_entries = dict_header->num_values;
return Status::OK();
}
Status ParquetColumnChunkReader::ReadNextDataPageHeader(int* num_values) {
// Read the next data page header, skipping page types we don't care about and empty
// data pages. This method should be called after we know that the first page is not a
// dictionary page. Therefore, if we find a dictionary page, it is an error in
// the parquet file and we return a non-ok status (returned by
// page_reader_.ReadPageHeader()).
bool eos = false;
*num_values = 0;
bool next_data_page_found = false;
while (!next_data_page_found) {
RETURN_IF_ERROR(page_reader_.ReadPageHeader(&eos));
const parquet::PageHeader& header = CurrentPageHeader();
// If page_reader_.ReadPageHeader() > 0 and the page is a dictionary page then
// ReadPageHeader would have returned an error.
DCHECK(page_reader_.PageHeadersRead() > 0
|| !header.__isset.dictionary_page_header)
<< "Should not call this method on the first page if it is a dictionary.";
if (eos) return Status::OK();
if (header.type== parquet::PageType::DATA_PAGE
|| header.type == parquet::PageType::DATA_PAGE_V2) {
bool is_v2 = header.type == parquet::PageType::DATA_PAGE_V2;
int tmp_num_values = is_v2 ? header.data_page_header_v2.num_values
: header.data_page_header.num_values;
if (tmp_num_values < 0) {
return Status(Substitute("Error reading data page in Parquet file '$0'. "
"Invalid number of values in metadata: $1", filename(), tmp_num_values));
} else if (tmp_num_values == 0) {
// Skip pages with 0 values.
VLOG_FILE << "Found empty page in " << filename();
RETURN_IF_ERROR(SkipPageData());
} else {
*num_values = tmp_num_values;
next_data_page_found = true;
}
} else {
// We can safely skip non-data pages
RETURN_IF_ERROR(SkipPageData());
}
}
return Status::OK();
}
Status ParquetColumnChunkReader::ProcessRepDefLevelsInDataPageV1(
const parquet::DataPageHeader* header_v1, DataPageInfo* page_info,
uint8_t** data, int* data_size) {
page_info->rep_level_encoding = header_v1->repetition_level_encoding;
page_info->def_level_encoding = header_v1->definition_level_encoding;
int32_t rep_level_size = 0;
if (has_rep_level_) {
RETURN_IF_ERROR(ParquetLevelDecoder::ValidateEncoding(
filename(), page_info->rep_level_encoding));
RETURN_IF_ERROR(ParquetLevelDecoder::ParseRleByteSize(
filename(), data, data_size, &rep_level_size));
}
page_info->rep_level_ptr = *data;
page_info->rep_level_size = rep_level_size;
*data += rep_level_size;
*data_size -= rep_level_size;
int32_t def_level_size = 0;
if (has_def_level_) {
RETURN_IF_ERROR(ParquetLevelDecoder::ValidateEncoding(
filename(), page_info->def_level_encoding));
RETURN_IF_ERROR(ParquetLevelDecoder::ParseRleByteSize(
filename(), data, data_size, &def_level_size));
}
page_info->def_level_ptr = *data;
page_info->def_level_size = def_level_size;
*data += def_level_size;
*data_size -= def_level_size;
return Status::OK();
}
Status ParquetColumnChunkReader::ProcessRepDefLevelsInDataPageV2(
const parquet::DataPageHeaderV2* header_v2,
DataPageInfo* page_info, uint8_t* data, int max_size) {
int rep_level_size = header_v2->repetition_levels_byte_length;
int def_level_size = header_v2->definition_levels_byte_length;
if (rep_level_size < 0 || def_level_size < 0
|| rep_level_size + def_level_size > max_size) {
return Status(Substitute("Corrupt rep/def level sizes in v2 data page in file '$0'. "
"rep level size: $1 def level size: $2 max size: $3",
filename(), rep_level_size, def_level_size, max_size));
}
page_info->rep_level_size = rep_level_size;
page_info->def_level_size = def_level_size;
page_info->rep_level_ptr = data;
page_info->def_level_ptr = data + rep_level_size;
// v2 pages always use RLE for rep/def levels.
page_info->rep_level_encoding = parquet::Encoding::RLE;
page_info->def_level_encoding = parquet::Encoding::RLE;
return Status::OK();
}
Status ParquetColumnChunkReader::ReadDataPageData(DataPageInfo* page_info) {
DCHECK(page_info != nullptr);
page_info->is_valid = false;
const parquet::PageHeader& header = CurrentPageHeader();
bool is_v2 = header.type == parquet::PageType::DATA_PAGE_V2;
DCHECK(is_v2 || header.type == parquet::PageType::DATA_PAGE);
// In v2 pages if decompressor_ == nullptr it is still possible that is_compressed
// is true in the header (parquet-mr writes like this if compression=UNCOMPRESSED).
bool is_compressed = decompressor_.get() != nullptr
&& (!is_v2 || header.data_page_header_v2.is_compressed);
int orig_compressed_size = header.compressed_page_size;
int orig_uncompressed_size = header.uncompressed_page_size;
int compressed_size = orig_compressed_size;
int uncompressed_size = orig_uncompressed_size;
// Read compressed data.
uint8_t* compressed_data;
RETURN_IF_ERROR(page_reader_.ReadPageData(&compressed_data));
// If v2 data page, fill rep/def level info based on header. For v1 pages this will be
// done after decompression.
if (is_v2) {
RETURN_IF_ERROR(ProcessRepDefLevelsInDataPageV2(&header.data_page_header_v2,
page_info, compressed_data, orig_uncompressed_size));
// In v2 pages compressed_page_size size also includes the uncompressed
// rep/def levels.
// https://github.com/apache/parquet-format/blob/2a481fe1aad64ff770e21734533bb7ef5a057dac/src/main/thrift/parquet.thrift#L578
int levels_size = page_info->rep_level_size + page_info->def_level_size;
compressed_size -= levels_size;
uncompressed_size -= levels_size;
compressed_data += levels_size;
}
const bool has_slot_desc = value_mem_type_ != ValueMemoryType::NO_SLOT_DESC;
int data_size = uncompressed_size;
uint8_t* data = nullptr;
if (is_compressed) {
SCOPED_TIMER(parent_->decompress_timer_);
uint8_t* decompressed_buffer;
RETURN_IF_ERROR(AllocateUncompressedDataPage(
uncompressed_size, "decompressed data", &decompressed_buffer));
int actual_uncompressed_size = uncompressed_size;
RETURN_IF_ERROR(decompressor_->ProcessBlock32(true,
compressed_size, compressed_data, &actual_uncompressed_size,
&decompressed_buffer));
// TODO: can't we call stream_->ReleaseCompletedResources(false); at this point?
// (we can't in v2 data page as the original buffer contains rep/def levels)
VLOG_FILE << "Decompressed " << compressed_size
<< " to " << actual_uncompressed_size;
if (uncompressed_size != actual_uncompressed_size) {
return Status(Substitute("Error decompressing data page in file '$0'. "
"Expected $1 uncompressed bytes but got $2", filename(),
uncompressed_size, actual_uncompressed_size));
}
data = decompressed_buffer;
if (has_slot_desc) {
// Use original sizes (includes levels in v2) in the profile.
parent_->scan_node_->UpdateBytesRead(
slot_id_, orig_uncompressed_size, orig_compressed_size);
parent_->UpdateUncompressedPageSizeCounter(orig_uncompressed_size);
parent_->UpdateCompressedPageSizeCounter(orig_compressed_size);
}
} else {
if (compressed_size != uncompressed_size) {
return Status(Substitute("Error reading data page in file '$0'. "
"Compressed size ($1) should be the same as uncompressed size ($2) "
"in pages without compression.", filename(),
compressed_size, uncompressed_size));
}
// TODO: could skip copying when the data page is dict encoded as strings
// will point to the dictionary instead of the data buffer (IMPALA-12137)
const bool copy_buffer = value_mem_type_ == ValueMemoryType::VAR_LEN_STR;
if (copy_buffer) {
// In this case returned batches will have pointers into the data page itself.
// We don't transfer disk I/O buffers out of the scanner so we need to copy
// the page data so that it can be attached to output batches.
uint8_t* buffer;
RETURN_IF_ERROR(AllocateUncompressedDataPage(
uncompressed_size, "uncompressed variable-length data", &buffer));
memcpy(buffer, compressed_data, uncompressed_size);
data = buffer;
} else {
data = compressed_data;
}
if (has_slot_desc) {
// Use original sizes (includes levels in v2) in the profile.
parent_->scan_node_->UpdateBytesRead(slot_id_, orig_uncompressed_size, 0);
parent_->UpdateUncompressedPageSizeCounter(orig_uncompressed_size);
}
}
// The buffers to return are ready at this point.
// If v1 data page, fill rep/def level info by parsing the beginning of the data. For
// v2 pages this was done before decompression.
if (!is_v2) {
RETURN_IF_ERROR(ProcessRepDefLevelsInDataPageV1(&header.data_page_header, page_info,
&data, &data_size));
}
page_info->data_encoding = is_v2 ? header.data_page_header_v2.encoding
: header.data_page_header.encoding;
page_info->data_ptr = data;
page_info->data_size = data_size;
page_info->is_valid = true;
return Status::OK();
}
Status ParquetColumnChunkReader::AllocateUncompressedDataPage(int64_t size,
const char* err_ctx, uint8_t** buffer) {
*buffer = data_page_pool_->TryAllocate(size);
if (*buffer == nullptr) {
string details =
Substitute(PARQUET_PAGE_MEM_LIMIT_EXCEEDED, "ReadDataPage", size, err_ctx);
return data_page_pool_->mem_tracker()->MemLimitExceeded(
parent_->state_, details, size);
}
return Status::OK();
}
}