// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include "exec/exec-node.h"

#include <memory>
#include <sstream>
#include <unistd.h>  // for sleep()

#include <thrift/protocol/TDebugProtocol.h>

#include "codegen/codegen-anyval.h"
#include "codegen/llvm-codegen.h"
#include "common/object-pool.h"
#include "common/status.h"
#include "exec/aggregation-node.h"
#include "exec/analytic-eval-node.h"
#include "exec/cardinality-check-node.h"
#include "exec/data-source-scan-node.h"
#include "exec/empty-set-node.h"
#include "exec/exchange-node.h"
#include "exec/exec-node-util.h"
#include "exec/hbase-scan-node.h"
#include "exec/hdfs-scan-node-mt.h"
#include "exec/hdfs-scan-node.h"
#include "exec/kudu-scan-node-mt.h"
#include "exec/kudu-scan-node.h"
#include "exec/kudu-util.h"
#include "exec/nested-loop-join-node.h"
#include "exec/partial-sort-node.h"
#include "exec/partitioned-hash-join-node.h"
#include "exec/select-node.h"
#include "exec/singular-row-src-node.h"
#include "exec/sort-node.h"
#include "exec/streaming-aggregation-node.h"
#include "exec/subplan-node.h"
#include "exec/topn-node.h"
#include "exec/union-node.h"
#include "exec/unnest-node.h"
#include "exprs/expr.h"
#include "exprs/scalar-expr-evaluator.h"
#include "exprs/scalar-expr.h"
#include "gutil/strings/substitute.h"
#include "runtime/descriptors.h"
#include "runtime/exec-env.h"
#include "runtime/initial-reservations.h"
#include "runtime/mem-pool.h"
#include "runtime/mem-tracker.h"
#include "runtime/query-state.h"
#include "runtime/row-batch.h"
#include "runtime/runtime-state.h"
#include "util/debug-util.h"
#include "util/runtime-profile-counters.h"

#include "common/names.h"

using strings::Substitute;

namespace impala {
Status PlanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
  tnode_ = &tnode;
  row_descriptor_ = state->obj_pool()->Add(
      new RowDescriptor(state->desc_tbl(), tnode_->row_tuples, tnode_->nullable_tuples));

  if (tnode_->node_type != TPlanNodeType::AGGREGATION_NODE) {
    // In Agg node the conjuncts are executed by the Aggregators.
    RETURN_IF_ERROR(
        ScalarExpr::Create(tnode_->conjuncts, *row_descriptor_, state, &conjuncts_));
  }
  return Status::OK();
}

Status PlanNode::CreateTree(
      RuntimeState* state, const TPlan& plan, PlanNode** root) {
  if (plan.nodes.size() == 0) {
    *root = NULL;
    return Status::OK();
  }
  int node_idx = 0;
  Status status = CreateTreeHelper(state, plan.nodes, NULL, &node_idx, root);
  if (status.ok() && node_idx + 1 != plan.nodes.size()) {
    status = Status(
        "Plan tree only partially reconstructed. Not all thrift nodes were used.");
  }
  if (!status.ok()) {
    LOG(ERROR) << "Could not construct plan tree:\n"
               << apache::thrift::ThriftDebugString(plan);
  }
  return status;
}

Status PlanNode::CreateTreeHelper(RuntimeState* state,
      const std::vector<TPlanNode>& tnodes, PlanNode* parent, int* node_idx,
      PlanNode** root) {
  // propagate error case
  if (*node_idx >= tnodes.size()) {
    return Status("Failed to reconstruct plan tree from thrift.");
  }
  const TPlanNode& tnode = tnodes[*node_idx];

  int num_children = tnode.num_children;
  PlanNode* node = NULL;
  RETURN_IF_ERROR(CreatePlanNode(state->obj_pool(), tnode, &node, state));
  if (parent != NULL) {
    parent->children_.push_back(node);
  } else {
    *root = node;
  }
  for (int i = 0; i < num_children; ++i) {
    ++*node_idx;
    RETURN_IF_ERROR(CreateTreeHelper(state, tnodes, node, node_idx, NULL));
    // we are expecting a child, but have used all nodes
    // this means we have been given a bad tree and must fail
    if (*node_idx >= tnodes.size()) {
      return Status("Failed to reconstruct plan tree from thrift.");
    }
  }

  // Call Init() after children have been set and Init()'d themselves
  RETURN_IF_ERROR(node->Init(tnode, state));
  return Status::OK();
}

Status PlanNode::CreatePlanNode(ObjectPool* pool, const TPlanNode& tnode, PlanNode** node,
      RuntimeState* state) {
  switch (tnode.node_type) {
    case TPlanNodeType::HDFS_SCAN_NODE:
      *node = pool->Add(new HdfsScanPlanNode());
      break;
    case TPlanNodeType::HBASE_SCAN_NODE:
    case TPlanNodeType::DATA_SOURCE_NODE:
    case TPlanNodeType::KUDU_SCAN_NODE:
      *node = pool->Add(new ScanPlanNode());
      break;
    case TPlanNodeType::AGGREGATION_NODE:
      *node = pool->Add(new AggregationPlanNode());
      break;
    case TPlanNodeType::HASH_JOIN_NODE:
      *node = pool->Add(new PartitionedHashJoinPlanNode());
      break;
    case TPlanNodeType::NESTED_LOOP_JOIN_NODE:
      *node = pool->Add(new NestedLoopJoinPlanNode());
      break;
    case TPlanNodeType::EMPTY_SET_NODE:
      *node = pool->Add(new EmptySetPlanNode());
      break;
    case TPlanNodeType::EXCHANGE_NODE:
      *node = pool->Add(new ExchangePlanNode());
      break;
    case TPlanNodeType::SELECT_NODE:
      *node = pool->Add(new SelectPlanNode());
      break;
    case TPlanNodeType::SORT_NODE:
      if (tnode.sort_node.type == TSortType::PARTIAL) {
        *node = pool->Add(new PartialSortPlanNode());
      } else if (tnode.sort_node.type == TSortType::TOPN) {
        *node = pool->Add(new TopNPlanNode());
      } else {
        DCHECK(tnode.sort_node.type == TSortType::TOTAL);
        *node = pool->Add(new SortPlanNode());
      }
      break;
    case TPlanNodeType::UNION_NODE:
      *node = pool->Add(new UnionPlanNode());
      break;
    case TPlanNodeType::ANALYTIC_EVAL_NODE:
      *node = pool->Add(new AnalyticEvalPlanNode());
      break;
    case TPlanNodeType::SINGULAR_ROW_SRC_NODE:
      *node = pool->Add(new SingularRowSrcPlanNode());
      break;
    case TPlanNodeType::SUBPLAN_NODE:
      *node = pool->Add(new SubplanPlanNode());
      break;
    case TPlanNodeType::UNNEST_NODE:
      *node = pool->Add(new UnnestPlanNode());
      break;
    case TPlanNodeType::CARDINALITY_CHECK_NODE:
      *node = pool->Add(new CardinalityCheckPlanNode());
      break;
    default:
      map<int, const char*>::const_iterator i =
          _TPlanNodeType_VALUES_TO_NAMES.find(tnode.node_type);
      const char* str = "unknown node type";
      if (i != _TPlanNodeType_VALUES_TO_NAMES.end()) {
        str = i->second;
      }
      stringstream error_msg;
      error_msg << str << " not implemented";
      return Status(error_msg.str());
  }
  return Status::OK();
}

const string ExecNode::ROW_THROUGHPUT_COUNTER = "RowsReturnedRate";

ExecNode::ExecNode(ObjectPool* pool, const PlanNode& pnode, const DescriptorTbl& descs)
  : plan_node_(pnode),
    id_(pnode.tnode_->node_id),
    type_(pnode.tnode_->node_type),
    pool_(pool),
    conjuncts_(pnode.conjuncts_),
    row_descriptor_(*(pnode.row_descriptor_)),
    resource_profile_(pnode.tnode_->resource_profile),
    limit_(pnode.tnode_->limit),
    runtime_profile_(RuntimeProfile::Create(
        pool_, Substitute("$0 (id=$1)", PrintThriftEnum(type_), id_))),
    rows_returned_counter_(nullptr),
    rows_returned_rate_(nullptr),
    containing_subplan_(nullptr),
    disable_codegen_(pnode.tnode_->disable_codegen),
    num_rows_returned_(0),
    is_closed_(false) {
  runtime_profile_->SetPlanNodeId(id_);
  debug_options_.phase = TExecNodePhase::INVALID;
}

ExecNode::~ExecNode() {
}

Status ExecNode::Prepare(RuntimeState* state) {
  RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::PREPARE, state));
  DCHECK(runtime_profile_ != NULL);
  mem_tracker_.reset(new MemTracker(runtime_profile_, -1, runtime_profile_->name(),
      state->instance_mem_tracker()));
  expr_mem_tracker_.reset(new MemTracker(-1, "Exprs", mem_tracker_.get(), false));
  expr_perm_pool_.reset(new MemPool(expr_mem_tracker_.get()));
  expr_results_pool_.reset(new MemPool(expr_mem_tracker_.get()));
  rows_returned_counter_ = ADD_COUNTER(runtime_profile_, "RowsReturned", TUnit::UNIT);
  rows_returned_rate_ = runtime_profile()->AddDerivedCounter(
      ROW_THROUGHPUT_COUNTER, TUnit::UNIT_PER_SECOND,
      bind<int64_t>(&RuntimeProfile::UnitsPerSecond, rows_returned_counter_,
          runtime_profile()->total_time_counter()));
  RETURN_IF_ERROR(ScalarExprEvaluator::Create(conjuncts_, state, pool_, expr_perm_pool(),
      expr_results_pool(), &conjunct_evals_));
  DCHECK_EQ(conjunct_evals_.size(), conjuncts_.size());
  for (int i = 0; i < children_.size(); ++i) {
    RETURN_IF_ERROR(children_[i]->Prepare(state));
  }
  reservation_manager_.Init(
      Substitute("$0 id=$1 ptr=$2", PrintThriftEnum(type_), id_, this), runtime_profile_,
      state->instance_buffer_reservation(), mem_tracker_.get(), resource_profile_,
      debug_options_);
  if (!IsInSubplan()) {
    events_ = runtime_profile_->AddEventSequence("Node Lifecycle Event Timeline");
    events_->Start(state->query_state()->fragment_events_start_time());
  }
  return Status::OK();
}

void ExecNode::Codegen(RuntimeState* state) {
  DCHECK(state->ShouldCodegen());
  DCHECK(state->codegen() != NULL);
  for (int i = 0; i < children_.size(); ++i) {
    children_[i]->Codegen(state);
  }
}

Status ExecNode::Open(RuntimeState* state) {
  RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::OPEN, state));
  DCHECK_EQ(conjunct_evals_.size(), conjuncts_.size());
  return ScalarExprEvaluator::Open(conjunct_evals_, state);
}

Status ExecNode::Reset(RuntimeState* state, RowBatch* row_batch) {
  num_rows_returned_ = 0;
  for (int i = 0; i < children_.size(); ++i) {
    RETURN_IF_ERROR(children_[i]->Reset(state, row_batch));
  }
  return Status::OK();
}

void ExecNode::Close(RuntimeState* state) {
  if (is_closed_) return;
  is_closed_ = true;

  if (rows_returned_counter_ != NULL) {
    COUNTER_SET(rows_returned_counter_, num_rows_returned_);
  }
  for (int i = 0; i < children_.size(); ++i) {
    children_[i]->Close(state);
  }

  ScalarExprEvaluator::Close(conjunct_evals_, state);
  ScalarExpr::Close(conjuncts_);
  if (expr_perm_pool() != nullptr) expr_perm_pool_->FreeAll();
  if (expr_results_pool() != nullptr) expr_results_pool_->FreeAll();
  reservation_manager_.Close(state);
  if (expr_mem_tracker_ != nullptr) expr_mem_tracker_->Close();
  if (mem_tracker_ != nullptr) {
    if (mem_tracker()->consumption() != 0) {
      LOG(WARNING) << "Query " << PrintId(state->query_id()) << " may have leaked memory."
          << endl << state->instance_mem_tracker()->LogUsage(MemTracker::UNLIMITED_DEPTH);
      DCHECK_EQ(mem_tracker()->consumption(), 0)
          << "Leaked memory." << endl
          << state->instance_mem_tracker()->LogUsage(MemTracker::UNLIMITED_DEPTH);
    }
    mem_tracker_->Close();
  }
  if (events_ != nullptr) events_->MarkEvent("Closed");
}

Status ExecNode::CreateTree(RuntimeState* state, const PlanNode& plan_node,
    const DescriptorTbl& descs, ExecNode** root) {
  RETURN_IF_ERROR(plan_node.CreateExecNode(state, root));
  DCHECK(*root != nullptr);
  for (auto& child : plan_node.children_) {
    ExecNode* child_node;
    RETURN_IF_ERROR(CreateTree(state, *child, descs, &child_node));
    DCHECK(child_node != nullptr);
    (*root)->children_.push_back(child_node);
  }

  // build up tree of profiles; add children >0 first, so that when we print
  // the profile, child 0 is printed last (makes the output more readable)
  for (int i = 1; i < (*root)->children_.size(); ++i) {
    (*root)->runtime_profile()->AddChild((*root)->children_[i]->runtime_profile());
  }
  if (!(*root)->children_.empty()) {
    (*root)->runtime_profile()->AddChild((*root)->children_[0]->runtime_profile(), false);
  }
  return Status::OK();
}

void ExecNode::SetDebugOptions(const TDebugOptions& debug_options, ExecNode* root) {
  DCHECK(debug_options.__isset.node_id);
  DCHECK(debug_options.__isset.phase);
  DCHECK(debug_options.__isset.action);
  if (debug_options.node_id == -1 || root->id_ == debug_options.node_id) {
    root->debug_options_ = debug_options;
  }
  for (int i = 0; i < root->children_.size(); ++i) {
    SetDebugOptions(debug_options, root->children_[i]);
  }
}

string ExecNode::DebugString() const {
  stringstream out;
  this->DebugString(0, &out);
  return out.str();
}

void ExecNode::DebugString(int indentation_level, stringstream* out) const {
  *out << " conjuncts=" << ScalarExpr::DebugString(conjuncts_);
  for (int i = 0; i < children_.size(); ++i) {
    *out << "\n";
    children_[i]->DebugString(indentation_level + 1, out);
  }
}

string ExecNode::label() const {
  map<int, const char*>::const_iterator i = _TPlanNodeType_VALUES_TO_NAMES.find(type_);
  string node_type_name = "UNKNOWN";
  if (i != _TPlanNodeType_VALUES_TO_NAMES.end()) {
    node_type_name = i->second;
  }
  return Substitute("$0 (id=$1)", node_type_name, std::to_string(id_));
}

void ExecNode::CollectNodes(TPlanNodeType::type node_type, vector<ExecNode*>* nodes) {
  if (type_ == node_type) nodes->push_back(this);
  for (int i = 0; i < children_.size(); ++i) {
    children_[i]->CollectNodes(node_type, nodes);
  }
}

void ExecNode::CollectScanNodes(vector<ExecNode*>* nodes) {
  CollectNodes(TPlanNodeType::HDFS_SCAN_NODE, nodes);
  CollectNodes(TPlanNodeType::HBASE_SCAN_NODE, nodes);
  CollectNodes(TPlanNodeType::KUDU_SCAN_NODE, nodes);
}

Status ExecNode::ExecDebugActionImpl(TExecNodePhase::type phase, RuntimeState* state) {
  DCHECK_EQ(debug_options_.phase, phase);
  if (debug_options_.action == TDebugAction::FAIL) {
    return Status(TErrorCode::INTERNAL_ERROR, "Debug Action: FAIL");
  } else if (debug_options_.action == TDebugAction::WAIT) {
    // See DebugOptions::DebugOptions().
    DCHECK(phase != TExecNodePhase::PREPARE && phase != TExecNodePhase::CLOSE);
    while (!state->is_cancelled()) {
      sleep(1);
    }
    return Status::CANCELLED;
  } else if (debug_options_.action == TDebugAction::INJECT_ERROR_LOG) {
    state->LogError(
        ErrorMsg(TErrorCode::INTERNAL_ERROR, "Debug Action: INJECT_ERROR_LOG"));
    return Status::OK();
  } else if (debug_options_.action == TDebugAction::MEM_LIMIT_EXCEEDED) {
    return MemTracker::MemLimitExceeded(
        mem_tracker(), state, "Debug Action: MEM_LIMIT_EXCEEDED");
  } else if (debug_options_.action == TDebugAction::SET_DENY_RESERVATION_PROBABILITY) {
    // We can only enable the debug action if the buffer pool client is registered.
    // If the buffer client is not registered at this point (e.g. if phase is PREPARE or
    // OPEN), then we will enable the debug action at the time when the client is
    // registered.
    if (reservation_manager_.buffer_pool_client()->is_registered()) {
      RETURN_IF_ERROR(reservation_manager_.EnableDenyReservationDebugAction());
    }
  } else {
    DCHECK_EQ(debug_options_.action, TDebugAction::DELAY);
    VLOG(1) << "DEBUG_ACTION: Sleeping";
    SleepForMs(100);
  }
  return Status::OK();
}

bool ExecNode::CheckLimitAndTruncateRowBatchIfNeeded(RowBatch* row_batch, bool* eos) {
  DCHECK(limit_ != 0);
  const int row_batch_size = row_batch->num_rows();
  const bool reached_limit =
      !(limit_ == -1 || (rows_returned() + row_batch_size) < limit_);
  const int num_rows_to_consume =
      !reached_limit ? row_batch_size : limit_ - rows_returned();
  IncrementNumRowsReturned(num_rows_to_consume);
  if (reached_limit) {
    row_batch->set_num_rows(num_rows_to_consume);
    *eos = true;
  }
  return reached_limit;
}

bool ExecNode::CheckLimitAndTruncateRowBatchIfNeededShared(
    RowBatch* row_batch, bool* eos) {
  DCHECK(limit_ != 0);
  const int row_batch_size = row_batch->num_rows();
  const bool reached_limit =
      !(limit_ == -1 || (rows_returned_shared() + row_batch_size) < limit_);
  const int num_rows_to_consume =
      !reached_limit ? row_batch_size : limit_ - rows_returned_shared();
  IncrementNumRowsReturnedShared(num_rows_to_consume);
  if (reached_limit) {
    row_batch->set_num_rows(num_rows_to_consume);
    *eos = true;
  }
  return reached_limit;
}

bool ExecNode::EvalConjuncts(
    ScalarExprEvaluator* const* evals, int num_conjuncts, TupleRow* row) {
  for (int i = 0; i < num_conjuncts; ++i) {
    if (!EvalPredicate(evals[i], row)) return false;
  }
  return true;
}

Status ExecNode::QueryMaintenance(RuntimeState* state) {
  expr_results_pool_->Clear();
  return state->CheckQueryState();
}

bool ExecNode::IsNodeCodegenDisabled() const {
  return disable_codegen_;
}

// Codegen for EvalConjuncts.  The generated signature is the same as EvalConjuncts().
//
// For a node with two conjunct predicates:
//
// define i1 @EvalConjuncts(%"class.impala::ScalarExprEvaluator"** %evals, i32 %num_evals,
//                          %"class.impala::TupleRow"* %row) #34 {
// entry:
//   %eval_ptr = getelementptr inbounds %"class.impala::ScalarExprEvaluator"*,
//       %"class.impala::ScalarExprEvaluator"** %evals, i32 0
//   %eval = load %"class.impala::ScalarExprEvaluator"*,
//       %"class.impala::ScalarExprEvaluator"** %eval_ptr
//   %result = call i16 @"impala::Operators::Eq_BigIntVal_BigIntValWrapper"(
//       %"class.impala::ScalarExprEvaluator"* %eval, %"class.impala::TupleRow"* %row)
//   %is_null = trunc i16 %result to i1
//   %0 = ashr i16 %result, 8
//   %1 = trunc i16 %0 to i8
//   %val = trunc i8 %1 to i1
//   %is_false = xor i1 %val, true
//   %return_false = or i1 %is_null, %is_false
//   br i1 %return_false, label %false, label %continue
//
// continue:                                         ; preds = %entry
//  %eval_ptr2 = getelementptr inbounds %"class.impala::ScalarExprEvaluator"*,
//      %"class.impala::ScalarExprEvaluator"** %evals, i32 1
//  %eval3 = load %"class.impala::ScalarExprEvaluator"*,
//      %"class.impala::ScalarExprEvaluator"** %eval_ptr2
//  %result4 = call i16 @"impala::Operators::Eq_StringVal_StringValWrapper"(
//      %"class.impala::ScalarExprEvaluator"* %eval3, %"class.impala::TupleRow"* %row)
//  %is_null5 = trunc i16 %result4 to i1
//  %2 = ashr i16 %result4, 8
//  %3 = trunc i16 %2 to i8
//  %val6 = trunc i8 %3 to i1
//  %is_false7 = xor i1 %val6, true
//  %return_false8 = or i1 %is_null5, %is_false
//  br i1 %return_false8, label %false, label %continue1
//
// continue1:                                        ; preds = %continue
//  ret i1 true
//
// false:                                            ; preds = %continue, %entry
//  ret i1 false
// }
//
Status ExecNode::CodegenEvalConjuncts(LlvmCodeGen* codegen,
    const vector<ScalarExpr*>& conjuncts, llvm::Function** fn, const char* name) {
  llvm::Function* conjunct_fns[conjuncts.size()];
  for (int i = 0; i < conjuncts.size(); ++i) {
    RETURN_IF_ERROR(conjuncts[i]->GetCodegendComputeFn(codegen, false, &conjunct_fns[i]));
    if (i >= LlvmCodeGen::CODEGEN_INLINE_EXPRS_THRESHOLD) {
      // Avoid bloating EvalConjuncts by inlining everything into it.
      codegen->SetNoInline(conjunct_fns[i]);
    }
  }

  // Construct function signature to match
  // bool EvalConjuncts(ScalarExprEvaluator**, int, TupleRow*)
  llvm::PointerType* tuple_row_ptr_type = codegen->GetStructPtrType<TupleRow>();
  llvm::Type* eval_type = codegen->GetStructType<ScalarExprEvaluator>();

  LlvmCodeGen::FnPrototype prototype(codegen, name, codegen->bool_type());
  prototype.AddArgument(
      LlvmCodeGen::NamedVariable("evals", codegen->GetPtrPtrType(eval_type)));
  prototype.AddArgument(
      LlvmCodeGen::NamedVariable("num_evals", codegen->i32_type()));
  prototype.AddArgument(LlvmCodeGen::NamedVariable("row", tuple_row_ptr_type));

  LlvmBuilder builder(codegen->context());
  llvm::Value* args[3];
  *fn = prototype.GeneratePrototype(&builder, args);
  llvm::Value* evals_arg = args[0];
  llvm::Value* tuple_row_arg = args[2];

  if (conjuncts.size() > 0) {
    llvm::LLVMContext& context = codegen->context();
    llvm::BasicBlock* false_block = llvm::BasicBlock::Create(context, "false", *fn);

    for (int i = 0; i < conjuncts.size(); ++i) {
      llvm::BasicBlock* true_block =
          llvm::BasicBlock::Create(context, "continue", *fn, false_block);
      llvm::Value* eval_arg_ptr = builder.CreateInBoundsGEP(
          NULL, evals_arg, codegen->GetI32Constant(i), "eval_ptr");
      llvm::Value* eval_arg = builder.CreateLoad(eval_arg_ptr, "eval");

      // Call conjunct_fns[i]
      CodegenAnyVal result = CodegenAnyVal::CreateCallWrapped(codegen, &builder,
          conjuncts[i]->type(), conjunct_fns[i], {eval_arg, tuple_row_arg},
          "result");

      // Return false if result.is_null || !result
      llvm::Value* is_null = result.GetIsNull();
      llvm::Value* is_false = builder.CreateNot(result.GetVal(), "is_false");
      llvm::Value* return_false = builder.CreateOr(is_null, is_false, "return_false");
      builder.CreateCondBr(return_false, false_block, true_block);

      // Set insertion point for continue/end
      builder.SetInsertPoint(true_block);
    }
    builder.CreateRet(codegen->true_value());

    builder.SetInsertPoint(false_block);
    builder.CreateRet(codegen->false_value());
  } else {
    builder.CreateRet(codegen->true_value());
  }

  // Avoid inlining EvalConjuncts into caller if it is large.
  if (conjuncts.size() > LlvmCodeGen::CODEGEN_INLINE_EXPR_BATCH_THRESHOLD) {
    codegen->SetNoInline(*fn);
  }

  *fn = codegen->FinalizeFunction(*fn);
  if (*fn == NULL) {
    return Status("ExecNode::CodegenEvalConjuncts(): codegen'd EvalConjuncts() function "
                  "failed verification, see log");
  }
  return Status::OK();
}
}
