blob: 0f9d23b644ec5b9087b48f510b6d6ecd71ee1726 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "exec/hdfs-orc-scanner.h"
#include <queue>
#include <set>
#include "exec/exec-node.inline.h"
#include "exec/orc-column-readers.h"
#include "exec/scanner-context.inline.h"
#include "exprs/expr.h"
#include "exprs/scalar-expr.h"
#include "runtime/collection-value-builder.h"
#include "runtime/exec-env.h"
#include "runtime/io/request-context.h"
#include "runtime/mem-tracker.h"
#include "runtime/runtime-filter.inline.h"
#include "runtime/timestamp-value.inline.h"
#include "runtime/tuple-row.h"
#include "util/decompress.h"
#include "common/names.h"
using namespace impala;
using namespace impala::io;
using namespace impala::io;
namespace impala {
/// Generic wrapper to catch exceptions thrown from the ORC lib.
/// ResourceError is thrown by the OrcMemPool of our orc-scanner.
/// Other exceptions, e.g. orc::ParseError, are thrown by the ORC lib.
#define RETURN_ON_ORC_EXCEPTION(msg_format) \
catch (ResourceError& e) { \
parse_status_ = e.GetStatus(); \
return parse_status_; \
} catch (std::exception& e) { \
string msg = Substitute(msg_format, filename(), e.what()); \
parse_status_ = Status(msg); \
VLOG_QUERY << parse_status_.msg().msg(); \
return parse_status_; \
}
Status HdfsOrcScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
const vector<HdfsFileDesc*>& files) {
DCHECK(!files.empty());
for (HdfsFileDesc* file : files) {
// If the file size is less than 10 bytes, it is an invalid ORC file.
if (file->file_length < 10) {
return Status(Substitute("ORC file $0 has an invalid file length: $1",
file->filename, file->file_length));
}
}
return IssueFooterRanges(scan_node, THdfsFileFormat::ORC, files, ORC_FOOTER_SIZE);
}
HdfsOrcScanner::OrcMemPool::OrcMemPool(HdfsOrcScanner* scanner)
: scanner_(scanner), mem_tracker_(scanner_->scan_node_->mem_tracker()) {
}
HdfsOrcScanner::OrcMemPool::~OrcMemPool() {
FreeAll();
}
void HdfsOrcScanner::OrcMemPool::FreeAll() {
if (!chunk_sizes_.empty()) {
scanner_->state_->LogError(ErrorMsg(TErrorCode::INTERNAL_ERROR,
"Impala had to free memory leaked by ORC library."));
}
int64_t total_bytes_released = 0;
for (auto it = chunk_sizes_.begin(); it != chunk_sizes_.end(); ++it) {
std::free(it->first);
total_bytes_released += it->second;
}
mem_tracker_->Release(total_bytes_released);
chunk_sizes_.clear();
}
// orc-reader will not check the malloc result. We throw an exception if we can't
// malloc to stop the orc-reader.
char* HdfsOrcScanner::OrcMemPool::malloc(uint64_t size) {
if (!mem_tracker_->TryConsume(size)) {
throw ResourceError(mem_tracker_->MemLimitExceeded(
scanner_->state_, "Failed to allocate memory required by ORC library", size));
}
char* addr = static_cast<char*>(std::malloc(size));
if (addr == nullptr) {
mem_tracker_->Release(size);
throw ResourceError(Status(TErrorCode::MEM_ALLOC_FAILED, size));
}
chunk_sizes_[addr] = size;
return addr;
}
void HdfsOrcScanner::OrcMemPool::free(char* p) {
DCHECK(chunk_sizes_.find(p) != chunk_sizes_.end()) << "invalid free!" << endl
<< GetStackTrace();
std::free(p);
int64_t size = chunk_sizes_[p];
mem_tracker_->Release(size);
chunk_sizes_.erase(p);
}
void HdfsOrcScanner::ScanRangeInputStream::read(void* buf, uint64_t length,
uint64_t offset) {
Status status;
if (scanner_->IsInFooterRange(offset, length)) {
status = scanner_->ReadFooterStream(buf, length, offset);
} else {
ColumnRange* columnRange = scanner_->FindColumnRange(length, offset);
if (columnRange == nullptr) {
status = readRandom(buf, length, offset);
} else if (offset < columnRange->current_position_) {
VLOG_QUERY << Substitute(
"ORC read request to already read range. Falling back to readRandom. "
"offset: $0 length: $1 $2",
offset, length, columnRange->debug());
status = readRandom(buf, length, offset);
} else {
status = columnRange->read(buf, length, offset);
}
}
if (!status.ok()) throw ResourceError(status);
}
Status HdfsOrcScanner::ScanRangeInputStream::readRandom(
void* buf, uint64_t length, uint64_t offset) {
if (offset + length > getLength()) {
string msg = Substitute("Invalid read offset/length on ORC file $0. offset: $1 "
"length: $2 file_length: $3.",
filename_, offset, length, getLength());
return Status(msg);
}
const ScanRange* metadata_range = scanner_->metadata_range_;
const ScanRange* split_range =
reinterpret_cast<ScanRangeMetadata*>(metadata_range->meta_data())->original_split;
int64_t partition_id = scanner_->context_->partition_descriptor()->id();
bool expected_local = split_range->ExpectedLocalRead(offset, length);
int cache_options = split_range->cache_options() & ~BufferOpts::USE_HDFS_CACHE;
ScanRange* range = scanner_->scan_node_->AllocateScanRange(
metadata_range->fs(), scanner_->filename(), length, offset, partition_id,
split_range->disk_id(), expected_local, split_range->mtime(),
BufferOpts::ReadInto(reinterpret_cast<uint8_t*>(buf), length, cache_options));
unique_ptr<BufferDescriptor> io_buffer;
Status status;
{
SCOPED_TIMER2(scanner_->state_->total_storage_wait_timer(),
scanner_->scan_node_->scanner_io_wait_time());
bool needs_buffers;
status =
scanner_->scan_node_->reader_context()->StartScanRange(range, &needs_buffers);
DCHECK(!status.ok() || !needs_buffers) << "Already provided a buffer";
if (status.ok()) status = range->GetNext(&io_buffer);
}
if (io_buffer != nullptr) {
DCHECK_EQ(io_buffer->len(), length);
scanner_->AddSyncReadBytesCounter(length);
range->ReturnBuffer(move(io_buffer));
}
return status;
}
bool useAsyncIoForStream(orc::StreamKind kind) {
switch (kind) {
case orc::StreamKind_DATA:
case orc::StreamKind_LENGTH:
case orc::StreamKind_SECONDARY:
case orc::StreamKind_DICTIONARY_DATA:
case orc::StreamKind_DICTIONARY_COUNT:
case orc::StreamKind_PRESENT:
return true;
// We skip Async IO for the following stream kind. We expect that these streams will
// be read in one batch, so async reading does not help much.
// They also not too large.
case orc::StreamKind_ROW_INDEX:
case orc::StreamKind_BLOOM_FILTER:
case orc::StreamKind_BLOOM_FILTER_UTF8:
return false;
default:
// We might have bogus StreamKind if we are reading corrupted ORC file.
// Return false and let ORC lib deals with it.
return false;
}
}
Status HdfsOrcScanner::StartColumnReading(const orc::StripeInformation& stripe) {
columnRanges_.clear();
const std::list<uint64_t>& selected_type_ids = selected_type_ids_;
// Collect the stream belonging to selected columns.
set<uint64_t> column_id_set(selected_type_ids.begin(), selected_type_ids.end());
try {
uint64_t stream_count = stripe.getNumberOfStreams();
for (uint64_t stream_id = 0; stream_id < stream_count; stream_id++) {
unique_ptr<orc::StreamInformation> stream = stripe.getStreamInformation(stream_id);
if (column_id_set.find(stream->getColumnId()) == column_id_set.end()) continue;
if (!useAsyncIoForStream(stream->getKind())) continue;
if (stream->getLength() == 0) continue;
columnRanges_.emplace_back(stream->getLength(), stream->getOffset(),
stream->getKind(), stream->getColumnId(), this);
}
} RETURN_ON_ORC_EXCEPTION("Encountered parse error in tail of ORC file $0: $1");
// Sort and check that there is no overlapping range in columnRanges_.
sort(columnRanges_.begin(), columnRanges_.end());
uint64_t last_end = 0;
for (const ColumnRange& range : columnRanges_) {
if (last_end > range.offset_) {
string msg =
Substitute("Overlapping ORC column ranges. Last end: $0 Current offset: $1",
last_end, range.offset_);
return Status(msg);
}
last_end = range.offset_ + range.length_;
}
// Divide reservation between columns.
ColumnRangeLengths col_range_lengths(columnRanges_.size());
for (int i = 0; i < columnRanges_.size(); ++i) {
col_range_lengths[i] = columnRanges_[i].length_;
}
ColumnReservations tmp_reservations;
RETURN_IF_ERROR(DivideReservationBetweenColumns(col_range_lengths, tmp_reservations));
for (auto& tmp_reservation : tmp_reservations) {
columnRanges_[tmp_reservation.first].io_reservation = tmp_reservation.second;
}
int64_t partition_id = context_->partition_descriptor()->id();
const ScanRange* split_range =
static_cast<ScanRangeMetadata*>(metadata_range_->meta_data())->original_split;
for (ColumnRange& range : columnRanges_) {
// Determine if the column is completely contained within a local split.
bool col_range_local = split_range->ExpectedLocalRead(range.offset_, range.length_);
int64_t file_length = scan_node_->GetFileDesc(partition_id, filename())->file_length;
if (range.offset_ + range.length_ > file_length) {
string msg = Substitute("Invalid read len.");
return Status(msg);
}
ScanRange* scan_range = scan_node_->AllocateScanRange(metadata_range_->fs(),
filename(), range.length_, range.offset_, partition_id, split_range->disk_id(),
col_range_local, split_range->mtime(), BufferOpts(split_range->cache_options()));
RETURN_IF_ERROR(
context_->AddAndStartStream(scan_range, range.io_reservation, &range.stream_));
}
return Status::OK();
}
Status HdfsOrcScanner::ColumnRange::read(void* buf, uint64_t length, uint64_t offset) {
if (offset + length > offset_ + length_) {
string msg = Substitute("ORC read request out of range. offset: $0 length: $1 $2",
offset, length, debug());
return Status(msg);
}
DCHECK(offset >= current_position_);
Status status;
if (offset > current_position_) {
// skip the non-requested range
uint64_t bytes_to_skip = offset - current_position_;
if (!stream_->SkipBytes(bytes_to_skip, &status)) {
LOG(ERROR) << Substitute(
"HdfsOrcScanner::ColumnRange::read skipping failed. offset: $0 length: $1 $2",
offset, length, debug());
return status;
}
scanner_->AddSkippedReadBytesCounter(bytes_to_skip);
current_position_ = offset;
}
uint8_t* stream_buf = nullptr;
{
SCOPED_TIMER2(scanner_->state_->total_storage_wait_timer(),
scanner_->scan_node_->scanner_io_wait_time());
if (!stream_->ReadBytes(length, &stream_buf, &status, false)) return status;
scanner_->AddAsyncReadBytesCounter(length);
}
CHECK_NOTNULL(stream_buf);
memcpy(buf, stream_buf, length); // TODO: ORC-262: extend ORC interface to avoid copy.
current_position_ += length;
bool done = current_position_ == offset_ + length_;
DCHECK_EQ(current_position_, stream_->file_offset());
DCHECK_EQ(done, stream_->bytes_left() == 0);
stream_->ReleaseCompletedResources(done);
return Status::OK();
}
HdfsOrcScanner::HdfsOrcScanner(HdfsScanNodeBase* scan_node, RuntimeState* state)
: HdfsColumnarScanner(scan_node, state),
dictionary_pool_(new MemPool(scan_node->mem_tracker())),
data_batch_pool_(new MemPool(scan_node->mem_tracker())),
search_args_pool_(new MemPool(scan_node->mem_tracker())) {
assemble_rows_timer_.Stop();
}
HdfsOrcScanner::~HdfsOrcScanner() {
}
Status HdfsOrcScanner::Open(ScannerContext* context) {
RETURN_IF_ERROR(HdfsColumnarScanner::Open(context));
metadata_range_ = stream_->scan_range();
num_stripes_counter_ =
ADD_COUNTER(scan_node_->runtime_profile(), "NumOrcStripes", TUnit::UNIT);
num_pushed_down_predicates_counter_ =
ADD_COUNTER(scan_node_->runtime_profile(), "NumPushedDownPredicates", TUnit::UNIT);
num_pushed_down_runtime_filters_counter_ =
ADD_COUNTER(scan_node_->runtime_profile(), "NumPushedDownRuntimeFilters",
TUnit::UNIT);
codegend_process_scratch_batch_fn_ = scan_node_->GetCodegenFn(THdfsFileFormat::ORC);
if (codegend_process_scratch_batch_fn_ == nullptr) {
scan_node_->IncNumScannersCodegenDisabled();
} else {
scan_node_->IncNumScannersCodegenEnabled();
}
DCHECK(parse_status_.ok()) << "Invalid parse_status_" << parse_status_.GetDetail();
for (const FilterContext& ctx : context->filter_ctxs()) {
DCHECK(ctx.filter != nullptr);
filter_ctxs_.push_back(&ctx);
}
filter_stats_.resize(filter_ctxs_.size());
reader_mem_pool_.reset(new OrcMemPool(this));
reader_options_.setMemoryPool(*reader_mem_pool_);
// Each scan node can process multiple splits. Each split processes the footer once.
// We use a timer to measure the time taken to ProcessFileTail() per split and add
// this time to the averaged timer.
MonotonicStopWatch single_footer_process_timer;
single_footer_process_timer.Start();
// First process the file metadata in the footer.
Status footer_status = ProcessFileTail();
single_footer_process_timer.Stop();
process_footer_timer_stats_->UpdateCounter(single_footer_process_timer.ElapsedTime());
// Release I/O buffers immediately to make sure they are cleaned up
// in case we return a non-OK status anywhere below.
context_->ReleaseCompletedResources(true);
RETURN_IF_ERROR(footer_status);
bool is_table_full_acid = scan_node_->hdfs_table()->IsTableFullAcid();
schema_resolver_.reset(
new OrcSchemaResolver(*scan_node_->hdfs_table(), &reader_->getType(), filename(),
is_table_full_acid, state_->query_options().orc_schema_resolution));
bool is_file_full_acid = schema_resolver_->HasFullAcidV2Schema();
acid_original_file_ = is_table_full_acid && !is_file_full_acid;
if (is_table_full_acid) {
acid_write_id_range_ = valid_write_ids_.GetWriteIdRange(filename());
if (acid_original_file_ &&
acid_write_id_range_.first != acid_write_id_range_.second) {
return Status(Substitute("Found non-ACID file in directory that can only contain "
"files with full ACID schema: $0", filename()));
}
}
if (acid_original_file_) {
int32_t filename_len = strlen(filename());
if (filename_len >= 2 && strcmp(filename() + filename_len - 2, "_0") != 0) {
// It's an original file that should be included in the result.
// If it doesn't end with "_0" it means that it belongs to a bucket with other
// files. Impala rejects such files and tables.
// These files should only exist at table/partition root directory level.
// Original files in delta directories are created via the LOAD DATA
// statement. LOAD DATA assigns virtual bucket ids to files in non-bucketed
// tables, so we will have one file per (virtual) bucket (all of them having "_0"
// ending). For bucketed tables LOAD DATA will write ACID files. So after the first
// major compaction the table should never get into this state ever again.
return Status(Substitute("Found original file with unexpected name: $0 "
"Please run a major compaction on the partition/table to overcome this.",
filename()));
}
}
// Hive Streaming Ingestion allocates multiple write ids, hence create delta directories
// like delta_5_10. Then it continuously appends new stripes (and footers) to the
// ORC files of the delte dir. So it's possible that the file has rows that Impala is
// not allowed to see based on its valid write id list. In such cases we need to
// validate the write ids of the row batches.
if (is_table_full_acid && !ValidWriteIdList::IsCompacted(filename())) {
valid_write_ids_.InitFrom(scan_node_->hdfs_table()->ValidWriteIdList());
ValidWriteIdList::RangeResponse rows_valid = valid_write_ids_.IsWriteIdRangeValid(
acid_write_id_range_.first, acid_write_id_range_.second);
DCHECK_NE(rows_valid, ValidWriteIdList::NONE);
row_batches_need_validation_ = rows_valid == ValidWriteIdList::SOME;
}
if (scan_node_->optimize_count_star() && !row_batches_need_validation_) {
template_tuple_ = template_tuple_map_[scan_node_->tuple_desc()];
return Status::OK();
}
// Update 'row_reader_options_' based on the tuple descriptor so the ORC lib can skip
// columns we don't need.
RETURN_IF_ERROR(SelectColumns(*scan_node_->tuple_desc()));
// By enabling lazy decoding, String stripes with DICTIONARY_ENCODING[_V2] can be
// stored in an EncodedStringVectorBatch, where the data is stored in a dictionary
// blob more efficiently.
row_reader_options_.setEnableLazyDecoding(true);
// Clone the statistics conjuncts.
RETURN_IF_ERROR(ScalarExprEvaluator::Clone(&obj_pool_, state_, expr_perm_pool_.get(),
context_->expr_results_pool(), scan_node_->stats_conjunct_evals(),
&stats_conjunct_evals_));
// To create OrcColumnReaders, we need the selected orc schema. It's a subset of the
// file schema: a tree of selected orc types and can only be got from an orc::RowReader
// (by orc::RowReader::getSelectedType).
// Selected nodes are still connected as a tree since if a node is selected, all its
// ancestors and children will be selected too.
// Here we haven't read stripe data yet so no orc::RowReaders are created. To get the
// selected types we create a temp orc::RowReader (but won't read rows from it).
try {
unique_ptr<orc::RowReader> tmp_row_reader =
reader_->createRowReader(row_reader_options_);
const orc::Type* root_type = &tmp_row_reader->getSelectedType();
if (root_type->getKind() != orc::TypeKind::STRUCT) {
parse_status_ = Status(TErrorCode::ORC_TYPE_NOT_ROOT_AT_STRUCT, "selected",
root_type->toString(), filename());
return parse_status_;
}
orc_root_reader_ = this->obj_pool_.Add(
new OrcStructReader(root_type, scan_node_->tuple_desc(), this));
orc_root_batch_ = tmp_row_reader->createRowBatch(state_->batch_size());
DCHECK_EQ(orc_root_batch_->numElements, 0);
} RETURN_ON_ORC_EXCEPTION(
"Encountered parse error during schema selection in ORC file $0: $1");
// Set top-level template tuple.
template_tuple_ = template_tuple_map_[scan_node_->tuple_desc()];
return Status::OK();
}
void HdfsOrcScanner::Close(RowBatch* row_batch) {
DCHECK(!is_closed_);
if (row_batch != nullptr) {
context_->ReleaseCompletedResources(true);
row_batch->tuple_data_pool()->AcquireData(template_tuple_pool_.get(), false);
row_batch->tuple_data_pool()->AcquireData(dictionary_pool_.get(), false);
row_batch->tuple_data_pool()->AcquireData(data_batch_pool_.get(), false);
scratch_batch_->ReleaseResources(row_batch->tuple_data_pool());
if (scan_node_->HasRowBatchQueue()) {
static_cast<HdfsScanNode*>(scan_node_)->AddMaterializedRowBatch(
unique_ptr<RowBatch>(row_batch));
}
} else {
template_tuple_pool_->FreeAll();
dictionary_pool_->FreeAll();
data_batch_pool_->FreeAll();
context_->ReleaseCompletedResources(true);
scratch_batch_->ReleaseResources(nullptr);
}
orc_root_batch_.reset(nullptr);
search_args_pool_->FreeAll();
ScalarExprEvaluator::Close(stats_conjunct_evals_, state_);
// Verify all resources (if any) have been transferred.
DCHECK_EQ(template_tuple_pool_->total_allocated_bytes(), 0);
DCHECK_EQ(dictionary_pool_->total_allocated_bytes(), 0);
DCHECK_EQ(data_batch_pool_->total_allocated_bytes(), 0);
DCHECK_EQ(scratch_batch_->total_allocated_bytes(), 0);
assemble_rows_timer_.Stop();
assemble_rows_timer_.ReleaseCounter();
THdfsCompression::type compression_type = THdfsCompression::NONE;
if (reader_ != nullptr) {
compression_type = TranslateCompressionKind(reader_->getCompression());
}
scan_node_->RangeComplete(THdfsFileFormat::ORC, compression_type);
for (int i = 0; i < filter_ctxs_.size(); ++i) {
const FilterStats* stats = filter_ctxs_[i]->stats;
const LocalFilterStats& local = filter_stats_[i];
stats->IncrCounters(FilterStats::ROWS_KEY, local.total_possible,
local.considered, local.rejected);
}
CloseInternal();
}
Status HdfsOrcScanner::ProcessFileTail() {
try {
// ScanRangeInputStream keeps a pointer to this HdfsOrcScanner so we can hack
// async IO behind the orc::InputStream interface. The ranges of the
// selected columns will be updated when starting new stripes.
unique_ptr<orc::InputStream> input_stream(new ScanRangeInputStream(this));
VLOG_FILE << "Processing FileTail of ORC file: " << input_stream->getName()
<< ", file_length: " << input_stream->getLength();
reader_ = orc::createReader(move(input_stream), reader_options_);
} RETURN_ON_ORC_EXCEPTION("Encountered parse error in tail of ORC file $0: $1");
if (reader_->getNumberOfRows() == 0) return Status::OK();
if (reader_->getNumberOfStripes() == 0) {
return Status(Substitute("Invalid ORC file: $0. No stripes in this file but"
" numberOfRows in footer is $1", filename(), reader_->getNumberOfRows()));
}
return Status::OK();
}
inline THdfsCompression::type HdfsOrcScanner::TranslateCompressionKind(
orc::CompressionKind kind) {
switch (kind) {
case orc::CompressionKind::CompressionKind_NONE: return THdfsCompression::NONE;
// zlib used in ORC is corresponding to Deflate in Impala
case orc::CompressionKind::CompressionKind_ZLIB: return THdfsCompression::DEFLATE;
case orc::CompressionKind::CompressionKind_SNAPPY: return THdfsCompression::SNAPPY;
case orc::CompressionKind::CompressionKind_LZO: return THdfsCompression::LZO;
case orc::CompressionKind::CompressionKind_LZ4: return THdfsCompression::LZ4;
case orc::CompressionKind::CompressionKind_ZSTD: return THdfsCompression::ZSTD;
default:
VLOG_QUERY << "Unknown compression kind of orc::CompressionKind: " << kind;
}
return THdfsCompression::DEFAULT;
}
bool HdfsOrcScanner::IsPartitionKeySlot(const SlotDescriptor* slot) {
return file_metadata_utils_.IsValuePartitionCol(slot);
}
bool HdfsOrcScanner::IsMissingField(const SlotDescriptor* slot) {
return missing_field_slots_.find(slot) != missing_field_slots_.end();
}
// Fetch fully qualified name for 'col_path' by converting it into non-canonical
// table path.
string PrintColPath(const HdfsTableDescriptor& hdfs_table, const SchemaPath& col_path,
const unique_ptr<OrcSchemaResolver>& schema_resolver) {
SchemaPath table_col_path, file_col_path;
if (col_path.size() > 0) {
DCHECK(schema_resolver != nullptr);
// Convert 'col_path' to non-canonical table path 'table_col_path'.
schema_resolver->TranslateColPaths(col_path, &table_col_path, &file_col_path);
auto it = table_col_path.begin();
// remove initial -1s from the table_col_path
// -1 is present to represent some of the constructs in ACID table which are not
// present in table schema
while (it != table_col_path.end()) {
if (*it == -1) {
it = table_col_path.erase(it);
} else {
break;
}
}
}
return PrintPath(hdfs_table, table_col_path);
}
Status HdfsOrcScanner::ResolveColumns(const TupleDescriptor& tuple_desc,
list<const orc::Type*>* selected_nodes, stack<const SlotDescriptor*>* pos_slots) {
const orc::Type* node = nullptr;
bool pos_field = false;
bool missing_field = false;
// 1. Deal with the tuple descriptor. It should map to an ORC type.
RETURN_IF_ERROR(schema_resolver_->ResolveColumn(tuple_desc.tuple_path(), &node,
&pos_field, &missing_field));
if (missing_field) {
return Status(Substitute("Could not find nested column '$0' in file '$1'.",
PrintColPath(*scan_node_->hdfs_table(), tuple_desc.tuple_path(),
schema_resolver_), filename()));
}
tuple_to_col_id_.insert({&tuple_desc, node->getColumnId()});
if (tuple_desc.byte_size() == 0) {
// Don't need to materialize any slots but just generate an empty tuple for each
// (collection) row. (E.g. count(*) or 'exists' on results of subquery).
// Due to ORC-450 we can't get the number of tuples inside a collection without
// reading its items (or subcolumn of its items). So we select the most inner
// subcolumn of the collection (get by orc::Type::getMaximumColumnId()). E.g.
// if 'node' is array<struct<c1:int,c2:int,c3:int>> and we just need the array
// lengths, we still need to read at least one subcolumn otherwise the ORC lib
// will skip the whole array column. So we select 'c3' for this case.
selected_type_ids_.push_back(node->getMaximumColumnId());
VLOG(3) << "Add ORC column " << node->getMaximumColumnId() << " for empty tuple "
<< PrintColPath(*scan_node_->hdfs_table(), tuple_desc.tuple_path(),
schema_resolver_);
return Status::OK();
}
// 2. Deal with slots of the tuple descriptor. Each slot should map to an ORC type,
// otherwise it should be the position slot. Each tuple can have at most one position
// slot.
SlotDescriptor* pos_slot_desc = nullptr;
for (SlotDescriptor* slot_desc : tuple_desc.slots()) {
// Skip columns not (necessarily) stored in the data files.
if (!file_metadata_utils_.NeedDataInFile(slot_desc)) continue;
node = nullptr;
pos_field = false;
missing_field = false;
// Reminder: slot_desc->col_path() can be much deeper than tuple_desc.tuple_path()
// to reference to a deep subcolumn.
RETURN_IF_ERROR(schema_resolver_->ResolveColumn(
slot_desc->col_path(), &node, &pos_field, &missing_field));
if (missing_field) {
if (slot_desc->type().IsCollectionType()) {
// If the collection column is missing, the whole scan range should return 0 rows
// since we're selecting children column(s) of the collection.
return Status(Substitute("Could not find nested column '$0' in file '$1'.",
PrintColPath(*scan_node_->hdfs_table(), slot_desc->col_path(),
schema_resolver_), filename()));
}
// In this case, we are selecting a column/subcolumn that is not in the file.
// Update the template tuple to put a NULL in this slot.
Tuple** template_tuple = &template_tuple_map_[&tuple_desc];
if (*template_tuple == nullptr) {
*template_tuple =
Tuple::Create(tuple_desc.byte_size(), template_tuple_pool_.get());
}
if (acid_original_file_ && schema_resolver_->IsAcidColumn(slot_desc->col_path())) {
SetSyntheticAcidFieldForOriginalFile(slot_desc, *template_tuple);
} else {
(*template_tuple)->SetNull(slot_desc->null_indicator_offset());
}
missing_field_slots_.insert(slot_desc);
continue;
}
slot_to_col_id_.insert({slot_desc, node->getColumnId()});
if (pos_field) {
DCHECK(pos_slot_desc == nullptr)
<< "There should only be one position slot per tuple";
pos_slot_desc = slot_desc;
pos_slots->push(pos_slot_desc);
DCHECK_EQ(node->getKind(), orc::TypeKind::LIST);
continue;
}
if (slot_desc->type().IsComplexType()) {
// Recursively resolve nested columns
DCHECK(slot_desc->children_tuple_descriptor() != nullptr);
const TupleDescriptor* item_tuple_desc = slot_desc->children_tuple_descriptor();
RETURN_IF_ERROR(ResolveColumns(*item_tuple_desc, selected_nodes, pos_slots));
} else {
VLOG(3) << "Add ORC column " << node->getColumnId() << " for "
<< PrintColPath(*scan_node_->hdfs_table(), slot_desc->col_path(),
schema_resolver_);
selected_nodes->push_back(node);
}
}
return Status::OK();
}
void HdfsOrcScanner::SetSyntheticAcidFieldForOriginalFile(const SlotDescriptor* slot_desc,
Tuple* template_tuple) {
DCHECK_EQ(1, slot_desc->col_path().size());
int field_idx = slot_desc->col_path().front() - scan_node_->num_partition_keys();
switch(field_idx) {
case ACID_FIELD_OPERATION_INDEX:
*template_tuple->GetIntSlot(slot_desc->tuple_offset()) = 0;
break;
case ACID_FIELD_ORIGINAL_TRANSACTION_INDEX:
case ACID_FIELD_CURRENT_TRANSACTION_INDEX:
DCHECK_EQ(acid_write_id_range_.first, acid_write_id_range_.second);
*template_tuple->GetBigIntSlot(slot_desc->tuple_offset()) =
acid_write_id_range_.first;
break;
case ACID_FIELD_BUCKET_INDEX:
*template_tuple->GetBigIntSlot(slot_desc->tuple_offset()) =
ValidWriteIdList::GetBucketProperty(filename());
break;
case ACID_FIELD_ROWID_INDEX:
acid_synthetic_rowid_ = slot_desc;
default:
break;
}
}
/// Whether 'selected_type_ids' contains the id of any children of 'node'
bool HasChildrenSelected(const orc::Type& node,
const list<uint64_t>& selected_type_ids) {
for (uint64_t id : selected_type_ids) {
if (id >= node.getColumnId() && id <= node.getMaximumColumnId()) return true;
}
return false;
}
Status HdfsOrcScanner::SelectColumns(const TupleDescriptor& tuple_desc) {
list<const orc::Type*> selected_nodes;
stack<const SlotDescriptor*> pos_slots;
// Select columns for all non-position slots.
RETURN_IF_ERROR(ResolveColumns(tuple_desc, &selected_nodes, &pos_slots));
for (auto t : selected_nodes) selected_type_ids_.push_back(t->getColumnId());
// Select columns for array positions. Due to ORC-450 we can't materialize array
// offsets without materializing its items, so we should still select the item or any
// sub column of the item. To be simple, we choose the max column id in the subtree
// of the ARRAY node.
// We process the deeper position slots first since it may introduce an item column
// that can also serve the position slot of upper arrays. E.g. for 'array_col' as
// array<struct<c1:int,c2:int,c3:array<int>>>, if both 'array_col.pos' and
// 'array_col.item.c3.pos' are needed, we just need to select 'array_col.item.c3.item'
// in the ORC lib, then we get offsets(indices) of both the inner and outer arrays.
while (!pos_slots.empty()) {
const SlotDescriptor* pos_slot_desc = pos_slots.top();
pos_slots.pop();
const orc::Type* array_node = nullptr;
bool pos_field = false;
bool missing_field = false;
RETURN_IF_ERROR(schema_resolver_->ResolveColumn(pos_slot_desc->col_path(),
&array_node, &pos_field, &missing_field));
if (HasChildrenSelected(*array_node, selected_type_ids_)) continue;
selected_type_ids_.push_back(array_node->getMaximumColumnId());
VLOG(3) << "Add ORC column " << array_node->getMaximumColumnId() << " for "
<< PrintColPath(*scan_node_->hdfs_table(), pos_slot_desc->col_path(),
schema_resolver_);
selected_nodes.push_back(array_node);
}
// Select "CurrentTransaction" when we need to validate rows.
if (row_batches_need_validation_) {
// In case of zero-slot scans (e.g. count(*) over the table) we only select the
// 'currentTransaction' column.
if (scan_node_->IsZeroSlotTableScan()) selected_type_ids_.clear();
if (std::find(selected_type_ids_.begin(), selected_type_ids_.end(),
CURRENT_TRANSCACTION_TYPE_ID) == selected_type_ids_.end()) {
selected_type_ids_.push_back(CURRENT_TRANSCACTION_TYPE_ID);
}
}
COUNTER_SET(num_cols_counter_, static_cast<int64_t>(selected_type_ids_.size()));
row_reader_options_.includeTypes(selected_type_ids_);
return Status::OK();
}
Status HdfsOrcScanner::ProcessSplit() {
DCHECK(scan_node_->HasRowBatchQueue());
HdfsScanNode* scan_node = static_cast<HdfsScanNode*>(scan_node_);
do {
unique_ptr<RowBatch> batch = make_unique<RowBatch>(scan_node_->row_desc(),
state_->batch_size(), scan_node_->mem_tracker());
if (scan_node_->is_partition_key_scan()) batch->limit_capacity(1);
Status status = GetNextInternal(batch.get());
// If we are doing a partition key scan, we are done scanning the file after
// returning at least one row.
if (scan_node_->is_partition_key_scan() && batch->num_rows() > 0) eos_ = true;
// Always add batch to the queue because it may contain data referenced by previously
// appended batches.
scan_node->AddMaterializedRowBatch(move(batch));
RETURN_IF_ERROR(status);
++row_batches_produced_;
if ((row_batches_produced_ & (BATCHES_PER_FILTER_SELECTIVITY_CHECK - 1)) == 0) {
CheckFiltersEffectiveness();
}
} while (!eos_ && !scan_node_->ReachedLimitShared());
return Status::OK();
}
Status HdfsOrcScanner::GetNextInternal(RowBatch* row_batch) {
if (row_batches_need_validation_) {
// In case 'row_batches_need_validation_' is true, we need to look at the row
// batches and check their validity. This might be a zero slot scan, which
// 'currentTransaction' is the only selected field from the file. And this should
// not be an optimized count(*) because it is disabled for full acid table.
DCHECK(!scan_node_->optimize_count_star());
} else if (scan_node_->optimize_count_star()) {
// This is an optimized count(*) case.
// For each file, populate one slot with the footer's numberOfRows statistic.
return GetNextWithCountStarOptimization(row_batch);
} else if (scan_node_->IsZeroSlotTableScan()) {
// There are no materialized slots, e.g. "select 1" over the table. We can serve
// this query from just the file metadata. We don't need to read the column data.
return GetNextWithTemplateTuple(row_batch);
}
if (!scratch_batch_->AtEnd()) {
assemble_rows_timer_.Start();
int num_row_to_commit = TransferScratchTuples(row_batch);
assemble_rows_timer_.Stop();
RETURN_IF_ERROR(CommitRows(num_row_to_commit, row_batch));
if (row_batch->AtCapacity()) return Status::OK();
}
// reset tuple memory. We'll allocate it the first time we use it.
tuple_mem_ = nullptr;
tuple_ = nullptr;
// Transfer remaining values in current orc batch. They are left in the previous call
// of 'TransferTuples' inside 'AssembleRows'. Since the orc batch has the same capacity
// as RowBatch's, the remaining values should be drained by one more round of calling
// 'TransferTuples' here.
if (!orc_root_reader_->EndOfBatch()) {
assemble_rows_timer_.Start();
RETURN_IF_ERROR(TransferTuples(row_batch));
assemble_rows_timer_.Stop();
if (row_batch->AtCapacity()) return Status::OK();
DCHECK(orc_root_reader_->EndOfBatch());
}
// Process next stripe if current stripe is drained. Each stripe will generate several
// orc batches. We only advance the stripe after processing the last batch.
// 'advance_stripe_' is updated in 'NextStripe', meaning the current stripe we advance
// to can be skip. 'end_of_stripe_' marks whether current stripe is drained. It's only
// set to true in 'AssembleRows'.
while (advance_group_ || end_of_stripe_) {
// The next stripe will use a new dictionary blob so transfer the memory to row_batch.
row_batch->tuple_data_pool()->AcquireData(dictionary_pool_.get(), false);
context_->ReleaseCompletedResources(/* done */ true);
// Commit the rows to flush the row batch from the previous stripe.
RETURN_IF_ERROR(CommitRows(0, row_batch));
RETURN_IF_ERROR(NextStripe());
DCHECK_LE(group_idx_, reader_->getNumberOfStripes());
if (group_idx_ == reader_->getNumberOfStripes()) {
eos_ = true;
DCHECK(parse_status_.ok());
return Status::OK();
}
}
// Apply any runtime filters to static tuples containing the partition keys for this
// partition. If any filter fails, we return immediately and stop processing this
// scan range.
if (!scan_node_->PartitionPassesFilters(context_->partition_descriptor()->id(),
FilterStats::ROW_GROUPS_KEY, context_->filter_ctxs())) {
eos_ = true;
DCHECK(parse_status_.ok());
return Status::OK();
}
assemble_rows_timer_.Start();
Status status = AssembleRows(row_batch);
assemble_rows_timer_.Stop();
RETURN_IF_ERROR(status);
if (!parse_status_.ok()) {
RETURN_IF_ERROR(state_->LogOrReturnError(parse_status_.msg()));
parse_status_ = Status::OK();
}
return Status::OK();
}
inline static bool CheckStripeOverlapsSplit(int64_t stripe_start, int64_t stripe_end,
int64_t split_start, int64_t split_end) {
return (split_start >= stripe_start && split_start < stripe_end) ||
(split_end > stripe_start && split_end <= stripe_end) ||
(split_start <= stripe_start && split_end >= stripe_end);
}
Status HdfsOrcScanner::NextStripe() {
const ScanRange* split_range = static_cast<ScanRangeMetadata*>(
metadata_range_->meta_data())->original_split;
int64_t split_offset = split_range->offset();
int64_t split_length = split_range->len();
bool start_with_first_stripe = group_idx_ == -1;
bool misaligned_stripe_skipped = false;
advance_group_ = false;
rows_read_in_group_ = 0;
// Loop until we have found a non-empty stripe.
while (true) {
// Reset the parse status for the next stripe.
parse_status_ = Status::OK();
++group_idx_;
if (group_idx_ >= reader_->getNumberOfStripes()) {
if (start_with_first_stripe && misaligned_stripe_skipped) {
// We started with the first stripe and skipped all the stripes because they were
// misaligned. The execution flow won't reach this point if there is at least one
// non-empty stripe which this scanner can process.
COUNTER_ADD(num_scanners_with_no_reads_counter_, 1);
}
break;
}
unique_ptr<orc::StripeInformation> stripe = reader_->getStripe(group_idx_);
// Also check 'footer_.numberOfRows' to make sure 'select count(*)' and 'select *'
// behave consistently for corrupt files that have 'footer_.numberOfRows == 0'
// but some data in stripe.
if (stripe->getNumberOfRows() == 0 || reader_->getNumberOfRows() == 0) continue;
uint64_t stripe_offset = stripe->getOffset();
uint64_t stripe_len = stripe->getIndexLength() + stripe->getDataLength() +
stripe->getFooterLength();
int64_t stripe_mid_pos = stripe_offset + stripe_len / 2;
if (!(stripe_mid_pos >= split_offset &&
stripe_mid_pos < split_offset + split_length)) {
// Middle pos not in split, this stripe will be handled by a different scanner.
// Mark if the stripe overlaps with the split.
misaligned_stripe_skipped |= CheckStripeOverlapsSplit(stripe_offset,
stripe_offset + stripe_len, split_offset, split_offset + split_length);
continue;
}
COUNTER_ADD(num_stripes_counter_, 1);
if (state_->query_options().orc_async_read) {
RETURN_IF_ERROR(StartColumnReading(*stripe.get()));
}
row_reader_options_.range(stripe->getOffset(), stripe_len);
// Update SearchArguments in case any new runtime filters arrive.
RETURN_IF_ERROR(PrepareSearchArguments());
try {
row_reader_ = reader_->createRowReader(row_reader_options_);
} RETURN_ON_ORC_EXCEPTION("Error in creating column readers for ORC file $0: $1.");
end_of_stripe_ = false;
VLOG_ROW << Substitute("Created RowReader for stripe(offset=$0, len=$1) in file $2",
stripe->getOffset(), stripe_len, filename());
break;
}
DCHECK(parse_status_.ok());
return Status::OK();
}
Status HdfsOrcScanner::AssembleRows(RowBatch* row_batch) {
bool continue_execution = !scan_node_->ReachedLimitShared() && !context_->cancelled();
if (!continue_execution) return Status::CancelledInternal("ORC scanner");
// We're going to free the previous batch. Clear the reference first.
RETURN_IF_ERROR(orc_root_reader_->UpdateInputBatch(nullptr));
int64_t num_rows_read = 0;
while (continue_execution) { // one ORC batch (ColumnVectorBatch) in a round
if (orc_root_reader_->EndOfBatch()) {
row_batch->tuple_data_pool()->AcquireData(data_batch_pool_.get(), false);
try {
end_of_stripe_ |= !row_reader_->next(*orc_root_batch_);
RETURN_IF_ERROR(orc_root_reader_->UpdateInputBatch(orc_root_batch_.get()));
if (acid_synthetic_rowid_ != nullptr) {
// Set the first row index of the batch. The ORC reader guarantees that rows
// are consecutive in the returned batch.
orc_root_reader_->SetFileRowIndex(row_reader_->getRowNumber());
}
if (end_of_stripe_) break; // no more data to process
} RETURN_ON_ORC_EXCEPTION("Encounter parse error in ORC file $0: $1.");
if (orc_root_batch_->numElements == 0) {
RETURN_IF_ERROR(CommitRows(0, row_batch));
end_of_stripe_ = true;
return Status::OK();
}
num_rows_read += orc_root_batch_->numElements;
}
RETURN_IF_ERROR(TransferTuples(row_batch));
if (row_batch->AtCapacity()) break;
continue_execution &= !scan_node_->ReachedLimitShared() && !context_->cancelled();
}
rows_read_in_group_ += num_rows_read;
COUNTER_ADD(scan_node_->rows_read_counter(), num_rows_read);
// Merge Scanner-local counter into HdfsScanNode counter and reset.
COUNTER_ADD(scan_node_->collection_items_read_counter(), coll_items_read_counter_);
coll_items_read_counter_ = 0;
return Status::OK();
}
Status HdfsOrcScanner::TransferTuples(RowBatch* dst_batch) {
DCHECK_LT(dst_batch->num_rows(), dst_batch->capacity());
if (tuple_ == nullptr) RETURN_IF_ERROR(AllocateTupleMem(dst_batch));
int row_id = dst_batch->num_rows();
int capacity = dst_batch->capacity();
while (row_id < capacity && !orc_root_reader_->EndOfBatch()) {
DCHECK(scratch_batch_ != nullptr);
DCHECK(scratch_batch_->AtEnd());
RETURN_IF_ERROR(scratch_batch_->Reset(state_));
InitTupleBuffer(template_tuple_, scratch_batch_->tuple_mem, scratch_batch_->capacity);
RETURN_IF_ERROR(orc_root_reader_->TopLevelReadValueBatch(scratch_batch_.get(),
&scratch_batch_->aux_mem_pool));
int num_tuples_transferred = TransferScratchTuples(dst_batch);
row_id += num_tuples_transferred;
VLOG_ROW << Substitute("Transfer $0 rows from scratch batch to dst_batch ($1 rows)",
num_tuples_transferred, dst_batch->num_rows());
RETURN_IF_ERROR(CommitRows(num_tuples_transferred, dst_batch));
}
return Status::OK();
}
Status HdfsOrcScanner::AllocateTupleMem(RowBatch* row_batch) {
int64_t tuple_buffer_size;
RETURN_IF_ERROR(
row_batch->ResizeAndAllocateTupleBuffer(state_, &tuple_buffer_size, &tuple_mem_));
tuple_mem_end_ = tuple_mem_ + tuple_buffer_size;
tuple_ = reinterpret_cast<Tuple*>(tuple_mem_);
DCHECK_GT(row_batch->capacity(), 0);
return Status::OK();
}
Status HdfsOrcScanner::AssembleCollection(
const OrcComplexColumnReader& complex_col_reader, int row_idx,
CollectionValueBuilder* coll_value_builder) {
int total_tuples = complex_col_reader.GetNumChildValues(row_idx);
if (!complex_col_reader.MaterializeTuple()) {
// 'complex_col_reader' maps to a STRUCT or collection column of STRUCTs/collections
// and there're no need to materialize current level tuples. Delegate the
// materialization to the unique child reader.
DCHECK_EQ(complex_col_reader.children().size(), 1);
DCHECK(complex_col_reader.children()[0]->IsComplexColumnReader());
auto child_reader = reinterpret_cast<OrcComplexColumnReader*>(
complex_col_reader.children()[0]);
// We should give the child reader the boundary (offset and total tuples) of current
// collection
int child_batch_offset = complex_col_reader.GetChildBatchOffset(row_idx);
for (int i = 0; i < total_tuples; ++i) {
RETURN_IF_ERROR(AssembleCollection(*child_reader, child_batch_offset + i,
coll_value_builder));
}
return Status::OK();
}
DCHECK(complex_col_reader.IsCollectionReader());
auto coll_reader = reinterpret_cast<const OrcCollectionReader*>(&complex_col_reader);
const TupleDescriptor* tuple_desc = &coll_value_builder->tuple_desc();
Tuple* template_tuple = template_tuple_map_[tuple_desc];
const vector<ScalarExprEvaluator*>& evals =
conjunct_evals_map_[tuple_desc->id()];
int tuple_idx = 0;
while (!scan_node_->ReachedLimitShared() && !context_->cancelled()
&& tuple_idx < total_tuples) {
MemPool* pool;
Tuple* tuple;
TupleRow* row = nullptr;
int64_t num_rows;
// We're assembling item tuples into an CollectionValue
RETURN_IF_ERROR(
GetCollectionMemory(coll_value_builder, &pool, &tuple, &row, &num_rows));
// 'num_rows' can be very high if we're writing to a large CollectionValue. Limit
// the number of rows we read at one time so we don't spend too long in the
// 'num_rows' loop below before checking for cancellation or limit reached.
num_rows = min(
num_rows, static_cast<int64_t>(scan_node_->runtime_state()->batch_size()));
int num_to_commit = 0;
while (num_to_commit < num_rows && tuple_idx < total_tuples) {
InitTuple(tuple_desc, template_tuple, tuple);
RETURN_IF_ERROR(coll_reader->ReadChildrenValue(row_idx, tuple_idx++, tuple, pool));
if (ExecNode::EvalConjuncts(evals.data(), evals.size(), row)) {
tuple = next_tuple(tuple_desc->byte_size(), tuple);
++num_to_commit;
}
}
coll_value_builder->CommitTuples(num_to_commit);
}
coll_items_read_counter_ += tuple_idx;
return Status::OK();
}
/// T is the intended ORC primitive type and U is Impala internal primitive type.
template<typename T, typename U>
orc::Literal HdfsOrcScanner::GetOrcPrimitiveLiteral(
orc::PredicateDataType predicate_type, void* val) {
if (UNLIKELY(!val)) return orc::Literal(predicate_type);
T* val_dst = reinterpret_cast<T*>(val);
return orc::Literal(static_cast<U>(*val_dst));
}
orc::PredicateDataType HdfsOrcScanner::GetOrcPredicateDataType(const ColumnType& type) {
switch (type.type) {
case TYPE_BOOLEAN:
return orc::PredicateDataType::BOOLEAN;
case TYPE_TINYINT:
case TYPE_SMALLINT:
case TYPE_INT:
case TYPE_BIGINT:
return orc::PredicateDataType::LONG;
case TYPE_FLOAT:
case TYPE_DOUBLE:
return orc::PredicateDataType::FLOAT;
case TYPE_TIMESTAMP:
return orc::PredicateDataType::TIMESTAMP;
case TYPE_DATE:
return orc::PredicateDataType::DATE;
case TYPE_STRING:
case TYPE_VARCHAR:
case TYPE_CHAR:
case TYPE_FIXED_UDA_INTERMEDIATE:
return orc::PredicateDataType::STRING;
case TYPE_DECIMAL:
return orc::PredicateDataType::DECIMAL;
default:
DCHECK(false) << "Unsupported type: " << type.DebugString();
return orc::PredicateDataType::LONG;
}
}
orc::Literal HdfsOrcScanner::GetSearchArgumentLiteral(ScalarExprEvaluator* eval,
int child_idx, const ColumnType& dst_type, orc::PredicateDataType* predicate_type) {
DCHECK_GE(child_idx, 1);
DCHECK_LT(child_idx, eval->root().GetNumChildren());
ScalarExpr* literal_expr = eval->root().GetChild(child_idx);
const ColumnType& type = literal_expr->type();
DCHECK(literal_expr->IsLiteral());
*predicate_type = GetOrcPredicateDataType(type);
// Since we want to get a literal value, the second parameter below is not used.
void* val = eval->GetValue(*literal_expr, nullptr);
switch (type.type) {
case TYPE_BOOLEAN:
return GetOrcPrimitiveLiteral<bool, bool>(*predicate_type, val);
case TYPE_TINYINT:
return GetOrcPrimitiveLiteral<int8_t, int64_t>(*predicate_type, val);
case TYPE_SMALLINT:
return GetOrcPrimitiveLiteral<int16_t, int64_t>(*predicate_type, val);
case TYPE_INT:
return GetOrcPrimitiveLiteral<int32_t, int64_t>(*predicate_type, val);
case TYPE_BIGINT:
return GetOrcPrimitiveLiteral<int64_t, int64_t>(*predicate_type, val);
case TYPE_FLOAT:
return GetOrcPrimitiveLiteral<float, double>(*predicate_type, val);
case TYPE_DOUBLE:
return GetOrcPrimitiveLiteral<double, double>(*predicate_type, val);
// Predicates on Timestamp are currently skipped in FE. We will focus on them in
// IMPALA-10915.
case TYPE_TIMESTAMP: {
DCHECK(false) << "Timestamp predicate is not supported: IMPALA-10915";
return orc::Literal(predicate_type);
}
case TYPE_DATE: {
if (UNLIKELY(!val)) return orc::Literal(predicate_type);
const DateValue* dv = reinterpret_cast<const DateValue*>(val);
int32_t value = 0;
// The date should be valid at this point.
bool success = dv->ToDaysSinceEpoch(&value);
DCHECK(success);
return orc::Literal(*predicate_type, value);
}
case TYPE_STRING: {
if (UNLIKELY(!val)) return orc::Literal(predicate_type);
const StringValue* sv = reinterpret_cast<StringValue*>(val);
return orc::Literal(sv->ptr, sv->len);
}
// Predicates on CHAR/VARCHAR are currently skipped in FE. We will focus on them in
// IMPALA-10882.
case TYPE_VARCHAR: {
DCHECK(false) << "Varchar predicate is not supported: IMPALA-10882";
return orc::Literal(predicate_type);
}
case TYPE_CHAR: {
DCHECK(false) << "Char predicate is not supported: IMPALA-10882";
if (UNLIKELY(!val)) return orc::Literal(predicate_type);
const StringValue* sv = reinterpret_cast<StringValue*>(val);
char* dst_ptr;
if (dst_type.len > sv->len) {
dst_ptr = reinterpret_cast<char*>(search_args_pool_->TryAllocate(dst_type.len));
if (dst_ptr == nullptr) return orc::Literal(predicate_type);
memcpy(dst_ptr, sv->ptr, sv->len);
StringValue::PadWithSpaces(dst_ptr, dst_type.len, sv->len);
} else {
dst_ptr = sv->ptr;
}
return orc::Literal(dst_ptr, sv->len);
}
case TYPE_DECIMAL: {
if (!val) return orc::Literal(predicate_type);
orc::Int128 value;
switch (type.GetByteSize()) {
case 4: {
Decimal4Value* dv4 = reinterpret_cast<Decimal4Value*>(val);
value = orc::Int128(dv4->value());
break;
}
case 8: {
Decimal8Value* dv8 = reinterpret_cast<Decimal8Value*>(val);
value = orc::Int128(dv8->value());
break;
}
case 16: {
Decimal16Value* dv16 = reinterpret_cast<Decimal16Value*>(val);
value = orc::Int128(static_cast<int64_t>(dv16->value() >> 64), // higher bits
static_cast<uint64_t>(dv16->value())); // lower bits
break;
}
default:
DCHECK(false) << "Invalid byte size for decimal type: " << type.GetByteSize();
}
return orc::Literal(value, type.precision, type.scale);
}
default:
DCHECK(false) << "Invalid type";
return orc::Literal(orc::PredicateDataType::BOOLEAN);
}
}
bool HdfsOrcScanner::PrepareBinaryPredicate(const string& fn_name, uint64_t orc_column_id,
const ColumnType& type, ScalarExprEvaluator* eval,
orc::SearchArgumentBuilder* sarg) {
orc::PredicateDataType predicate_type;
orc::Literal literal = GetSearchArgumentLiteral(eval, /*child_idx*/1, type,
&predicate_type);
if (literal.isNull()) {
VLOG_FILE << "Failed to push down predicate " << eval->root().DebugString();
return false;
}
if (fn_name == "lt") {
sarg->lessThan(orc_column_id, predicate_type, literal);
} else if (fn_name == "gt") {
sarg->startNot()
.lessThanEquals(orc_column_id, predicate_type, literal)
.end();
} else if (fn_name == "le") {
sarg->lessThanEquals(orc_column_id, predicate_type, literal);
} else if (fn_name == "ge") {
sarg->startNot()
.lessThan(orc_column_id, predicate_type, literal)
.end();
} else if (fn_name == "eq") {
sarg->equals(orc_column_id, predicate_type, literal);
} else {
return false;
}
return true;
}
bool HdfsOrcScanner::PrepareInListPredicate(uint64_t orc_column_id,
const ColumnType& type, ScalarExprEvaluator* eval,
orc::SearchArgumentBuilder* sarg) {
std::vector<orc::Literal> in_list;
// Initialize 'predicate_type' to avoid clang-tidy warning.
orc::PredicateDataType predicate_type = orc::PredicateDataType::BOOLEAN;
for (int i = 1; i < eval->root().children().size(); ++i) {
// ORC reader only supports pushing down predicates whose constant parts are literal.
// FE shouldn't push down any non-literal expr here.
DCHECK(eval->root().GetChild(i)->IsLiteral())
<< "Non-literal constant expr cannot be used";
in_list.emplace_back(GetSearchArgumentLiteral(eval, i, type, &predicate_type));
}
return PrepareInListPredicate(orc_column_id, type, in_list, sarg);
}
bool HdfsOrcScanner::PrepareInListPredicate(uint64_t orc_column_id,
const ColumnType& type, const std::vector<orc::Literal>& in_list,
orc::SearchArgumentBuilder* sarg) {
orc::PredicateDataType predicate_type = GetOrcPredicateDataType(type);
// The ORC library requires IN-list has at least 2 literals. Converting to EQUALS
// when there is one.
if (in_list.size() == 1) {
sarg->equals(orc_column_id, predicate_type, in_list[0]);
} else if (in_list.size() > 1) {
sarg->in(orc_column_id, predicate_type, in_list);
} else {
DCHECK(false) << "Empty IN-list should cause syntax error";
return false;
}
return true;
}
void HdfsOrcScanner::PrepareIsNullPredicate(bool is_not_null, uint64_t orc_column_id,
const ColumnType& type, orc::SearchArgumentBuilder* sarg) {
orc::PredicateDataType orc_type = GetOrcPredicateDataType(type);
if (is_not_null) {
sarg->startNot()
.isNull(orc_column_id, orc_type)
.end();
} else {
sarg->isNull(orc_column_id, orc_type);
}
}
bool HdfsOrcScanner::ShouldUpdateSearchArgument() {
int num_current_filters = 0;
for (const FilterContext* ctx : filter_ctxs_) {
if (IsPushableInListFilter(ctx->filter)) num_current_filters++;
}
VLOG_FILE << "num_current_filters: " << num_current_filters
<< ", last num_usable_in_list_filters: " << num_pushable_in_list_filters_;
return num_current_filters > num_pushable_in_list_filters_;
}
Status HdfsOrcScanner::PrepareSearchArguments() {
if (!state_->query_options().orc_read_statistics) return Status::OK();
if (!ShouldUpdateSearchArgument()) return Status::OK();
VLOG_FILE << "Building SearchArgument on ORC file " << filename();
const TupleDescriptor* stats_tuple_desc = scan_node_->stats_tuple_desc();
if (!stats_tuple_desc) return Status::OK();
std::unique_ptr<orc::SearchArgumentBuilder> sarg =
orc::SearchArgumentFactory::newBuilder();
bool sargs_supported = false;
const orc::Type* node = nullptr;
bool pos_field;
bool missing_field;
int num_pushed_down_predicates = 0;
DCHECK_GE(stats_tuple_desc->slots().size(), stats_conjunct_evals_.size());
for (int i = 0; i < stats_conjunct_evals_.size(); ++i) {
SlotDescriptor* slot_desc = stats_tuple_desc->slots()[i];
// Resolve column path to determine col idx in file schema.
RETURN_IF_ERROR(schema_resolver_->ResolveColumn(slot_desc->col_path(),
&node, &pos_field, &missing_field));
if (pos_field || missing_field) continue;
ScalarExprEvaluator* eval = stats_conjunct_evals_[i];
const string& fn_name = eval->root().function_name();
if (fn_name == "is_null_pred" || fn_name == "is_not_null_pred") {
PrepareIsNullPredicate(fn_name == "is_not_null_pred", node->getColumnId(),
slot_desc->type(), sarg.get());
sargs_supported = true;
num_pushed_down_predicates++;
continue;
}
ScalarExpr* const_expr = eval->root().GetChild(1);
// ORC reader only supports pushing down predicates whose constant parts are literal.
// We could get non-literal expr if expr rewrites are disabled.
if (!const_expr->IsLiteral()) continue;
// TODO: push down stats predicates on CHAR/VARCHAR(IMPALA-10882) and
// TIMESTAMP(IMPALA-10915) to ORC reader
DCHECK(const_expr->type().type != TYPE_CHAR);
DCHECK(const_expr->type().type != TYPE_VARCHAR);
DCHECK(const_expr->type().type != TYPE_TIMESTAMP);
DCHECK(slot_desc->type().type != TYPE_CHAR);
DCHECK(slot_desc->type().type != TYPE_VARCHAR);
DCHECK(slot_desc->type().type != TYPE_TIMESTAMP) << "FE should skip such predicates";
// TODO(IMPALA-10916): dealing with lhs that is a simple cast expr.
if (GetOrcPredicateDataType(slot_desc->type()) !=
GetOrcPredicateDataType(const_expr->type())) {
continue;
}
// Skip if the file schema contains unsupported types.
// TODO: push down stats predicates on CHAR/VARCHAR(IMPALA-10882) and
// TIMESTAMP(IMPALA-10915) to ORC reader
if (node->getKind() == orc::CHAR
|| node->getKind() == orc::VARCHAR
|| node->getKind() == orc::TIMESTAMP) {
continue;
}
bool success;
if (fn_name == "in_iterate" || fn_name == "in_set_lookup") {
success = PrepareInListPredicate(
node->getColumnId(), slot_desc->type(), eval, sarg.get());
if (success) {
sargs_supported = true;
num_pushed_down_predicates++;
}
continue;
}
success = PrepareBinaryPredicate(fn_name, node->getColumnId(),
slot_desc->type(), eval, sarg.get());
if (success) {
sargs_supported = true;
num_pushed_down_predicates++;
}
}
VLOG_FILE << "Pushed " << num_pushed_down_predicates << " predicates down";
COUNTER_SET(num_pushed_down_predicates_counter_, num_pushed_down_predicates);
sargs_supported |= UpdateSearchArgumentWithFilters(sarg.get());
if (sargs_supported) {
try {
std::unique_ptr<orc::SearchArgument> final_sarg = sarg->build();
VLOG_FILE << "Built search arguments for ORC file: " << filename() << ": "
<< final_sarg->toString() << ". File schema: " << reader_->getType().toString();
row_reader_options_.searchArgument(std::move(final_sarg));
} RETURN_ON_ORC_EXCEPTION(
"Encountered parse error during building search arguments in ORC file $0: $1");
}
// Free any expr result allocations accumulated during conjunct evaluation.
context_->expr_results_pool()->Clear();
return Status::OK();
}
bool HdfsOrcScanner::IsPushableInListFilter(const RuntimeFilter* filter) {
VLOG_FILE << "Checking readiness";
if (!filter || !filter->is_in_list_filter() || !filter->HasFilter()) return false;
VLOG_FILE << "Checking partition filters";
// Only apply runtime filters on non-partition columns.
if (filter->IsBoundByPartitionColumn(GetScanNodeId())) return false;
VLOG_FILE << "Checking always_true of filter " << filter->id();
InListFilter* in_list_filter = filter->get_in_list_filter();
if (in_list_filter->AlwaysTrue()) return false;
VLOG_FILE << "Checking target expr of filter " << filter->id();
const TRuntimeFilterTargetDesc& target_desc = filter->filter_desc().targets[0];
// Filters target on an expr (e.g. 100 * col) can't be simply pushed down.
if (target_desc.target_expr.nodes.size() != 1) return false;
if (!target_desc.target_expr.nodes[0].__isset.slot_ref) return false;
return true;
}
bool HdfsOrcScanner::UpdateSearchArgumentWithFilters(orc::SearchArgumentBuilder* sarg) {
VLOG_FILE << "Updating SearchArgument with runtime filters";
int num_usable_filters = 0;
int num_pushed_down_filters = 0;
for (const FilterContext* ctx : filter_ctxs_) {
const RuntimeFilter* filter = ctx->filter;
if (!IsPushableInListFilter(filter)) continue;
num_usable_filters++;
VLOG_FILE << "Filter " << filter->id() << " is usable. "
<< "Resolving filter target in ORC file " << filename();
InListFilter* in_list_filter = filter->get_in_list_filter();
const TRuntimeFilterTargetDesc& target_desc = filter->filter_desc().targets[0];
DCHECK_EQ(target_desc.target_expr_slotids.size(), 1);
TSlotId sid = target_desc.target_expr_slotids[0];
const SlotDescriptor* target_slot = nullptr;
for (const SlotDescriptor* slot : scan_node_->tuple_desc()->slots()) {
if (slot->id() == sid) {
target_slot = slot;
break;
}
}
if (target_slot == nullptr) {
VLOG_FILE << "Can't find slot of id=" << sid << " in "
<< scan_node_->tuple_desc()->DebugString();
continue;
}
const orc::Type* node = nullptr;
bool pos_field;
bool missing_field;
Status s = schema_resolver_->ResolveColumn(target_slot->col_path(),
&node, &pos_field, &missing_field);
if (!s.ok()) {
VLOG_FILE << "Can't resolve " << target_slot->DebugString() << " in ORC file "
<< filename();
continue;
}
if (pos_field || missing_field) continue;
VLOG_FILE << "Generating ORC IN-list for filter " << filter->id();
std::vector<orc::Literal> in_list;
in_list_filter->ToOrcLiteralList(&in_list);
const ColumnType& col_type = filter->type();
if (in_list_filter->ContainsNull()) {
// Add a null literal with type.
in_list.emplace_back(GetOrcPredicateDataType(col_type));
}
if (!in_list.empty()) {
VLOG_FILE << "Updated sarg with " << in_list.size() << " items for filter "
<< filter->id();
if (PrepareInListPredicate(node->getColumnId(), col_type, in_list, sarg))
num_pushed_down_filters++;
}
}
num_pushable_in_list_filters_ = num_usable_filters;
COUNTER_SET(num_pushed_down_runtime_filters_counter_, num_pushed_down_filters);
VLOG_FILE << num_usable_filters << " usable filters. Pushed " << num_pushed_down_filters
<< " filters down.";
return num_pushed_down_filters > 0;
}
Status HdfsOrcScanner::ReadFooterStream(void* buf, uint64_t length, uint64_t offset) {
Status status;
if (offset > stream_->file_offset()) {
// skip the non-requested range
uint64_t bytes_to_skip = offset - stream_->file_offset();
if (!stream_->SkipBytes(bytes_to_skip, &status)) {
LOG(ERROR) << Substitute("HdfsOrcScanner::ReadFooterStream skipping failed. "
"offset: $0 length: $1 current_offset: $2",
offset, length, stream_->file_offset());
return status;
}
AddSkippedReadBytesCounter(bytes_to_skip);
}
uint8_t* stream_buf = nullptr;
{
SCOPED_TIMER2(state_->total_storage_wait_timer(), scan_node_->scanner_io_wait_time());
if (!stream_->ReadBytes(length, &stream_buf, &status, false)) return status;
AddAsyncReadBytesCounter(length);
}
CHECK_NOTNULL(stream_buf);
memcpy(buf, stream_buf, length); // TODO: ORC-262: extend ORC interface to avoid copy.
bool done = stream_->bytes_left() == 0;
stream_->ReleaseCompletedResources(done);
return Status::OK();
}
}