// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "exec/plan-root-sink.h"
#include "exec/buffered-plan-root-sink.h"
#include "exec/blocking-plan-root-sink.h"
#include "exprs/scalar-expr-evaluator.h"
#include "exprs/scalar-expr.h"
#include "runtime/row-batch.h"
#include "runtime/tuple-row.h"
#include "service/query-result-set.h"
#include "util/pretty-printer.h"
#include <memory>
#include <boost/thread/mutex.hpp>
using namespace std;
using boost::unique_lock;
using boost::mutex;
namespace impala {
/// Creates the plan-root sink for a fragment instance.
///
/// The sink id is taken from the fragment index in 'fragment_ctx'. When the
/// 'spool_query_results' query option is set, a BufferedPlanRootSink is created
/// (it additionally receives the instance's debug options); otherwise a
/// BlockingPlanRootSink is created. The new sink is registered with the
/// ObjectPool obtained from 'state' and the pointer returned by ObjectPool::Add()
/// is handed back to the caller (presumably the pool keeps ownership - confirm
/// against ObjectPool).
DataSink* PlanRootSinkConfig::CreateSink(const TPlanFragmentCtx& fragment_ctx,
    const TPlanFragmentInstanceCtx& fragment_instance_ctx, RuntimeState* state) const {
  TDataSinkId sink_id = fragment_ctx.fragment.idx;
  ObjectPool* pool = state->obj_pool();
  if (state->query_options().spool_query_results) {
    return pool->Add(new BufferedPlanRootSink(
        sink_id, *this, state, fragment_instance_ctx.debug_options));
  } else {
    return pool->Add(new BlockingPlanRootSink(sink_id, *this, state));
  }
}
// Parameter list and member initializers of what appears to be the PlanRootSink
// constructor. It forwards 'sink_id', 'sink_config' and 'state' to the DataSink
// base together with the sink name "PLAN_ROOT_SINK", and copies the
// 'num_rows_produced_limit' query option into num_rows_produced_limit_.
// NOTE(review): the line that opens this definition (presumably
// `PlanRootSink::PlanRootSink(`) is not present above the parameter list -
// confirm against the upstream file.
TDataSinkId sink_id, const DataSinkConfig& sink_config, RuntimeState* state)
: DataSink(sink_id, sink_config, "PLAN_ROOT_SINK", state),
num_rows_produced_limit_(state->query_options().num_rows_produced_limit) {}
// Out-of-line destructor; it performs no work of its own beyond destroying
// members and bases.
PlanRootSink::~PlanRootSink() = default;
// Prepares the sink: first delegates to DataSink::Prepare(), then registers the
// "RowsSent" counter and the derived "RowsSentRate" counter on profile_.
// Returns Status::OK() on the visible success path; RETURN_IF_ERROR presumably
// returns early with the base-class status when DataSink::Prepare() fails.
Status PlanRootSink::Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) {
RETURN_IF_ERROR(DataSink::Prepare(state, parent_mem_tracker));
rows_sent_counter_ = ADD_COUNTER(profile_, "RowsSent", TUnit::UNIT);
// "RowsSentRate" is a derived counter computed by RuntimeProfile::UnitsPerSecond
// bound to rows_sent_counter_.
rows_sent_rate_ = profile_->AddDerivedCounter("RowsSentRate", TUnit::UNIT_PER_SECOND,
bind<int64_t>(&RuntimeProfile::UnitsPerSecond, rows_sent_counter_,
// NOTE(review): the bind() call above is left open - its remaining argument(s)
// and the closing parentheses are not visible here; confirm against upstream.
return Status::OK();
// Debug-only walk over the collection slots of every row in 'batch'; the whole
// body sits under #ifndef NDEBUG, so nothing here is compiled when NDEBUG is
// defined.
//   row_desc: descriptor supplying the tuple descriptors and tuple indexes.
//   batch:    the row batch whose rows are inspected.
void PlanRootSink::ValidateCollectionSlots(
const RowDescriptor& row_desc, RowBatch* batch) {
#ifndef NDEBUG
// Nothing to inspect when the row descriptor reports no var-len slots.
if (!row_desc.HasVarlenSlots()) return;
for (int i = 0; i < batch->num_rows(); ++i) {
TupleRow* row = batch->GetRow(i);
for (int j = 0; j < row_desc.tuple_descriptors().size(); ++j) {
const TupleDescriptor* tuple_desc = row_desc.tuple_descriptors()[j];
// Skip tuples that declare no collection slots.
if (tuple_desc->collection_slots().empty()) continue;
for (int k = 0; k < tuple_desc->collection_slots().size(); ++k) {
const SlotDescriptor* slot_desc = tuple_desc->collection_slots()[k];
// Locate the tuple that holds this slot via the id of the slot's parent descriptor.
int tuple_idx = row_desc.GetTupleIdx(slot_desc->parent()->id());
const Tuple* tuple = row->GetTuple(tuple_idx);
// A NULL tuple has no slot to look at.
if (tuple == NULL) continue;
// NOTE(review): no per-slot check follows the guard above, and neither the
// matching #endif nor the closing braces are visible here - the validation
// statement itself appears to be missing; confirm against upstream.
// Adds the number of rows in 'batch' to num_rows_produced_ and checks the
// running total against num_rows_produced_limit_.
// Returns a ROWS_PRODUCED_LIMIT_EXCEEDED status (also logged via VLOG_QUERY) when
// the limit is positive and the total exceeds it; otherwise Status::OK().
// A limit of zero or less is never enforced.
// NOTE(review): 'state' is not referenced by the visible statements, and the
// closing braces of the 'if' and of the function are not visible here.
Status PlanRootSink::UpdateAndCheckRowsProducedLimit(
RuntimeState* state, RowBatch* batch) {
// Since the PlanRootSink has a single producer, the
// num_rows_produced_ value can be updated and verified without acquiring any locks.
num_rows_produced_ += batch->num_rows();
if (num_rows_produced_limit_ > 0 && num_rows_produced_ > num_rows_produced_limit_) {
// The error text carries the configured limit, pretty-printed as a unit-less number.
Status err = Status::Expected(TErrorCode::ROWS_PRODUCED_LIMIT_EXCEEDED,
PrettyPrinter::Print(num_rows_produced_limit_, TUnit::NONE));
VLOG_QUERY << err.msg().msg();
return err;
return Status::OK();