// blob: 3bf9ef5105cec658c0c15ef1e87681e99276871c [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "runtime/fragment-state.h"
#include <gutil/strings/substitute.h>
#include "codegen/llvm-codegen.h"
#include "exec/exec-node.h"
#include "exec/data-sink.h"
#include "runtime/exec-env.h"
#include "runtime/query-state.h"
#include "gen-cpp/ImpalaInternalService_types.h"
#include "util/debug-util.h"
#include "util/impalad-metrics.h"
#include "util/runtime-profile.h"
#include "util/runtime-profile-counters.h"
#include "common/names.h"
namespace impala {
// Out-of-line definitions of the FragmentState string constants for the thread group
// name and the thread name prefix.
// NOTE(review): presumably consumed by whoever spawns the fragment init/codegen
// threads -- that code is not in this file; confirm against the callers.
const string FragmentState::FSTATE_THREAD_GROUP_NAME("fragment-init");
const string FragmentState::FSTATE_THREAD_NAME_PREFIX("init-and-codegen");
// Creates one FragmentState per plan fragment described by 'fragment_info' and
// 'exec_request', registers each of them in 'fragment_map' keyed by its fragment index,
// attaches every fragment instance context to the state of the fragment it belongs to,
// and finally calls Init() on every entry of 'fragment_map'.
//
// All FragmentState objects are allocated in the object pool of 'state'.
//
// The instance contexts are walked in order. Whenever an instance reports a fragment
// index different from the state currently being filled, the next fragment (and its
// matching context) is consumed and a new state is started; the fragment and instance
// contexts are therefore expected to follow the same order.
//
// Returns the first non-OK status returned by Init(), Status::OK() otherwise.
Status FragmentState::CreateFragmentStateMap(const TExecPlanFragmentInfo& fragment_info,
    const ExecQueryFInstancesRequestPB& exec_request, QueryState* state,
    std::unordered_map<TFragmentIdx, FragmentState*>& fragment_map) {
  // Position, in both fragment lists, of the fragment that 'current' was built from.
  int frag_pos = 0;

  // The state for the first fragment is created up front, before any instance is seen.
  FragmentState* current = state->obj_pool()->Add(new FragmentState(
      state, fragment_info.fragments[frag_pos], exec_request.fragment_ctxs(frag_pos)));
  fragment_map[current->fragment_idx()] = current;

  for (int inst_pos = 0; inst_pos < fragment_info.fragment_instance_ctxs.size();
       ++inst_pos) {
    const TPlanFragmentInstanceCtx& thrift_inst =
        fragment_info.fragment_instance_ctxs[inst_pos];
    const PlanFragmentInstanceCtxPB& pb_inst =
        exec_request.fragment_instance_ctxs(inst_pos);
    // The Thrift and protobuf views of an instance must describe the same fragment.
    DCHECK_EQ(thrift_inst.fragment_idx, pb_inst.fragment_idx());

    // An instance of a different fragment means the previous fragment is complete:
    // advance to the next TPlanFragment and start a new state for it.
    const bool starts_new_fragment = current->fragment_idx() != thrift_inst.fragment_idx;
    if (starts_new_fragment) {
      ++frag_pos;
      DCHECK_LT(frag_pos, fragment_info.fragments.size());
      const TPlanFragment& next_fragment = fragment_info.fragments[frag_pos];
      const PlanFragmentCtxPB& next_ctx = exec_request.fragment_ctxs(frag_pos);
      DCHECK_EQ(next_fragment.idx, next_ctx.fragment_idx());
      current = state->obj_pool()->Add(new FragmentState(state, next_fragment, next_ctx));
      fragment_map[current->fragment_idx()] = current;
      // Fragment and instance contexts are expected to follow the same order.
      DCHECK_EQ(current->fragment_idx(), thrift_inst.fragment_idx);
    }
    current->AddInstance(&thrift_inst, &pb_inst);
  }

  // Initialise every fragment state; stop at the first failure.
  for (auto& entry : fragment_map) {
    FragmentState* const fstate = entry.second;
    RETURN_IF_ERROR(fstate->Init());
  }
  return Status::OK();
}
// Builds the plan tree for this fragment and then the data sink config for its output
// sink. The sink config is created second because it is given the row descriptor of
// the plan tree root. Returns the first non-OK status, Status::OK() otherwise.
Status FragmentState::Init() {
  const Status tree_status = PlanNode::CreateTree(this, fragment_.plan, &plan_tree_);
  if (!tree_status.ok()) return tree_status;
  return DataSinkConfig::CreateConfig(
      fragment_.output_sink, plan_tree_->row_descriptor_, this, &sink_config_);
}
// Runs codegen for this fragment at most once and returns its status. The whole call
// holds 'codegen_lock_'; the first caller performs the work via CodegenHelper() and the
// result is stored in 'codegen_status_', which every later call returns unchanged.
//
// On failure the status is annotated with the fragment's display name and reported to
// the query state through ErrorDuringFragmentCodegen().
//
// 'event_sequence' is forwarded to CodegenHelper().
Status FragmentState::InvokeCodegen(RuntimeProfile::EventSequence* event_sequence) {
  unique_lock<mutex> guard(codegen_lock_);

  // Codegen already ran (successfully or not): hand back the recorded outcome.
  if (codegen_invoked_) return codegen_status_;

  codegen_invoked_ = true;
  codegen_status_ = CodegenHelper(event_sequence);
  if (codegen_status_.ok()) return codegen_status_;

  // Failure path: add context to the error and let the query state know.
  codegen_status_.AddDetail(Substitute(
      "Fragment failed during codegen, fragment index: $0", fragment_.display_name));
  query_state_->ErrorDuringFragmentCodegen(codegen_status_);
  return codegen_status_;
}
// Does the actual codegen work for this fragment: creates the codegen object if needed,
// generates IR for the plan tree, the sink config and the registered scalar exprs, and
// then finalizes the LLVM module either asynchronously or synchronously.
//
// Preconditions (DCHECKed): the plan tree and sink config exist and ShouldCodegen() is
// true. 'event_sequence' is only passed on to asynchronous finalization.
//
// Returns the first non-OK status encountered, Status::OK() otherwise.
Status FragmentState::CodegenHelper(RuntimeProfile::EventSequence* event_sequence) {
  DCHECK(plan_tree_ != nullptr);
  DCHECK(sink_config_ != nullptr);
  DCHECK(ShouldCodegen());
  RETURN_IF_ERROR(CreateCodegen());
  // Declared at function scope, so this timer spans everything below.
  SCOPED_TIMER(codegen()->main_thread_timer());
  {
    // The IR-generation timers and thread counters only span this inner block.
    SCOPED_TIMER2(codegen()->ir_generation_timer(),
        codegen()->runtime_profile()->total_time_counter());
    SCOPED_THREAD_COUNTER_MEASUREMENT(codegen()->llvm_thread_counters());
    plan_tree_->Codegen(this);
    sink_config_->Codegen(this);
    // A failure to codegen the scalar exprs is propagated to the caller instead of
    // being ignored. NOTE(review): this was originally justified by ScalarFnCall having
    // no interpreted fallback before IMPALA-4233; that JIRA is reported as fixed, so
    // whether the error still needs to be fatal should be revisited.
    RETURN_IF_ERROR(CodegenScalarExprs());
  }

  LlvmCodeGen* const cg = codegen();
  DCHECK(cg != nullptr);

  // Asynchronous finalization is only an option when the query option asks for it and
  // the fragment can be interpreted in the meantime. If codegen is required, nothing
  // can run until it has finished, so the module is finalized synchronously.
  const bool finalize_async = query_options().async_codegen && is_interpretable();
  if (finalize_async) return cg->FinalizeModuleAsync(event_sequence);
  return cg->FinalizeModule();
}
// Constructs the state for one plan fragment of 'query_state'. Besides storing its
// arguments, it creates a runtime profile named "Fragment <display_name>" in the
// query's object pool and adds it as a child of the query state's host profile.
FragmentState::FragmentState(QueryState* query_state, const TPlanFragment& fragment,
    const PlanFragmentCtxPB& fragment_ctx)
  : query_state_(query_state), fragment_(fragment), fragment_ctx_(fragment_ctx) {
  const string profile_name = Substitute("Fragment $0", fragment_.display_name);
  runtime_profile_ = RuntimeProfile::Create(query_state_->obj_pool(), profile_name);
  query_state_->host_profile()->AddChild(runtime_profile_);
}
// Nothing to do explicitly here; explicit cleanup is done in ReleaseResources().
FragmentState::~FragmentState() = default;
// Closes whichever of the codegen object, plan tree and sink config have been created,
// in that order. Members that were never created are skipped.
void FragmentState::ReleaseResources() {
  if (codegen_.get() != nullptr) {
    codegen_->Close();
  }
  if (plan_tree_ != nullptr) {
    plan_tree_->Close();
  }
  if (sink_config_ != nullptr) {
    sink_config_->Close();
  }
}
// Creates the LLVM codegen object for this fragment if it does not exist yet.
//
// A call made after the object has been created is a no-op that returns Status::OK().
// On first creation, optimizations are enabled on the new codegen object and its
// runtime profile is added as a child of this fragment's runtime profile.
//
// Returns the error from LlvmCodeGen::CreateImpalaCodegen() if creation fails.
Status FragmentState::CreateCodegen() {
  // Already created: nothing to do.
  if (codegen_.get() != nullptr) return Status::OK();
  RETURN_IF_ERROR(LlvmCodeGen::CreateImpalaCodegen(
      this, query_mem_tracker(), PrintId(query_id()), &codegen_));
  codegen_->EnableOptimizations(true);
  runtime_profile_->AddChild(codegen_->runtime_profile());
  return Status::OK();
}
// Requests the codegen'd compute function for every entry of
// 'scalar_exprs_to_codegen_'. Each entry's second member is forwarded unchanged as the
// second argument of GetCodegendComputeFn(). The resulting function pointers are not
// used here. Stops at, and returns, the first non-OK status.
Status FragmentState::CodegenScalarExprs() {
  LlvmCodeGen* const cg = codegen_.get();
  for (auto& expr_entry : scalar_exprs_to_codegen_) {
    // Out-parameter required by the callee; its value is intentionally discarded.
    llvm::Function* compute_fn = nullptr;
    RETURN_IF_ERROR(
        expr_entry.first->GetCodegendComputeFn(cg, expr_entry.second, &compute_fn));
  }
  return Status::OK();
}
std::string FragmentState::GenerateCodegenMsg(
bool codegen_enabled, const Status& codegen_status, const std::string& extra_label) {
const string& err_msg = codegen_status.ok() ? "" : codegen_status.msg().msg();
return GenerateCodegenMsg(codegen_enabled, err_msg, extra_label);
}
// Formats a one-line codegen status message of the form
//   "[<extra_label> ]Codegen Enabled|Codegen Disabled[: <extra_info>]".
// The label prefix and the info suffix are each omitted when the corresponding argument
// is empty.
std::string FragmentState::GenerateCodegenMsg(bool codegen_enabled,
    const std::string& extra_info, const std::string& extra_label) {
  std::string message;
  if (!extra_label.empty()) {
    message += extra_label;
    message += " ";
  }
  message += codegen_enabled ? "Codegen Enabled" : "Codegen Disabled";
  if (!extra_info.empty()) {
    message += ": ";
    message += extra_info;
  }
  return message;
}
}