// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "common/logging.h"
#include "exec/parquet/hdfs-parquet-scanner.h"
#include "exec/parquet/parquet-page-index.h"
#include "gutil/strings/substitute.h"
#include "rpc/thrift-util.h"
#include "runtime/io/request-context.h"
#include "runtime/io/request-ranges.h"
#include <memory>
#include "common/names.h"
using namespace parquet;
using namespace impala::io;
namespace impala {
ParquetPageIndex::ParquetPageIndex(HdfsParquetScanner* scanner) :
  scanner_(scanner),
  page_index_buffer_(scanner_->scan_node_->mem_tracker()) {
}
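
// Scans the column chunks of 'row_group' and determines the file byte ranges that
// cover its column index and offset index. Returns true if the row group has both
// indexes; the output parameters are only set in that case.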
bool ParquetPageIndex::DeterminePageIndexRangesInRowGroup(
    const parquet::RowGroup& row_group, int64_t* column_index_start,
    int64_t* column_index_size, int64_t* offset_index_start,
    int64_t* offset_index_size) {
  int64_t ci_start = numeric_limits<int64_t>::max();
  int64_t oi_start = numeric_limits<int64_t>::max();
  int64_t ci_end = -1;
  int64_t oi_end = -1;
  for (const ColumnChunk& col_chunk : row_group.columns) {
    if (col_chunk.__isset.column_index_offset && col_chunk.__isset.column_index_length) {
      ci_start = min(ci_start, col_chunk.column_index_offset);
      ci_end = max(ci_end,
          col_chunk.column_index_offset + col_chunk.column_index_length);
    }
    if (col_chunk.__isset.offset_index_offset && col_chunk.__isset.offset_index_length) {
      oi_start = min(oi_start, col_chunk.offset_index_offset);
      oi_end = max(oi_end,
          col_chunk.offset_index_offset + col_chunk.offset_index_length);
    }
  }
  bool has_page_index = oi_end != -1 && ci_end != -1;
  if (has_page_index) {
    *column_index_start = ci_start;
    *column_index_size = ci_end - ci_start;
    *offset_index_start = oi_start;
    *offset_index_size = oi_end - oi_start;
  }
  return has_page_index;
}
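
// Reads the whole page index of the row group with index 'row_group_idx' into
// 'page_index_buffer_' using a single scan range that covers both the column index
// and the offset index.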
Status ParquetPageIndex::ReadAll(int row_group_idx) {
  DCHECK(page_index_buffer_.buffer() == nullptr);
  bool has_page_index = DeterminePageIndexRangesInRowGroup(
      scanner_->file_metadata_.row_groups[row_group_idx],
      &column_index_base_offset_, &column_index_size_,
      &offset_index_base_offset_, &offset_index_size_);
  // It's not an error if there is no page index.
  if (!has_page_index) return Status::OK();

  // Issue one scan range from the start of the column index to the end of the offset
  // index, skipping any gap between the two indexes via sub-ranges.
  int64_t scan_range_start = column_index_base_offset_;
  int64_t scan_range_size =
      offset_index_base_offset_ + offset_index_size_ - column_index_base_offset_;
  vector<ScanRange::SubRange> sub_ranges;
  if (column_index_base_offset_ + column_index_size_ <= offset_index_base_offset_) {
    // The sub-ranges will be merged if they are contiguous.
    sub_ranges.push_back({column_index_base_offset_, column_index_size_});
    sub_ranges.push_back({offset_index_base_offset_, offset_index_size_});
  } else {
    // Parquet files are expected to store the column index before the offset index.
    return Status(Substitute(
        "Found unsupported Parquet page index layout for file '$0'.",
        scanner_->filename()));
  }
  int64_t buffer_size = column_index_size_ + offset_index_size_;
  if (!page_index_buffer_.TryAllocate(buffer_size)) {
    return Status(Substitute("Could not allocate buffer of $0 bytes for Parquet "
        "page index for file '$1'.", buffer_size, scanner_->filename()));
  }
  int64_t partition_id = scanner_->context_->partition_descriptor()->id();
  // The page index is read only once per row group, so there is no point in pinning
  // it in the HDFS cache.
  int cache_options =
      scanner_->metadata_range_->cache_options() & ~BufferOpts::USE_HDFS_CACHE;
  ScanRange* object_range = scanner_->scan_node_->AllocateScanRange(
      scanner_->metadata_range_->fs(), scanner_->filename(), scan_range_size,
      scan_range_start, move(sub_ranges), partition_id,
      scanner_->metadata_range_->disk_id(), scanner_->metadata_range_->expected_local(),
      scanner_->metadata_range_->is_erasure_coded(), scanner_->metadata_range_->mtime(),
      BufferOpts::ReadInto(page_index_buffer_.buffer(), page_index_buffer_.Size(),
          cache_options));

  unique_ptr<BufferDescriptor> io_buffer;
  bool needs_buffers;
  RETURN_IF_ERROR(scanner_->scan_node_->reader_context()->StartScanRange(
      object_range, &needs_buffers));
  DCHECK(!needs_buffers) << "Already provided a buffer";
  RETURN_IF_ERROR(object_range->GetNext(&io_buffer));
  DCHECK_EQ(io_buffer->buffer(), page_index_buffer_.buffer());
  DCHECK_EQ(io_buffer->len(), page_index_buffer_.Size());
  DCHECK(io_buffer->eosr());
  object_range->ReturnBuffer(move(io_buffer));
  return Status::OK();
}
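
// Deserializes the column index of 'col_chunk' from the buffered page index bytes
// into 'column_index'. ReadAll() must have succeeded before calling this.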
Status ParquetPageIndex::DeserializeColumnIndex(const ColumnChunk& col_chunk,
    ColumnIndex* column_index) {
  if (page_index_buffer_.buffer() == nullptr) {
    return Status(Substitute("No page index for file $0.", scanner_->filename()));
  }
  int64_t buffer_offset = col_chunk.column_index_offset - column_index_base_offset_;
  uint32_t length = col_chunk.column_index_length;
  DCHECK_GE(buffer_offset, 0);
  DCHECK_LE(buffer_offset + length, column_index_size_);
  return DeserializeThriftMsg(
      page_index_buffer_.buffer() + buffer_offset, &length, true, column_index);
}
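
// Deserializes the offset index of 'col_chunk' from the buffered page index bytes
// into 'offset_index'. ReadAll() must have succeeded before calling this.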
Status ParquetPageIndex::DeserializeOffsetIndex(const ColumnChunk& col_chunk,
    OffsetIndex* offset_index) {
  if (page_index_buffer_.buffer() == nullptr) {
    return Status(Substitute("No page index for file $0.", scanner_->filename()));
  }
  // The offset index bytes are stored in 'page_index_buffer_' right after the column
  // index bytes, hence the 'column_index_size_' shift.
  int64_t buffer_offset = col_chunk.offset_index_offset - offset_index_base_offset_ +
      column_index_size_;
  uint32_t length = col_chunk.offset_index_length;
  DCHECK_GE(buffer_offset, 0);
  DCHECK_LE(buffer_offset + length, page_index_buffer_.Size());
  return DeserializeThriftMsg(
      page_index_buffer_.buffer() + buffer_offset, &length, true, offset_index);
}

} // namespace impala