| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "exec/hdfs-scanner.h" |
| |
| #include <sstream> |
| #include <boost/algorithm/string.hpp> |
| |
| #include "codegen/codegen-anyval.h" |
| #include "codegen/llvm-codegen.h" |
| #include "common/logging.h" |
| #include "common/object-pool.h" |
| #include "exec/text-converter.h" |
| #include "exec/hdfs-scan-node.h" |
| #include "exec/hdfs-scan-node-mt.h" |
| #include "exec/read-write-util.h" |
| #include "exec/text-converter.inline.h" |
| #include "exprs/expr-context.h" |
| #include "runtime/collection-value-builder.h" |
| #include "runtime/descriptors.h" |
| #include "runtime/hdfs-fs-cache.h" |
| #include "runtime/runtime-state.h" |
| #include "runtime/mem-pool.h" |
| #include "runtime/row-batch.h" |
| #include "runtime/string-value.h" |
| #include "runtime/tuple-row.h" |
| #include "runtime/tuple.h" |
| #include "util/bitmap.h" |
| #include "util/codec.h" |
| #include "util/debug-util.h" |
| #include "util/runtime-profile-counters.h" |
| #include "util/sse-util.h" |
| #include "util/string-parser.h" |
| #include "util/test-info.h" |
| #include "gen-cpp/PlanNodes_types.h" |
| |
| #include "common/names.h" |
| |
| using namespace impala; |
| using namespace llvm; |
| using namespace strings; |
| |
| const char* FieldLocation::LLVM_CLASS_NAME = "struct.impala::FieldLocation"; |
| const char* HdfsScanner::LLVM_CLASS_NAME = "class.impala::HdfsScanner"; |
| |
| HdfsScanner::HdfsScanner(HdfsScanNodeBase* scan_node, RuntimeState* state) |
| : scan_node_(scan_node), |
| state_(state), |
| context_(NULL), |
| stream_(NULL), |
| eos_(false), |
| scanner_conjunct_ctxs_(NULL), |
| template_tuple_pool_(new MemPool(scan_node->mem_tracker())), |
| template_tuple_(NULL), |
| tuple_byte_size_(scan_node->tuple_desc()->byte_size()), |
| tuple_(NULL), |
| batch_(NULL), |
| tuple_mem_(NULL), |
| parse_status_(Status::OK()), |
| decompression_type_(THdfsCompression::NONE), |
| data_buffer_pool_(new MemPool(scan_node->mem_tracker())), |
| decompress_timer_(NULL), |
| write_tuples_fn_(NULL) { |
| } |
| |
| HdfsScanner::HdfsScanner() |
| : scan_node_(NULL), |
| state_(NULL), |
| context_(NULL), |
| stream_(NULL), |
| eos_(false), |
| scanner_conjunct_ctxs_(NULL), |
| template_tuple_pool_(NULL), |
| template_tuple_(NULL), |
| tuple_byte_size_(-1), |
| tuple_(NULL), |
| batch_(NULL), |
| tuple_mem_(NULL), |
| parse_status_(Status::OK()), |
| decompression_type_(THdfsCompression::NONE), |
| data_buffer_pool_(NULL), |
| decompress_timer_(NULL), |
| write_tuples_fn_(NULL) { |
| DCHECK(TestInfo::is_test()); |
| } |
| |
| HdfsScanner::~HdfsScanner() { |
| } |
| |
| Status HdfsScanner::Open(ScannerContext* context) { |
| context_ = context; |
| stream_ = context->GetStream(); |
| |
| // Clone the scan node's conjuncts map. The cloned contexts must be closed by the |
| // caller. |
| for (const auto& entry: scan_node_->conjuncts_map()) { |
| RETURN_IF_ERROR(Expr::CloneIfNotExists(entry.second, |
| scan_node_->runtime_state(), &scanner_conjuncts_map_[entry.first])); |
| } |
| DCHECK(scanner_conjuncts_map_.find(scan_node_->tuple_desc()->id()) != |
| scanner_conjuncts_map_.end()); |
| scanner_conjunct_ctxs_ = &scanner_conjuncts_map_[scan_node_->tuple_desc()->id()]; |
| |
| // Initialize the template_tuple_. |
| template_tuple_ = scan_node_->InitTemplateTuple( |
| context_->partition_descriptor()->partition_key_value_ctxs(), |
| template_tuple_pool_.get(), state_); |
| template_tuple_map_[scan_node_->tuple_desc()] = template_tuple_; |
| |
| decompress_timer_ = ADD_TIMER(scan_node_->runtime_profile(), "DecompressionTime"); |
| return Status::OK(); |
| } |
| |
| void HdfsScanner::Close(RowBatch* row_batch) { |
| if (decompressor_.get() != NULL) decompressor_->Close(); |
| for (const auto& entry: scanner_conjuncts_map_) Expr::Close(entry.second, state_); |
| obj_pool_.Clear(); |
| stream_ = NULL; |
| context_->ClearStreams(); |
| } |
| |
| Status HdfsScanner::InitializeWriteTuplesFn(HdfsPartitionDescriptor* partition, |
| THdfsFileFormat::type type, const string& scanner_name) { |
| if (!scan_node_->tuple_desc()->string_slots().empty() |
| && partition->escape_char() != '\0') { |
| // Codegen currently doesn't emit call to MemPool::TryAllocate() so skip codegen if |
| // there are strings slots and we need to compact (i.e. copy) the data. |
| scan_node_->IncNumScannersCodegenDisabled(); |
| return Status::OK(); |
| } |
| |
| write_tuples_fn_ = reinterpret_cast<WriteTuplesFn>(scan_node_->GetCodegenFn(type)); |
| if (write_tuples_fn_ == NULL) { |
| scan_node_->IncNumScannersCodegenDisabled(); |
| return Status::OK(); |
| } |
| VLOG(2) << scanner_name << "(node_id=" << scan_node_->id() |
| << ") using llvm codegend functions."; |
| scan_node_->IncNumScannersCodegenEnabled(); |
| return Status::OK(); |
| } |
| |
| Status HdfsScanner::StartNewRowBatch() { |
| DCHECK(scan_node_->HasRowBatchQueue()); |
| batch_ = new RowBatch(scan_node_->row_desc(), state_->batch_size(), |
| scan_node_->mem_tracker()); |
| int64_t tuple_buffer_size; |
| RETURN_IF_ERROR( |
| batch_->ResizeAndAllocateTupleBuffer(state_, &tuple_buffer_size, &tuple_mem_)); |
| return Status::OK(); |
| } |
| |
| int HdfsScanner::GetMemory(MemPool** pool, Tuple** tuple_mem, TupleRow** tuple_row_mem) { |
| DCHECK(scan_node_->HasRowBatchQueue()); |
| DCHECK(batch_ != NULL); |
| DCHECK_GT(batch_->capacity(), batch_->num_rows()); |
| *pool = batch_->tuple_data_pool(); |
| *tuple_mem = reinterpret_cast<Tuple*>(tuple_mem_); |
| *tuple_row_mem = batch_->GetRow(batch_->AddRow()); |
| return batch_->capacity() - batch_->num_rows(); |
| } |
| |
| Status HdfsScanner::GetCollectionMemory(CollectionValueBuilder* builder, MemPool** pool, |
| Tuple** tuple_mem, TupleRow** tuple_row_mem, int64_t* num_rows) { |
| int num_tuples; |
| *pool = builder->pool(); |
| RETURN_IF_ERROR(builder->GetFreeMemory(tuple_mem, &num_tuples)); |
| // Treat tuple as a single-tuple row |
| *tuple_row_mem = reinterpret_cast<TupleRow*>(tuple_mem); |
| *num_rows = num_tuples; |
| return Status::OK(); |
| } |
| |
| // TODO(skye): have this check scan_node_->ReachedLimit() and get rid of manual check? |
| Status HdfsScanner::CommitRows(int num_rows) { |
| DCHECK(scan_node_->HasRowBatchQueue()); |
| DCHECK(batch_ != NULL); |
| DCHECK_LE(num_rows, batch_->capacity() - batch_->num_rows()); |
| batch_->CommitRows(num_rows); |
| tuple_mem_ += static_cast<int64_t>(scan_node_->tuple_desc()->byte_size()) * num_rows; |
| |
| // We need to pass the row batch to the scan node if there is too much memory attached, |
| // which can happen if the query is very selective. We need to release memory even |
| // if no rows passed predicates. |
| if (batch_->AtCapacity() || context_->num_completed_io_buffers() > 0) { |
| context_->ReleaseCompletedResources(batch_, /* done */ false); |
| static_cast<HdfsScanNode*>(scan_node_)->AddMaterializedRowBatch(batch_); |
| RETURN_IF_ERROR(StartNewRowBatch()); |
| } |
| if (context_->cancelled()) return Status::CANCELLED; |
| // Check for UDF errors. |
| RETURN_IF_ERROR(state_->GetQueryStatus()); |
| // Free local expr allocations for this thread |
| for (const auto& entry: scanner_conjuncts_map_) { |
| ExprContext::FreeLocalAllocations(entry.second); |
| } |
| return Status::OK(); |
| } |
| |
| int HdfsScanner::WriteTemplateTuples(TupleRow* row, int num_tuples) { |
| DCHECK_GE(num_tuples, 0); |
| DCHECK_EQ(scan_node_->tuple_idx(), 0); |
| DCHECK_EQ(scanner_conjunct_ctxs_->size(), 0); |
| if (num_tuples == 0 || template_tuple_ == NULL) return num_tuples; |
| |
| Tuple** row_tuple = reinterpret_cast<Tuple**>(row); |
| for (int i = 0; i < num_tuples; ++i) row_tuple[i] = template_tuple_; |
| return num_tuples; |
| } |
| |
| bool HdfsScanner::WriteCompleteTuple(MemPool* pool, FieldLocation* fields, |
| Tuple* tuple, TupleRow* tuple_row, Tuple* template_tuple, |
| uint8_t* error_fields, uint8_t* error_in_row) { |
| *error_in_row = false; |
| // Initialize tuple before materializing slots |
| InitTuple(template_tuple, tuple); |
| |
| for (int i = 0; i < scan_node_->materialized_slots().size(); ++i) { |
| int need_escape = false; |
| int len = fields[i].len; |
| if (UNLIKELY(len < 0)) { |
| len = -len; |
| need_escape = true; |
| } |
| |
| SlotDescriptor* desc = scan_node_->materialized_slots()[i]; |
| bool error = !text_converter_->WriteSlot(desc, tuple, |
| fields[i].start, len, false, need_escape, pool); |
| error_fields[i] = error; |
| *error_in_row |= error; |
| } |
| |
| tuple_row->SetTuple(scan_node_->tuple_idx(), tuple); |
| return EvalConjuncts(tuple_row); |
| } |
| |
| // Codegen for WriteTuple(above) for writing out single nullable string slot and |
| // evaluating a <slot> = <constantexpr> conjunct. The signature matches WriteTuple() |
| // except for the first this* argument. |
| // define i1 @WriteCompleteTuple(%"class.impala::HdfsScanner"* %this, |
| // %"class.impala::MemPool"* %pool, |
| // %"struct.impala::FieldLocation"* %fields, |
| // %"class.impala::Tuple"* %tuple, |
| // %"class.impala::TupleRow"* %tuple_row, |
| // %"class.impala::Tuple"* %template, |
| // i8* %error_fields, i8* %error_in_row) { |
| // entry: |
| // %tuple_ptr = bitcast %"class.impala::Tuple"* %tuple |
| // to <{ %"struct.impala::StringValue", i8 }>* |
| // %tuple_ptr1 = bitcast %"class.impala::Tuple"* %template |
| // to <{ %"struct.impala::StringValue", i8 }>* |
| // %int8_ptr = bitcast <{ %"struct.impala::StringValue", i8 }>* %tuple_ptr to i8* |
| // %null_bytes_ptr = getelementptr i8, i8* %int8_ptr, i32 16 |
| // call void @llvm.memset.p0i8.i64(i8* %null_bytes_ptr, i8 0, i64 1, i32 0, i1 false) |
| // %0 = bitcast %"class.impala::TupleRow"* %tuple_row |
| // to <{ %"struct.impala::StringValue", i8 }>** |
| // %1 = getelementptr <{ %"struct.impala::StringValue", i8 }>*, |
| // <{ %"struct.impala::StringValue", i8 }>** %0, i32 0 |
| // store <{ %"struct.impala::StringValue", i8 }>* %tuple_ptr, |
| // <{ %"struct.impala::StringValue", i8 }>** %1 |
| // br label %parse |
| // |
| // parse: ; preds = %entry |
| // %data_ptr = getelementptr %"struct.impala::FieldLocation", |
| // %"struct.impala::FieldLocation"* %fields, i32 0, i32 0 |
| // %len_ptr = getelementptr %"struct.impala::FieldLocation", |
| // %"struct.impala::FieldLocation"* %fields, i32 0, i32 1 |
| // %slot_error_ptr = getelementptr i8, i8* %error_fields, i32 0 |
| // %data = load i8*, i8** %data_ptr |
| // %len = load i32, i32* %len_ptr |
| // %2 = call i1 @WriteSlot(<{ %"struct.impala::StringValue", i8 }>* %tuple_ptr, |
| // i8* %data, i32 %len) |
| // %slot_parse_error = xor i1 %2, true |
| // %error_in_row2 = or i1 false, %slot_parse_error |
| // %3 = zext i1 %slot_parse_error to i8 |
| // store i8 %3, i8* %slot_error_ptr |
| // %4 = call %"class.impala::ExprContext"* @GetConjunctCtx( |
| // %"class.impala::HdfsScanner"* %this, i32 0) |
| // %conjunct_eval = call i16 @"impala::Operators::Eq_StringVal_StringValWrapper"( |
| // %"class.impala::ExprContext"* %4, %"class.impala::TupleRow"* %tuple_row) |
| // %5 = ashr i16 %conjunct_eval, 8 |
| // %6 = trunc i16 %5 to i8 |
| // %val = trunc i8 %6 to i1 |
| // br i1 %val, label %parse3, label %eval_fail |
| // |
| // parse3: ; preds = %parse |
| // %7 = zext i1 %error_in_row2 to i8 |
| // store i8 %7, i8* %error_in_row |
| // ret i1 true |
| // |
| // eval_fail: ; preds = %parse |
| // ret i1 false |
| // } |
| Status HdfsScanner::CodegenWriteCompleteTuple(HdfsScanNodeBase* node, |
| LlvmCodeGen* codegen, const vector<ExprContext*>& conjunct_ctxs, |
| Function** write_complete_tuple_fn) { |
| *write_complete_tuple_fn = NULL; |
| SCOPED_TIMER(codegen->codegen_timer()); |
| RuntimeState* state = node->runtime_state(); |
| |
| // TODO: Timestamp is not yet supported |
| for (int i = 0; i < node->materialized_slots().size(); ++i) { |
| SlotDescriptor* slot_desc = node->materialized_slots()[i]; |
| if (slot_desc->type().type == TYPE_TIMESTAMP) { |
| return Status("Timestamp not yet supported for codegen."); |
| } |
| if (slot_desc->type().type == TYPE_DECIMAL) { |
| return Status("Decimal not yet supported for codegen."); |
| } |
| } |
| |
| // Cast away const-ness. The codegen only sets the cached typed llvm struct. |
| TupleDescriptor* tuple_desc = const_cast<TupleDescriptor*>(node->tuple_desc()); |
| vector<Function*> slot_fns; |
| for (int i = 0; i < node->materialized_slots().size(); ++i) { |
| SlotDescriptor* slot_desc = node->materialized_slots()[i]; |
| Function* fn = TextConverter::CodegenWriteSlot(codegen, tuple_desc, slot_desc, |
| node->hdfs_table()->null_column_value().data(), |
| node->hdfs_table()->null_column_value().size(), true, state->strict_mode()); |
| if (fn == NULL) return Status("CodegenWriteSlot failed."); |
| if (i >= LlvmCodeGen::CODEGEN_INLINE_EXPRS_THRESHOLD) codegen->SetNoInline(fn); |
| slot_fns.push_back(fn); |
| } |
| |
| // Compute order to materialize slots. BE assumes that conjuncts should |
| // be evaluated in the order specified (optimization is already done by FE) |
| vector<int> materialize_order; |
| node->ComputeSlotMaterializationOrder(&materialize_order); |
| |
| // Get types to construct matching function signature to WriteCompleteTuple |
| PointerType* uint8_ptr_type = PointerType::get(codegen->GetType(TYPE_TINYINT), 0); |
| |
| StructType* field_loc_type = reinterpret_cast<StructType*>( |
| codegen->GetType(FieldLocation::LLVM_CLASS_NAME)); |
| Type* tuple_row_type = codegen->GetType(TupleRow::LLVM_CLASS_NAME); |
| Type* tuple_opaque_type = codegen->GetType(Tuple::LLVM_CLASS_NAME); |
| Type* mem_pool_type = codegen->GetType(MemPool::LLVM_CLASS_NAME); |
| Type* hdfs_scanner_type = codegen->GetType(HdfsScanner::LLVM_CLASS_NAME); |
| |
| DCHECK(tuple_opaque_type != NULL); |
| DCHECK(tuple_row_type != NULL); |
| DCHECK(field_loc_type != NULL); |
| DCHECK(hdfs_scanner_type != NULL); |
| |
| PointerType* field_loc_ptr_type = PointerType::get(field_loc_type, 0); |
| PointerType* tuple_opaque_ptr_type = PointerType::get(tuple_opaque_type, 0); |
| PointerType* tuple_row_ptr_type = PointerType::get(tuple_row_type, 0); |
| PointerType* mem_pool_ptr_type = PointerType::get(mem_pool_type, 0); |
| PointerType* hdfs_scanner_ptr_type = PointerType::get(hdfs_scanner_type, 0); |
| |
| // Generate the typed llvm struct for the output tuple |
| StructType* tuple_type = tuple_desc->GetLlvmStruct(codegen); |
| if (tuple_type == NULL) return Status("Could not generate tuple struct."); |
| PointerType* tuple_ptr_type = PointerType::get(tuple_type, 0); |
| |
| // Initialize the function prototype. This needs to match |
| // HdfsScanner::WriteCompleteTuple's signature identically. |
| LlvmCodeGen::FnPrototype prototype( |
| codegen, "WriteCompleteTuple", codegen->GetType(TYPE_BOOLEAN)); |
| prototype.AddArgument(LlvmCodeGen::NamedVariable("this", hdfs_scanner_ptr_type)); |
| prototype.AddArgument(LlvmCodeGen::NamedVariable("pool", mem_pool_ptr_type)); |
| prototype.AddArgument(LlvmCodeGen::NamedVariable("fields", field_loc_ptr_type)); |
| prototype.AddArgument(LlvmCodeGen::NamedVariable("tuple", tuple_opaque_ptr_type)); |
| prototype.AddArgument(LlvmCodeGen::NamedVariable("tuple_row", tuple_row_ptr_type)); |
| prototype.AddArgument(LlvmCodeGen::NamedVariable("template", tuple_opaque_ptr_type)); |
| prototype.AddArgument(LlvmCodeGen::NamedVariable("error_fields", uint8_ptr_type)); |
| prototype.AddArgument(LlvmCodeGen::NamedVariable("error_in_row", uint8_ptr_type)); |
| |
| LLVMContext& context = codegen->context(); |
| LlvmBuilder builder(context); |
| Value* args[8]; |
| Function* fn = prototype.GeneratePrototype(&builder, &args[0]); |
| |
| BasicBlock* parse_block = BasicBlock::Create(context, "parse", fn); |
| BasicBlock* eval_fail_block = BasicBlock::Create(context, "eval_fail", fn); |
| |
| // Extract the input args |
| Value* this_arg = args[0]; |
| Value* fields_arg = args[2]; |
| Value* tuple_arg = builder.CreateBitCast(args[3], tuple_ptr_type, "tuple_ptr"); |
| Value* tuple_row_arg = args[4]; |
| Value* template_arg = builder.CreateBitCast(args[5], tuple_ptr_type, "tuple_ptr"); |
| Value* errors_arg = args[6]; |
| Value* error_in_row_arg = args[7]; |
| |
| // Codegen for function body |
| Value* error_in_row = codegen->false_value(); |
| // Initialize tuple |
| if (node->num_materialized_partition_keys() == 0) { |
| // No partition key slots, just zero the NULL bytes. |
| codegen->CodegenClearNullBits(&builder, tuple_arg, *tuple_desc); |
| } else { |
| // Copy template tuple. |
| // TODO: only copy what's necessary from the template tuple. |
| codegen->CodegenMemcpy(&builder, tuple_arg, template_arg, tuple_desc->byte_size()); |
| } |
| |
| // Put tuple in tuple_row |
| Value* tuple_row_typed = |
| builder.CreateBitCast(tuple_row_arg, PointerType::get(tuple_ptr_type, 0)); |
| Value* tuple_row_idxs[] = {codegen->GetIntConstant(TYPE_INT, node->tuple_idx())}; |
| Value* tuple_in_row_addr = builder.CreateInBoundsGEP(tuple_row_typed, tuple_row_idxs); |
| builder.CreateStore(tuple_arg, tuple_in_row_addr); |
| builder.CreateBr(parse_block); |
| |
| // Loop through all the conjuncts in order and materialize slots as necessary to |
| // evaluate the conjuncts (e.g. conjunct_ctxs[0] will have the slots it references |
| // first). |
| // materialized_order[slot_idx] represents the first conjunct which needs that slot. |
| // Slots are only materialized if its order matches the current conjunct being |
| // processed. This guarantees that each slot is materialized once when it is first |
| // needed and that at the end of the materialize loop, the conjunct has everything |
| // it needs (either from this iteration or previous iterations). |
| builder.SetInsertPoint(parse_block); |
| for (int conjunct_idx = 0; conjunct_idx <= conjunct_ctxs.size(); ++conjunct_idx) { |
| for (int slot_idx = 0; slot_idx < materialize_order.size(); ++slot_idx) { |
| // If they don't match, it means either the slot has already been |
| // materialized for a previous conjunct or will be materialized later for |
| // another conjunct. Either case, the slot does not need to be materialized |
| // yet. |
| if (materialize_order[slot_idx] != conjunct_idx) continue; |
| |
| // Materialize slots[slot_idx] to evaluate conjunct_ctxs[conjunct_idx] |
| // All slots[i] with materialized_order[i] < conjunct_idx have already been |
| // materialized by prior iterations through the outer loop |
| |
| // Extract ptr/len from fields |
| Value* data_idxs[] = { |
| codegen->GetIntConstant(TYPE_INT, slot_idx), |
| codegen->GetIntConstant(TYPE_INT, 0), |
| }; |
| Value* len_idxs[] = { |
| codegen->GetIntConstant(TYPE_INT, slot_idx), |
| codegen->GetIntConstant(TYPE_INT, 1), |
| }; |
| Value* error_idxs[] = { |
| codegen->GetIntConstant(TYPE_INT, slot_idx), |
| }; |
| Value* data_ptr = builder.CreateInBoundsGEP(fields_arg, data_idxs, "data_ptr"); |
| Value* len_ptr = builder.CreateInBoundsGEP(fields_arg, len_idxs, "len_ptr"); |
| Value* error_ptr = |
| builder.CreateInBoundsGEP(errors_arg, error_idxs, "slot_error_ptr"); |
| Value* data = builder.CreateLoad(data_ptr, "data"); |
| Value* len = builder.CreateLoad(len_ptr, "len"); |
| |
| // Convert length to positive if it is negative. Negative lengths are assigned to |
| // slots that contain escape characters. |
| // TODO: CodegenWriteSlot() currently does not handle text that requres unescaping. |
| // However, if it is modified to handle that case, we need to detect it here and |
| // send a 'need_escape' bool to CodegenWriteSlot(), since we are making the length |
| // positive here. |
| Value* len_lt_zero = builder.CreateICmpSLT(len, |
| codegen->GetIntConstant(TYPE_INT, 0), "len_lt_zero"); |
| Value* ones_compliment_len = builder.CreateNot(len, "ones_compliment_len"); |
| Value* positive_len = builder.CreateAdd( |
| ones_compliment_len, codegen->GetIntConstant(TYPE_INT, 1), |
| "positive_len"); |
| len = builder.CreateSelect(len_lt_zero, positive_len, len, |
| "select_positive_len"); |
| |
| // Call slot parse function |
| Function* slot_fn = slot_fns[slot_idx]; |
| Value* slot_parsed = builder.CreateCall(slot_fn, |
| ArrayRef<Value*>({tuple_arg, data, len})); |
| Value* slot_error = builder.CreateNot(slot_parsed, "slot_parse_error"); |
| error_in_row = builder.CreateOr(error_in_row, slot_error, "error_in_row"); |
| slot_error = builder.CreateZExt(slot_error, codegen->GetType(TYPE_TINYINT)); |
| builder.CreateStore(slot_error, error_ptr); |
| } |
| |
| if (conjunct_idx == conjunct_ctxs.size()) { |
| // In this branch, we've just materialized slots not referenced by any conjunct. |
| // This slots are the last to get materialized. If we are in this branch, the |
| // tuple passed all conjuncts and should be added to the row batch. |
| Value* error_ret = builder.CreateZExt(error_in_row, codegen->GetType(TYPE_TINYINT)); |
| builder.CreateStore(error_ret, error_in_row_arg); |
| builder.CreateRet(codegen->true_value()); |
| } else { |
| // All slots for conjunct_ctxs[conjunct_idx] are materialized, evaluate the partial |
| // tuple against that conjunct and start a new parse_block for the next conjunct |
| parse_block = BasicBlock::Create(context, "parse", fn, eval_fail_block); |
| Function* conjunct_fn; |
| Status status = |
| conjunct_ctxs[conjunct_idx]->root()->GetCodegendComputeFn(codegen, &conjunct_fn); |
| if (!status.ok()) { |
| stringstream ss; |
| ss << "Failed to codegen conjunct: " << status.GetDetail(); |
| state->LogError(ErrorMsg(TErrorCode::GENERAL, ss.str())); |
| fn->eraseFromParent(); |
| return status; |
| } |
| if (node->materialized_slots().size() + conjunct_idx |
| >= LlvmCodeGen::CODEGEN_INLINE_EXPRS_THRESHOLD) { |
| codegen->SetNoInline(conjunct_fn); |
| } |
| |
| Function* get_ctx_fn = |
| codegen->GetFunction(IRFunction::HDFS_SCANNER_GET_CONJUNCT_CTX, false); |
| Value* ctx = builder.CreateCall(get_ctx_fn, |
| ArrayRef<Value*>({this_arg, codegen->GetIntConstant(TYPE_INT, conjunct_idx)})); |
| |
| Value* conjunct_args[] = {ctx, tuple_row_arg}; |
| CodegenAnyVal result = CodegenAnyVal::CreateCallWrapped( |
| codegen, &builder, TYPE_BOOLEAN, conjunct_fn, conjunct_args, "conjunct_eval"); |
| builder.CreateCondBr(result.GetVal(), parse_block, eval_fail_block); |
| builder.SetInsertPoint(parse_block); |
| } |
| } |
| |
| // Block if eval failed. |
| builder.SetInsertPoint(eval_fail_block); |
| builder.CreateRet(codegen->false_value()); |
| |
| if (node->materialized_slots().size() + conjunct_ctxs.size() |
| > LlvmCodeGen::CODEGEN_INLINE_EXPR_BATCH_THRESHOLD) { |
| codegen->SetNoInline(fn); |
| } |
| *write_complete_tuple_fn = codegen->FinalizeFunction(fn); |
| if (*write_complete_tuple_fn == NULL) { |
| return Status("Failed to finalize write_complete_tuple_fn."); |
| } |
| return Status::OK(); |
| } |
| |
| Status HdfsScanner::CodegenWriteAlignedTuples(HdfsScanNodeBase* node, |
| LlvmCodeGen* codegen, Function* write_complete_tuple_fn, |
| Function** write_aligned_tuples_fn) { |
| *write_aligned_tuples_fn = NULL; |
| SCOPED_TIMER(codegen->codegen_timer()); |
| DCHECK(write_complete_tuple_fn != NULL); |
| |
| Function* write_tuples_fn = |
| codegen->GetFunction(IRFunction::HDFS_SCANNER_WRITE_ALIGNED_TUPLES, true); |
| DCHECK(write_tuples_fn != NULL); |
| |
| int replaced = codegen->ReplaceCallSites(write_tuples_fn, write_complete_tuple_fn, |
| "WriteCompleteTuple"); |
| DCHECK_EQ(replaced, 1); |
| |
| *write_aligned_tuples_fn = codegen->FinalizeFunction(write_tuples_fn); |
| if (*write_aligned_tuples_fn == NULL) { |
| return Status("Failed to finalize write_aligned_tuples_fn."); |
| } |
| return Status::OK(); |
| } |
| |
| Status HdfsScanner::UpdateDecompressor(const THdfsCompression::type& compression) { |
| // Check whether the file in the stream has different compression from the last one. |
| if (compression != decompression_type_) { |
| if (decompression_type_ != THdfsCompression::NONE) { |
| // Close the previous decompressor before creating a new one. |
| DCHECK(decompressor_.get() != NULL); |
| decompressor_->Close(); |
| decompressor_.reset(NULL); |
| } |
| // The LZO-compression scanner is implemented in a dynamically linked library and it |
| // is not created at Codec::CreateDecompressor(). |
| if (compression != THdfsCompression::NONE && compression != THdfsCompression::LZO) { |
| RETURN_IF_ERROR(Codec::CreateDecompressor(data_buffer_pool_.get(), |
| scan_node_->tuple_desc()->string_slots().empty(), compression, &decompressor_)); |
| } |
| decompression_type_ = compression; |
| } |
| return Status::OK(); |
| } |
| |
| Status HdfsScanner::UpdateDecompressor(const string& codec) { |
| map<const string, const THdfsCompression::type>::const_iterator |
| type = Codec::CODEC_MAP.find(codec); |
| |
| if (type == Codec::CODEC_MAP.end()) { |
| stringstream ss; |
| ss << Codec::UNKNOWN_CODEC_ERROR << codec; |
| return Status(ss.str()); |
| } |
| RETURN_IF_ERROR(UpdateDecompressor(type->second)); |
| return Status::OK(); |
| } |
| |
| bool HdfsScanner::ReportTupleParseError(FieldLocation* fields, uint8_t* errors) { |
| for (int i = 0; i < scan_node_->materialized_slots().size(); ++i) { |
| if (errors[i]) { |
| const SlotDescriptor* desc = scan_node_->materialized_slots()[i]; |
| ReportColumnParseError(desc, fields[i].start, fields[i].len); |
| errors[i] = false; |
| } |
| } |
| LogRowParseError(); |
| |
| if (state_->abort_on_error()) DCHECK(!parse_status_.ok()); |
| return parse_status_.ok(); |
| } |
| |
| void HdfsScanner::LogRowParseError() { |
| const string& s = Substitute("Error parsing row: file: $0, before offset: $1", |
| stream_->filename(), stream_->file_offset()); |
| state_->LogError(ErrorMsg(TErrorCode::GENERAL, s)); |
| } |
| |
| void HdfsScanner::ReportColumnParseError(const SlotDescriptor* desc, |
| const char* data, int len) { |
| if (state_->LogHasSpace() || state_->abort_on_error()) { |
| stringstream ss; |
| ss << "Error converting column: " |
| << desc->col_pos() - scan_node_->num_partition_keys() |
| << " to " << desc->type(); |
| |
| // When skipping multiple header lines we only try to skip them in the first scan |
| // range. For subsequent scan ranges, it's impossible to determine how many lines |
| // precede it and whether any header lines should be skipped. If the header does not |
| // fit into the first scan range and spills into subsequent scan ranges, then we will |
| // try to parse header data here and fail. The scanner of the first scan range will |
| // fail the query if it cannot fully skip the header. However, if abort_on_error is |
| // set, then a race happens between the first scanner to detect the condition and any |
| // other scanner that tries to parse an invalid value. Therefore a possible mitigation |
| // is to increase the max_scan_range_length so the header is fully contained in the |
| // first scan range. |
| if (scan_node_->skip_header_line_count() > 1) { |
| ss << "\n" << "Table has skip.header.line.count set to a value > 1. If the data " |
| << "that could not be parsed looks like it's part of the file's header, then " |
| << "try increasing max_scan_range_length to a value larger than the size of the " |
| << "file's header."; |
| } |
| if (state_->LogHasSpace()) { |
| state_->LogError(ErrorMsg(TErrorCode::GENERAL, ss.str()), 2); |
| } |
| |
| if (state_->abort_on_error() && parse_status_.ok()) parse_status_ = Status(ss.str()); |
| } |
| } |