blob: ecc03aa52c0f198d5eb0162d0ebd2a0ef7a61721 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#ifndef IMPALA_EXEC_PLAN_ROOT_SINK_H
#define IMPALA_EXEC_PLAN_ROOT_SINK_H
#include "exec/data-sink.h"
namespace impala {
class TupleRow;
class RowBatch;
class QueryResultSet;
class ScalarExprEvaluator;
/// DataSinkConfig for the root sink of a plan. Its only job visible here is to act as
/// the factory (via CreateSink()) for the sink used by a fragment instance.
class PlanRootSinkConfig : public DataSinkConfig {
 public:
  ~PlanRootSinkConfig() override = default;

  /// Creates the sink for the fragment instance described by 'fragment_ctx' and
  /// 'fragment_instance_ctx', using 'state' as its RuntimeState.
  /// NOTE(review): presumably returns a concrete PlanRootSink subclass; ownership of the
  /// returned pointer is not visible from this header - confirm in the implementation.
  DataSink* CreateSink(const TPlanFragmentCtx& fragment_ctx,
      const TPlanFragmentInstanceCtx& fragment_instance_ctx,
      RuntimeState* state) const override;
};
/// Sink which manages the handoff between a 'sender' (a fragment instance) that produces
/// batches by calling Send(), and a 'consumer' (e.g. the coordinator) which consumes rows
/// formed by computing a set of output expressions over the input batches, by calling
/// GetNext(). Send() and GetNext() are called concurrently.
///
/// The sender uses Send() to fill in as many rows as are requested from the current
/// batch. When the batch is exhausted - which may take several calls to GetNext() -
/// Send() returns so that the fragment instance can produce another row batch.
///
/// The consumer uses GetNext() to fetch a specified number of rows into a given
/// QueryResultSet. Calls to GetNext() block until rows are available from the sender
/// thread.
///
/// FlushFinal() should be called by the sender to signal it has finished calling
/// Send() for all rows. Close() should be called by the sender to release resources.
///
/// When the fragment instance is cancelled, Cancel() is called to unblock both the
/// sender and consumer. Cancel() may be called concurrently with Send(), GetNext() and
/// Close().
///
/// The sink is thread safe up to a single sender and single consumer.
///
/// Lifetime: The sink is owned by the QueryState and has the same lifetime as
/// QueryState. The QueryState references from the fragment instance and the Coordinator
/// ensures that this outlives any calls to Send() and GetNext(), respectively.
class PlanRootSink : public DataSink {
public:
PlanRootSink(
TDataSinkId sink_id, const DataSinkConfig& sink_config, RuntimeState* state);
virtual ~PlanRootSink();
/// Called before Send(), Open(), or Close(). Performs any additional setup necessary,
/// such as initializing runtime counters.
virtual Status Prepare(
RuntimeState* state, MemTracker* parent_mem_tracker) override = 0;
/// Sends a new batch. Ownership of 'batch' remains with the sender.
virtual Status Send(RuntimeState* state, RowBatch* batch) override = 0;
/// Indicates eos to the producer. When this method is called, all rows have
/// successfully been sent by the producer.
virtual Status FlushFinal(RuntimeState* state) override = 0;
/// To be called by sender only. Release resources and shutdowns the sink.
virtual void Close(RuntimeState* state) override = 0;
/// To be called by the consumer only. 'result_set' with up to 'num_rows' rows
/// produced by the fragment instance that calls Send(). *eos is set to 'true' when
/// there are no more rows to consume. If Cancel() or Close() are called concurrently,
/// GetNext() will return and may not populate 'result_set'. All subsequent calls
/// after Cancel() or Close() set eos and then return the current query status.
/// 'timeout' is the amount of time (in microseonds) this method should wait for enough
/// rows to become available before returning (e.g. how long the consumer thread waits
/// for the producer thread to produce RowBatches). If the timeout is hit, GetNext() can
/// return before adding 'num_rows' rows to 'result_set'. It is possible 0 rows are
/// added to the 'result_set' if the producer thread does not produce rows within the
/// timeout. A timeout of 0 causes this method to wait indefinitely.
virtual Status GetNext(RuntimeState* state, QueryResultSet* result_set, int num_rows,
bool* eos, int64_t timeout_us) = 0;
/// Notifies both the consumer and sender that the query has been cancelled so they can
/// check the cancellation flag in the RuntimeState. The cancellation flag should be set
/// prior to calling this. Called by a separate cancellation thread.
virtual void Cancel(RuntimeState* state) = 0;
protected:
/// Validates that all collection-typed slots in the given batch are set to NULL
/// See SubplanNode for details on when collection-typed slots are set to NULL.
/// TODO: This validation will become obsolete when we can return collection values.
/// We will then need a different mechanism to assert the correct behavior of the
/// SubplanNode with respect to setting collection-slots to NULL.
void ValidateCollectionSlots(const RowDescriptor& row_desc, RowBatch* batch);
/// Check to ensure that the number of rows produced by query execution does not exceed
/// the NUM_ROWS_PRODUCED_LIMIT query option. Returns an error Status if the given
/// batch causes the limit to be exceeded. Updates the value of num_rows_produced_.
Status UpdateAndCheckRowsProducedLimit(RuntimeState* state, RowBatch* batch);
/// State of the sender:
/// - ROWS_PENDING: the sender is still producing rows; the only non-terminal state
/// - EOS: the sender has passed all rows to Send()
/// - CLOSED_NOT_EOS: the sender (i.e. sink) was closed before all rows were passed to
/// Send()
enum class SenderState { ROWS_PENDING, EOS, CLOSED_NOT_EOS };
SenderState sender_state_ = SenderState::ROWS_PENDING;
/// The number of rows sent to this PlanRootSink via Send(). Initialized in
/// Prepare().
RuntimeProfile::Counter* rows_sent_counter_ = nullptr;
/// The rate at which rows are sent to this PlanRootSink via Send(). Initialized in
/// Prepare().
RuntimeProfile::Counter* rows_sent_rate_ = nullptr;
private:
/// Limit on the number of rows produced by this query, initialized by the constructor.
const int64_t num_rows_produced_limit_;
/// Updated by CheckRowsProducedLimit() to indicate the total number of rows produced
/// by query execution.
int64_t num_rows_produced_ = 0;
};
}
#endif