blob: 345a140c45c4dfd22c8b2bea403c2b08514f7447 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "exec/hdfs-scanner.h"
#include "codegen/codegen-anyval.h"
#include "exec/base-sequence-scanner.h"
#include "exec/text-converter.h"
#include "exec/hdfs-scan-node.h"
#include "exec/hdfs-scan-node-mt.h"
#include "exec/read-write-util.h"
#include "exec/text-converter.inline.h"
#include "runtime/collection-value-builder.h"
#include "runtime/hdfs-fs-cache.h"
#include "runtime/runtime-filter.inline.h"
#include "runtime/tuple-row.h"
#include "util/bitmap.h"
#include "util/codec.h"
#include "util/test-info.h"
#include "common/names.h"
using namespace impala;
using namespace impala::io;
using namespace strings;
DEFINE_double(min_filter_reject_ratio, 0.1, "(Advanced) If the percentage of "
"rows rejected by a runtime filter drops below this value, the filter is disabled.");
const char* FieldLocation::LLVM_CLASS_NAME = "struct.impala::FieldLocation";
const char* HdfsScanner::LLVM_CLASS_NAME = "class.impala::HdfsScanner";
const int64_t HdfsScanner::FOOTER_SIZE;
HdfsScanner::HdfsScanner(HdfsScanNodeBase* scan_node, RuntimeState* state)
: scan_node_(scan_node),
state_(state),
expr_perm_pool_(new MemPool(scan_node->expr_mem_tracker())),
template_tuple_pool_(new MemPool(scan_node->mem_tracker())),
tuple_byte_size_(scan_node->tuple_desc()->byte_size()),
data_buffer_pool_(new MemPool(scan_node->mem_tracker())) {
DCHECK_EQ(1, scan_node->row_desc()->tuple_descriptors().size())
<< "All HDFS scanners assume one tuple per row";
for (SlotDescriptor* string_slot : scan_node_->tuple_desc()->string_slots()) {
string_slot_offsets_.push_back(
{string_slot->null_indicator_offset(), string_slot->tuple_offset()});
}
}
HdfsScanner::HdfsScanner()
: scan_node_(nullptr),
state_(nullptr),
tuple_byte_size_(0) {
DCHECK(TestInfo::is_test());
}
HdfsScanner::~HdfsScanner() {
}
Status HdfsScanner::Open(ScannerContext* context) {
context_ = context;
stream_ = context->GetStream();
// Clone the scan node's conjuncts map. The cloned evaluators must be closed by the
// caller.
for (const auto& entry: scan_node_->conjuncts_map()) {
RETURN_IF_ERROR(ScalarExprEvaluator::Clone(&obj_pool_, scan_node_->runtime_state(),
expr_perm_pool_.get(), context_->expr_results_pool(), entry.second,
&conjunct_evals_map_[entry.first]));
}
DCHECK(conjunct_evals_map_.find(scan_node_->tuple_desc()->id()) !=
conjunct_evals_map_.end());
conjunct_evals_ = &conjunct_evals_map_[scan_node_->tuple_desc()->id()];
// Set up the scan node's dictionary filtering conjuncts map.
if (scan_node_->thrift_dict_filter_conjuncts_map() != nullptr) {
for (auto& entry : *(scan_node_->thrift_dict_filter_conjuncts_map())) {
SlotDescriptor* slot_desc = state_->desc_tbl().GetSlotDescriptor(entry.first);
TupleId tuple_id = (slot_desc->type().IsCollectionType() ?
slot_desc->collection_item_descriptor()->id() :
slot_desc->parent()->id());
auto conjunct_evals_it = conjunct_evals_map_.find(tuple_id);
DCHECK(conjunct_evals_it != conjunct_evals_map_.end());
const vector<ScalarExprEvaluator*>& conjunct_evals = conjunct_evals_it->second;
// Convert this slot's list of conjunct indices into a list of pointers
// into conjunct_evals_.
for (int conjunct_idx : entry.second) {
DCHECK_LT(conjunct_idx, conjunct_evals.size());
DCHECK((conjunct_evals)[conjunct_idx] != nullptr);
dict_filter_map_[entry.first].push_back((conjunct_evals)[conjunct_idx]);
}
}
}
// Initialize the template_tuple_.
template_tuple_ = scan_node_->InitTemplateTuple(
context_->partition_descriptor()->partition_key_value_evals(),
template_tuple_pool_.get(), state_);
template_tuple_map_[scan_node_->tuple_desc()] = template_tuple_;
decompress_timer_ = ADD_TIMER(scan_node_->runtime_profile(), "DecompressionTime");
return Status::OK();
}
Status HdfsScanner::ProcessSplit() {
DCHECK(scan_node_->HasRowBatchQueue());
HdfsScanNode* scan_node = static_cast<HdfsScanNode*>(scan_node_);
bool returned_rows = false;
do {
// IMPALA-3798, IMPALA-3804: For sequence-based files, the filters are only
// applied in HdfsScanNode::ProcessSplit()
bool is_sequence_based = BaseSequenceScanner::FileFormatIsSequenceBased(
context_->partition_descriptor()->file_format());
if (!is_sequence_based && FilterContext::CheckForAlwaysFalse(FilterStats::SPLITS_KEY,
context_->filter_ctxs())) {
eos_ = true;
break;
}
unique_ptr<RowBatch> batch = std::make_unique<RowBatch>(scan_node_->row_desc(),
state_->batch_size(), scan_node_->mem_tracker());
Status status = GetNextInternal(batch.get());
if (batch->num_rows() > 0) returned_rows = true;
// Always add batch to the queue if any rows were returned because it may contain
// data referenced by previously appended batches.
if (returned_rows) scan_node->AddMaterializedRowBatch(move(batch));
RETURN_IF_ERROR(status);
} while (!eos_ && !scan_node_->ReachedLimitShared());
return Status::OK();
}
void HdfsScanner::Close() {
DCHECK(scan_node_->HasRowBatchQueue());
RowBatch* final_batch = new RowBatch(scan_node_->row_desc(), state_->batch_size(),
scan_node_->mem_tracker());
Close(final_batch);
}
void HdfsScanner::CloseInternal() {
DCHECK(!is_closed_);
if (decompressor_.get() != nullptr) {
decompressor_->Close();
decompressor_.reset();
}
for (auto& entry : conjunct_evals_map_) {
ScalarExprEvaluator::Close(entry.second, state_);
}
expr_perm_pool_->FreeAll();
context_->expr_results_pool()->FreeAll();
obj_pool_.Clear();
stream_ = nullptr;
context_->ClearStreams();
is_closed_ = true;
}
Status HdfsScanner::InitializeWriteTuplesFn(HdfsPartitionDescriptor* partition,
THdfsFileFormat::type type, const string& scanner_name) {
if (!scan_node_->tuple_desc()->string_slots().empty()
&& partition->escape_char() != '\0') {
// Codegen currently doesn't emit call to MemPool::TryAllocate() so skip codegen if
// there are strings slots and we need to compact (i.e. copy) the data.
scan_node_->IncNumScannersCodegenDisabled();
return Status::OK();
}
write_tuples_fn_ = reinterpret_cast<WriteTuplesFn>(scan_node_->GetCodegenFn(type));
if (write_tuples_fn_ == NULL) {
scan_node_->IncNumScannersCodegenDisabled();
return Status::OK();
}
VLOG(2) << scanner_name << "(node_id=" << scan_node_->id()
<< ") using llvm codegend functions.";
scan_node_->IncNumScannersCodegenEnabled();
return Status::OK();
}
Status HdfsScanner::GetCollectionMemory(CollectionValueBuilder* builder, MemPool** pool,
Tuple** tuple_mem, TupleRow** tuple_row_mem, int64_t* num_rows) {
int num_tuples;
*pool = builder->pool();
RETURN_IF_ERROR(builder->GetFreeMemory(tuple_mem, &num_tuples));
// Treat tuple as a single-tuple row
*tuple_row_mem = reinterpret_cast<TupleRow*>(tuple_mem);
*num_rows = num_tuples;
return Status::OK();
}
Status HdfsScanner::CommitRows(int num_rows, RowBatch* row_batch) {
DCHECK_LE(num_rows, row_batch->capacity() - row_batch->num_rows());
row_batch->CommitRows(num_rows);
tuple_mem_ += static_cast<int64_t>(scan_node_->tuple_desc()->byte_size()) * num_rows;
tuple_ = reinterpret_cast<Tuple*>(tuple_mem_);
if (context_->cancelled()) return Status::CancelledInternal("HDFS scanner");
// Check for UDF errors.
RETURN_IF_ERROR(state_->GetQueryStatus());
// Clear expr result allocations for this thread to avoid accumulating too much
// memory from evaluating the scanner conjuncts.
context_->expr_results_pool()->Clear();
return Status::OK();
}
int HdfsScanner::WriteTemplateTuples(TupleRow* row, int num_tuples) {
DCHECK_GE(num_tuples, 0);
DCHECK_EQ(scan_node_->tuple_idx(), 0);
DCHECK_EQ(scan_node_->materialized_slots().size(), 0);
int num_to_commit = 0;
if (LIKELY(conjunct_evals_->size() == 0)) {
num_to_commit = num_tuples;
} else {
TupleRow template_tuple_row;
template_tuple_row.SetTuple(0, template_tuple_);
// Evaluate any conjuncts which may reference the partition columns.
for (int i = 0; i < num_tuples; ++i) {
if (EvalConjuncts(&template_tuple_row)) ++num_to_commit;
}
}
Tuple** row_tuple = reinterpret_cast<Tuple**>(row);
if (template_tuple_ != nullptr) {
for (int i = 0; i < num_to_commit; ++i) row_tuple[i] = template_tuple_;
} else {
DCHECK_EQ(scan_node_->tuple_desc()->byte_size(), 0);
// IMPALA-6258: Initialize tuple ptrs to non-null value
for (int i = 0; i < num_to_commit; ++i) row_tuple[i] = Tuple::POISON;
}
return num_to_commit;
}
bool HdfsScanner::WriteCompleteTuple(MemPool* pool, FieldLocation* fields,
Tuple* tuple, TupleRow* tuple_row, Tuple* template_tuple,
uint8_t* error_fields, uint8_t* error_in_row) {
*error_in_row = false;
// Initialize tuple before materializing slots
InitTuple(template_tuple, tuple);
for (int i = 0; i < scan_node_->materialized_slots().size(); ++i) {
int need_escape = false;
int len = fields[i].len;
if (UNLIKELY(len < 0)) {
len = -len;
need_escape = true;
}
SlotDescriptor* desc = scan_node_->materialized_slots()[i];
bool error = !text_converter_->WriteSlot(desc, tuple,
fields[i].start, len, false, need_escape, pool);
error_fields[i] = error;
*error_in_row |= error;
}
tuple_row->SetTuple(scan_node_->tuple_idx(), tuple);
return EvalConjuncts(tuple_row);
}
// Codegen for WriteTuple(above) for writing out single nullable string slot and
// evaluating a <slot> = <constantexpr> conjunct. The signature matches WriteTuple()
// except for the first this* argument.
// define i1 @WriteCompleteTuple(%"class.impala::HdfsScanner"* %this,
// %"class.impala::MemPool"* %pool,
// %"struct.impala::FieldLocation"* %fields,
// %"class.impala::Tuple"* %tuple,
// %"class.impala::TupleRow"* %tuple_row,
// %"class.impala::Tuple"* %template,
// i8* %error_fields, i8* %error_in_row) {
// entry:
// %tuple_ptr = bitcast %"class.impala::Tuple"* %tuple
// to <{ %"struct.impala::StringValue", i8 }>*
// %tuple_ptr1 = bitcast %"class.impala::Tuple"* %template
// to <{ %"struct.impala::StringValue", i8 }>*
// %int8_ptr = bitcast <{ %"struct.impala::StringValue", i8 }>* %tuple_ptr to i8*
// %null_bytes_ptr = getelementptr i8, i8* %int8_ptr, i32 16
// call void @llvm.memset.p0i8.i64(i8* %null_bytes_ptr, i8 0, i64 1, i32 0, i1 false)
// %0 = bitcast %"class.impala::TupleRow"* %tuple_row
// to <{ %"struct.impala::StringValue", i8 }>**
// %1 = getelementptr <{ %"struct.impala::StringValue", i8 }>*,
// <{ %"struct.impala::StringValue", i8 }>** %0, i32 0
// store <{ %"struct.impala::StringValue", i8 }>* %tuple_ptr,
// <{ %"struct.impala::StringValue", i8 }>** %1
// br label %parse
//
// parse: ; preds = %entry
// %data_ptr = getelementptr %"struct.impala::FieldLocation",
// %"struct.impala::FieldLocation"* %fields, i32 0, i32 0
// %len_ptr = getelementptr %"struct.impala::FieldLocation",
// %"struct.impala::FieldLocation"* %fields, i32 0, i32 1
// %slot_error_ptr = getelementptr i8, i8* %error_fields, i32 0
// %data = load i8*, i8** %data_ptr
// %len = load i32, i32* %len_ptr
// %2 = call i1 @WriteSlot(<{ %"struct.impala::StringValue", i8 }>* %tuple_ptr,
// i8* %data, i32 %len)
// %slot_parse_error = xor i1 %2, true
// %error_in_row2 = or i1 false, %slot_parse_error
// %3 = zext i1 %slot_parse_error to i8
// store i8 %3, i8* %slot_error_ptr
// %4 = call %"class.impala::ScalarExprEvaluator"* @GetConjunctCtx(
// %"class.impala::HdfsScanner"* %this, i32 0)
// %conjunct_eval = call i16 @"impala::Operators::Eq_StringVal_StringValWrapper"(
// %"class.impala::ScalarExprEvaluator"* %4, %"class.impala::TupleRow"* %tuple_row)
// %5 = ashr i16 %conjunct_eval, 8
// %6 = trunc i16 %5 to i8
// %val = trunc i8 %6 to i1
// br i1 %val, label %parse3, label %eval_fail
//
// parse3: ; preds = %parse
// %7 = zext i1 %error_in_row2 to i8
// store i8 %7, i8* %error_in_row
// ret i1 true
//
// eval_fail: ; preds = %parse
// ret i1 false
// }
Status HdfsScanner::CodegenWriteCompleteTuple(const HdfsScanNodeBase* node,
LlvmCodeGen* codegen, const vector<ScalarExpr*>& conjuncts,
llvm::Function** write_complete_tuple_fn) {
*write_complete_tuple_fn = NULL;
RuntimeState* state = node->runtime_state();
// Cast away const-ness. The codegen only sets the cached typed llvm struct.
TupleDescriptor* tuple_desc = const_cast<TupleDescriptor*>(node->tuple_desc());
vector<llvm::Function*> slot_fns;
for (int i = 0; i < node->materialized_slots().size(); ++i) {
llvm::Function* fn = nullptr;
SlotDescriptor* slot_desc = node->materialized_slots()[i];
RETURN_IF_ERROR(TextConverter::CodegenWriteSlot(codegen, tuple_desc, slot_desc, &fn,
node->hdfs_table()->null_column_value().data(),
node->hdfs_table()->null_column_value().size(), true, state->strict_mode()));
if (i >= LlvmCodeGen::CODEGEN_INLINE_EXPRS_THRESHOLD) codegen->SetNoInline(fn);
slot_fns.push_back(fn);
}
// Compute order to materialize slots. BE assumes that conjuncts should
// be evaluated in the order specified (optimization is already done by FE)
vector<int> materialize_order;
node->ComputeSlotMaterializationOrder(&materialize_order);
// Get types to construct matching function signature to WriteCompleteTuple
llvm::PointerType* uint8_ptr_type = codegen->i8_ptr_type();
llvm::PointerType* field_loc_ptr_type = codegen->GetStructPtrType<FieldLocation>();
llvm::PointerType* tuple_opaque_ptr_type = codegen->GetStructPtrType<Tuple>();
llvm::PointerType* tuple_row_ptr_type = codegen->GetStructPtrType<TupleRow>();
llvm::PointerType* mem_pool_ptr_type = codegen->GetStructPtrType<MemPool>();
llvm::PointerType* hdfs_scanner_ptr_type = codegen->GetStructPtrType<HdfsScanner>();
// Generate the typed llvm struct for the output tuple
llvm::StructType* tuple_type = tuple_desc->GetLlvmStruct(codegen);
if (tuple_type == NULL) return Status("Could not generate tuple struct.");
llvm::PointerType* tuple_ptr_type = llvm::PointerType::get(tuple_type, 0);
// Initialize the function prototype. This needs to match
// HdfsScanner::WriteCompleteTuple's signature identically.
LlvmCodeGen::FnPrototype prototype(
codegen, "WriteCompleteTuple", codegen->bool_type());
prototype.AddArgument(LlvmCodeGen::NamedVariable("this", hdfs_scanner_ptr_type));
prototype.AddArgument(LlvmCodeGen::NamedVariable("pool", mem_pool_ptr_type));
prototype.AddArgument(LlvmCodeGen::NamedVariable("fields", field_loc_ptr_type));
prototype.AddArgument(LlvmCodeGen::NamedVariable("tuple", tuple_opaque_ptr_type));
prototype.AddArgument(LlvmCodeGen::NamedVariable("tuple_row", tuple_row_ptr_type));
prototype.AddArgument(LlvmCodeGen::NamedVariable("template", tuple_opaque_ptr_type));
prototype.AddArgument(LlvmCodeGen::NamedVariable("error_fields", uint8_ptr_type));
prototype.AddArgument(LlvmCodeGen::NamedVariable("error_in_row", uint8_ptr_type));
llvm::LLVMContext& context = codegen->context();
LlvmBuilder builder(context);
llvm::Value* args[8];
llvm::Function* fn = prototype.GeneratePrototype(&builder, &args[0]);
llvm::BasicBlock* parse_block = llvm::BasicBlock::Create(context, "parse", fn);
llvm::BasicBlock* eval_fail_block = llvm::BasicBlock::Create(context, "eval_fail", fn);
// Extract the input args
llvm::Value* this_arg = args[0];
llvm::Value* fields_arg = args[2];
llvm::Value* opaque_tuple_arg = args[3];
llvm::Value* tuple_arg =
builder.CreateBitCast(opaque_tuple_arg, tuple_ptr_type, "tuple_ptr");
llvm::Value* tuple_row_arg = args[4];
llvm::Value* opaque_template_arg = args[5];
llvm::Value* errors_arg = args[6];
llvm::Value* error_in_row_arg = args[7];
// Codegen for function body
llvm::Value* error_in_row = codegen->false_value();
llvm::Function* init_tuple_fn;
RETURN_IF_ERROR(CodegenInitTuple(node, codegen, &init_tuple_fn));
builder.CreateCall(init_tuple_fn, {this_arg, opaque_template_arg, opaque_tuple_arg});
// Put tuple in tuple_row
llvm::Value* tuple_row_typed =
builder.CreateBitCast(tuple_row_arg, llvm::PointerType::get(tuple_ptr_type, 0));
llvm::Value* tuple_row_idxs[] = {codegen->GetI32Constant(node->tuple_idx())};
llvm::Value* tuple_in_row_addr =
builder.CreateInBoundsGEP(tuple_row_typed, tuple_row_idxs);
builder.CreateStore(tuple_arg, tuple_in_row_addr);
builder.CreateBr(parse_block);
// Loop through all the conjuncts in order and materialize slots as necessary to
// evaluate the conjuncts (e.g. conjuncts[0] will have the slots it references
// first).
// materialized_order[slot_idx] represents the first conjunct which needs that slot.
// Slots are only materialized if its order matches the current conjunct being
// processed. This guarantees that each slot is materialized once when it is first
// needed and that at the end of the materialize loop, the conjunct has everything
// it needs (either from this iteration or previous iterations).
builder.SetInsertPoint(parse_block);
for (int conjunct_idx = 0; conjunct_idx <= conjuncts.size(); ++conjunct_idx) {
for (int slot_idx = 0; slot_idx < materialize_order.size(); ++slot_idx) {
// If they don't match, it means either the slot has already been
// materialized for a previous conjunct or will be materialized later for
// another conjunct. Either case, the slot does not need to be materialized
// yet.
if (materialize_order[slot_idx] != conjunct_idx) continue;
// Materialize slots[slot_idx] to evaluate conjuncts[conjunct_idx]
// All slots[i] with materialized_order[i] < conjunct_idx have already been
// materialized by prior iterations through the outer loop
// Extract ptr/len from fields
llvm::Value* data_idxs[] = {
codegen->GetI32Constant(slot_idx),
codegen->GetI32Constant(0),
};
llvm::Value* len_idxs[] = {
codegen->GetI32Constant(slot_idx),
codegen->GetI32Constant(1),
};
llvm::Value* error_idxs[] = {
codegen->GetI32Constant(slot_idx),
};
llvm::Value* data_ptr =
builder.CreateInBoundsGEP(fields_arg, data_idxs, "data_ptr");
llvm::Value* len_ptr = builder.CreateInBoundsGEP(fields_arg, len_idxs, "len_ptr");
llvm::Value* error_ptr =
builder.CreateInBoundsGEP(errors_arg, error_idxs, "slot_error_ptr");
llvm::Value* data = builder.CreateLoad(data_ptr, "data");
llvm::Value* len = builder.CreateLoad(len_ptr, "len");
// Convert length to positive if it is negative. Negative lengths are assigned to
// slots that contain escape characters.
// TODO: CodegenWriteSlot() currently does not handle text that requres unescaping.
// However, if it is modified to handle that case, we need to detect it here and
// send a 'need_escape' bool to CodegenWriteSlot(), since we are making the length
// positive here.
llvm::Value* len_lt_zero =
builder.CreateICmpSLT(len, codegen->GetI32Constant(0), "len_lt_zero");
llvm::Value* ones_compliment_len = builder.CreateNot(len, "ones_compliment_len");
llvm::Value* positive_len = builder.CreateAdd(
ones_compliment_len, codegen->GetI32Constant(1), "positive_len");
len = builder.CreateSelect(len_lt_zero, positive_len, len,
"select_positive_len");
// Call slot parse function
llvm::Function* slot_fn = slot_fns[slot_idx];
llvm::Value* slot_parsed = builder.CreateCall(
slot_fn, llvm::ArrayRef<llvm::Value*>({tuple_arg, data, len}));
llvm::Value* slot_error = builder.CreateNot(slot_parsed, "slot_parse_error");
error_in_row = builder.CreateOr(error_in_row, slot_error, "error_in_row");
slot_error = builder.CreateZExt(slot_error, codegen->i8_type());
builder.CreateStore(slot_error, error_ptr);
}
if (conjunct_idx == conjuncts.size()) {
// In this branch, we've just materialized slots not referenced by any conjunct.
// This slots are the last to get materialized. If we are in this branch, the
// tuple passed all conjuncts and should be added to the row batch.
llvm::Value* error_ret =
builder.CreateZExt(error_in_row, codegen->i8_type());
builder.CreateStore(error_ret, error_in_row_arg);
builder.CreateRet(codegen->true_value());
} else {
// All slots for conjuncts[conjunct_idx] are materialized, evaluate the partial
// tuple against that conjunct and start a new parse_block for the next conjunct
parse_block = llvm::BasicBlock::Create(context, "parse", fn, eval_fail_block);
llvm::Function* conjunct_fn;
Status status =
conjuncts[conjunct_idx]->GetCodegendComputeFn(codegen, false, &conjunct_fn);
if (!status.ok()) {
stringstream ss;
ss << "Failed to codegen conjunct: " << status.GetDetail();
state->LogError(ErrorMsg(TErrorCode::GENERAL, ss.str()));
return status;
}
if (node->materialized_slots().size() + conjunct_idx
>= LlvmCodeGen::CODEGEN_INLINE_EXPRS_THRESHOLD) {
codegen->SetNoInline(conjunct_fn);
}
llvm::Function* get_eval_fn =
codegen->GetFunction(IRFunction::HDFS_SCANNER_GET_CONJUNCT_EVALUATOR, false);
llvm::Value* eval = builder.CreateCall(
get_eval_fn, llvm::ArrayRef<llvm::Value*>(
{this_arg, codegen->GetI32Constant(conjunct_idx)}));
llvm::Value* conjunct_args[] = {eval, tuple_row_arg};
CodegenAnyVal result = CodegenAnyVal::CreateCallWrapped(
codegen, &builder, TYPE_BOOLEAN, conjunct_fn, conjunct_args, "conjunct_eval");
builder.CreateCondBr(result.GetVal(), parse_block, eval_fail_block);
builder.SetInsertPoint(parse_block);
}
}
// Block if eval failed.
builder.SetInsertPoint(eval_fail_block);
builder.CreateRet(codegen->false_value());
if (node->materialized_slots().size() + conjuncts.size()
> LlvmCodeGen::CODEGEN_INLINE_EXPR_BATCH_THRESHOLD) {
codegen->SetNoInline(fn);
}
*write_complete_tuple_fn = codegen->FinalizeFunction(fn);
if (*write_complete_tuple_fn == NULL) {
return Status("Failed to finalize write_complete_tuple_fn.");
}
return Status::OK();
}
Status HdfsScanner::CodegenWriteAlignedTuples(const HdfsScanNodeBase* node,
LlvmCodeGen* codegen, llvm::Function* write_complete_tuple_fn,
llvm::Function** write_aligned_tuples_fn) {
*write_aligned_tuples_fn = NULL;
DCHECK(write_complete_tuple_fn != NULL);
llvm::Function* write_tuples_fn =
codegen->GetFunction(IRFunction::HDFS_SCANNER_WRITE_ALIGNED_TUPLES, true);
DCHECK(write_tuples_fn != NULL);
int replaced = codegen->ReplaceCallSites(write_tuples_fn, write_complete_tuple_fn,
"WriteCompleteTuple");
DCHECK_REPLACE_COUNT(replaced, 1);
llvm::Function* copy_strings_fn;
RETURN_IF_ERROR(Tuple::CodegenCopyStrings(
codegen, *node->tuple_desc(), &copy_strings_fn));
replaced = codegen->ReplaceCallSites(
write_tuples_fn, copy_strings_fn, "CopyStrings");
DCHECK_REPLACE_COUNT(replaced, 1);
int tuple_byte_size = node->tuple_desc()->byte_size();
replaced = codegen->ReplaceCallSitesWithValue(write_tuples_fn,
codegen->GetI32Constant(tuple_byte_size), "tuple_byte_size");
DCHECK_REPLACE_COUNT(replaced, 1);
*write_aligned_tuples_fn = codegen->FinalizeFunction(write_tuples_fn);
if (*write_aligned_tuples_fn == NULL) {
return Status("Failed to finalize write_aligned_tuples_fn.");
}
return Status::OK();
}
Status HdfsScanner::CodegenInitTuple(
const HdfsScanNodeBase* node, LlvmCodeGen* codegen, llvm::Function** init_tuple_fn) {
*init_tuple_fn = codegen->GetFunction(IRFunction::HDFS_SCANNER_INIT_TUPLE, true);
DCHECK(*init_tuple_fn != nullptr);
// Replace all of the constants in InitTuple() to specialize the code.
int replaced = codegen->ReplaceCallSitesWithBoolConst(
*init_tuple_fn, node->num_materialized_partition_keys() > 0, "has_template_tuple");
DCHECK_REPLACE_COUNT(replaced, 1);
const TupleDescriptor* tuple_desc = node->tuple_desc();
replaced = codegen->ReplaceCallSitesWithValue(*init_tuple_fn,
codegen->GetI32Constant(tuple_desc->byte_size()), "tuple_byte_size");
DCHECK_REPLACE_COUNT(replaced, 1);
replaced = codegen->ReplaceCallSitesWithValue(*init_tuple_fn,
codegen->GetI32Constant(tuple_desc->null_bytes_offset()),
"null_bytes_offset");
DCHECK_REPLACE_COUNT(replaced, 1);
replaced = codegen->ReplaceCallSitesWithValue(*init_tuple_fn,
codegen->GetI32Constant(tuple_desc->num_null_bytes()), "num_null_bytes");
DCHECK_REPLACE_COUNT(replaced, 1);
*init_tuple_fn = codegen->FinalizeFunction(*init_tuple_fn);
if (*init_tuple_fn == nullptr) {
return Status("Failed to finalize codegen'd InitTuple().");
}
return Status::OK();
}
// ; Function Attrs: noinline
// define i1 @EvalRuntimeFilters(%"class.impala::HdfsScanner"* %this,
// %"class.impala::TupleRow"* %row) #34 {
// entry:
// %0 = call i1 @_ZN6impala11HdfsScanner17EvalRuntimeFilterEiPNS_8TupleRowE.2(
// %"class.impala::HdfsScanner"* %this, i32 0, %"class.impala::TupleRow"*
// %row)
// br i1 %0, label %continue, label %bail_out
//
// bail_out: ; preds = %entry
// ret i1 false
//
// continue: ; preds = %entry
// ret i1 true
// }
//
// EvalRuntimeFilter() is the same as the cross-compiled version except EvalOneFilter()
// is replaced with the one generated by CodegenEvalOneFilter().
Status HdfsScanner::CodegenEvalRuntimeFilters(
LlvmCodeGen* codegen, const vector<ScalarExpr*>& filter_exprs, llvm::Function** fn) {
llvm::LLVMContext& context = codegen->context();
LlvmBuilder builder(context);
*fn = nullptr;
llvm::Type* this_type = codegen->GetStructPtrType<HdfsScanner>();
llvm::PointerType* tuple_row_ptr_type = codegen->GetStructPtrType<TupleRow>();
LlvmCodeGen::FnPrototype prototype(codegen, "EvalRuntimeFilters",
codegen->bool_type());
prototype.AddArgument(LlvmCodeGen::NamedVariable("this", this_type));
prototype.AddArgument(LlvmCodeGen::NamedVariable("row", tuple_row_ptr_type));
llvm::Value* args[2];
llvm::Function* eval_runtime_filters_fn = prototype.GeneratePrototype(&builder, args);
llvm::Value* this_arg = args[0];
llvm::Value* row_arg = args[1];
int num_filters = filter_exprs.size();
if (num_filters == 0) {
builder.CreateRet(codegen->true_value());
} else {
// row_rejected_block: jump target for when a filter is evaluated to false.
llvm::BasicBlock* row_rejected_block =
llvm::BasicBlock::Create(context, "row_rejected", eval_runtime_filters_fn);
DCHECK_GT(num_filters, 0);
for (int i = 0; i < num_filters; ++i) {
llvm::Function* eval_runtime_filter_fn =
codegen->GetFunction(IRFunction::HDFS_SCANNER_EVAL_RUNTIME_FILTER, true);
DCHECK(eval_runtime_filter_fn != nullptr);
// Codegen function for inlining filter's expression evaluation and constant fold
// the type of the expression into the hashing function to avoid branches.
llvm::Function* eval_one_filter_fn;
DCHECK(filter_exprs[i] != nullptr);
RETURN_IF_ERROR(FilterContext::CodegenEval(codegen, filter_exprs[i],
&eval_one_filter_fn));
DCHECK(eval_one_filter_fn != nullptr);
int replaced = codegen->ReplaceCallSites(eval_runtime_filter_fn, eval_one_filter_fn,
"FilterContext4Eval");
DCHECK_REPLACE_COUNT(replaced, 1);
llvm::Value* idx = codegen->GetI32Constant(i);
llvm::Value* passed_filter = builder.CreateCall(
eval_runtime_filter_fn, llvm::ArrayRef<llvm::Value*>({this_arg, idx, row_arg}));
llvm::BasicBlock* continue_block =
llvm::BasicBlock::Create(context, "continue", eval_runtime_filters_fn);
builder.CreateCondBr(passed_filter, continue_block, row_rejected_block);
builder.SetInsertPoint(continue_block);
}
builder.CreateRet(codegen->true_value());
builder.SetInsertPoint(row_rejected_block);
builder.CreateRet(codegen->false_value());
// Don't inline this function to avoid code bloat in ProcessScratchBatch().
// If there is any filter, EvalRuntimeFilters() is large enough to not benefit
// much from inlining.
eval_runtime_filters_fn->addFnAttr(llvm::Attribute::NoInline);
}
*fn = codegen->FinalizeFunction(eval_runtime_filters_fn);
if (*fn == nullptr) {
return Status("Codegen'd HdfsScanner::EvalRuntimeFilters() failed "
"verification, see log");
}
return Status::OK();
}
Status HdfsScanner::UpdateDecompressor(const THdfsCompression::type& compression) {
// Check whether the file in the stream has different compression from the last one.
if (compression != decompression_type_) {
if (decompression_type_ != THdfsCompression::NONE) {
// Close the previous decompressor before creating a new one.
DCHECK(decompressor_.get() != NULL);
decompressor_->Close();
decompressor_.reset(NULL);
}
// The LZO-compression scanner is implemented in a dynamically linked library and it
// is not created at Codec::CreateDecompressor().
if (compression != THdfsCompression::NONE && compression != THdfsCompression::LZO) {
RETURN_IF_ERROR(Codec::CreateDecompressor(data_buffer_pool_.get(),
scan_node_->tuple_desc()->string_slots().empty(), compression, &decompressor_));
}
decompression_type_ = compression;
}
return Status::OK();
}
Status HdfsScanner::UpdateDecompressor(const string& codec) {
map<const string, const THdfsCompression::type>::const_iterator
type = Codec::CODEC_MAP.find(codec);
if (type == Codec::CODEC_MAP.end()) {
stringstream ss;
ss << Codec::UNKNOWN_CODEC_ERROR << codec;
return Status(ss.str());
}
RETURN_IF_ERROR(UpdateDecompressor(type->second));
return Status::OK();
}
bool HdfsScanner::ReportTupleParseError(FieldLocation* fields, uint8_t* errors) {
for (int i = 0; i < scan_node_->materialized_slots().size(); ++i) {
if (errors[i]) {
const SlotDescriptor* desc = scan_node_->materialized_slots()[i];
ReportColumnParseError(desc, fields[i].start, fields[i].len);
errors[i] = false;
}
}
LogRowParseError();
if (state_->abort_on_error()) DCHECK(!parse_status_.ok());
return parse_status_.ok();
}
void HdfsScanner::LogRowParseError() {
const string& s = Substitute("Error parsing row: file: $0, before offset: $1",
stream_->filename(), stream_->file_offset());
state_->LogError(ErrorMsg(TErrorCode::GENERAL, s));
}
void HdfsScanner::ReportColumnParseError(const SlotDescriptor* desc,
const char* data, int len) {
if (state_->LogHasSpace() || state_->abort_on_error()) {
stringstream ss;
ss << "Error converting column: "
<< desc->col_pos() - scan_node_->num_partition_keys()
<< " to " << desc->type();
// When skipping multiple header lines we only try to skip them in the first scan
// range. For subsequent scan ranges, it's impossible to determine how many lines
// precede it and whether any header lines should be skipped. If the header does not
// fit into the first scan range and spills into subsequent scan ranges, then we will
// try to parse header data here and fail. The scanner of the first scan range will
// fail the query if it cannot fully skip the header. However, if abort_on_error is
// set, then a race happens between the first scanner to detect the condition and any
// other scanner that tries to parse an invalid value. Therefore a possible mitigation
// is to increase the max_scan_range_length so the header is fully contained in the
// first scan range.
if (scan_node_->skip_header_line_count() > 1) {
ss << "\n" << "Table has skip.header.line.count set to a value > 1. If the data "
<< "that could not be parsed looks like it's part of the file's header, then "
<< "try increasing max_scan_range_length to a value larger than the size of the "
<< "file's header.";
}
if (state_->LogHasSpace()) {
state_->LogError(ErrorMsg(TErrorCode::GENERAL, ss.str()), 2);
}
if (state_->abort_on_error() && parse_status_.ok()) parse_status_ = Status(ss.str());
}
}
void HdfsScanner::CheckFiltersEffectiveness() {
for (int i = 0; i < filter_stats_.size(); ++i) {
LocalFilterStats* stats = &filter_stats_[i];
const RuntimeFilter* filter = filter_ctxs_[i]->filter;
double reject_ratio = stats->rejected / static_cast<double>(stats->considered);
if (filter->AlwaysTrue() ||
reject_ratio < FLAGS_min_filter_reject_ratio) {
stats->enabled = 0;
}
}
}
Status HdfsScanner::IssueFooterRanges(HdfsScanNodeBase* scan_node,
const THdfsFileFormat::type& file_type, const vector<HdfsFileDesc*>& files) {
DCHECK(!files.empty());
vector<ScanRange*> footer_ranges;
for (int i = 0; i < files.size(); ++i) {
// Compute the offset of the file footer.
int64_t footer_size = min(FOOTER_SIZE, files[i]->file_length);
int64_t footer_start = files[i]->file_length - footer_size;
DCHECK_GE(footer_start, 0);
// Try to find the split with the footer.
ScanRange* footer_split = FindFooterSplit(files[i]);
for (int j = 0; j < files[i]->splits.size(); ++j) {
ScanRange* split = files[i]->splits[j];
DCHECK_LE(split->offset() + split->len(), files[i]->file_length);
// If there are no materialized slots (such as count(*) over the table), we can
// get the result with the file metadata alone and don't need to read any row
// groups. We only want a single node to process the file footer in this case,
// which is the node with the footer split. If it's not a count(*), we create a
// footer range for the split always.
if (!scan_node->IsZeroSlotTableScan() || footer_split == split) {
ScanRangeMetadata* split_metadata =
static_cast<ScanRangeMetadata*>(split->meta_data());
// Each split is processed by first issuing a scan range for the file footer, which
// is done here, followed by scan ranges for the columns of each row group within
// the actual split (in InitColumns()). The original split is stored in the
// metadata associated with the footer range.
ScanRange* footer_range;
if (footer_split != nullptr) {
footer_range = scan_node->AllocateScanRange(files[i]->fs,
files[i]->filename.c_str(), footer_size, footer_start,
split_metadata->partition_id, footer_split->disk_id(),
footer_split->expected_local(), files[i]->is_erasure_coded, files[i]->mtime,
BufferOpts(footer_split->cache_options()), split);
} else {
// If we did not find the last split, we know it is going to be a remote read.
bool expected_local = false;
int cache_options = !scan_node->IsDataCacheDisabled() ?
BufferOpts::USE_DATA_CACHE : BufferOpts::NO_CACHING;
footer_range =
scan_node->AllocateScanRange(files[i]->fs, files[i]->filename.c_str(),
footer_size, footer_start, split_metadata->partition_id, -1,
expected_local, files[i]->is_erasure_coded, files[i]->mtime,
BufferOpts(cache_options), split);
}
footer_ranges.push_back(footer_range);
} else {
scan_node->RangeComplete(file_type, THdfsCompression::NONE);
}
}
}
// The threads that process the footer will also do the scan.
if (footer_ranges.size() > 0) {
RETURN_IF_ERROR(scan_node->AddDiskIoRanges(footer_ranges, EnqueueLocation::TAIL));
}
return Status::OK();
}
ScanRange* HdfsScanner::FindFooterSplit(HdfsFileDesc* file) {
DCHECK(file != nullptr);
for (int i = 0; i < file->splits.size(); ++i) {
ScanRange* split = file->splits[i];
if (split->offset() + split->len() == file->file_length) return split;
}
return nullptr;
}
bool HdfsScanner::EvalRuntimeFilters(TupleRow* row) {
int num_filters = filter_ctxs_.size();
for (int i = 0; i < num_filters; ++i) {
if (!EvalRuntimeFilter(i, row)) return false;
}
return true;
}