| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #ifndef IMPALA_RUNTIME_KRPC_DATA_STREAM_RECVR_H |
| #define IMPALA_RUNTIME_KRPC_DATA_STREAM_RECVR_H |
| |
#include <cstdint>
#include <memory>
#include <vector>

#include <boost/scoped_ptr.hpp>
#include <boost/thread/mutex.hpp>

#include "common/object-pool.h"
#include "common/status.h"
#include "gen-cpp/Types_types.h" // for TUniqueId
#include "runtime/bufferpool/buffer-pool.h"
#include "runtime/descriptors.h"
#include "runtime/runtime-state.h"
#include "util/tuple-row-compare.h"
| |
// Forward declaration of the KRPC call context. This header only refers to
// kudu::rpc::RpcContext through a pointer (the 'context' parameter of AddBatch()),
// so a forward declaration suffices here.
namespace kudu {
namespace rpc {
class RpcContext;
} // namespace rpc
} // namespace kudu
| |
| namespace impala { |
| |
// Forward declarations of types referenced in this header.
// NOTE(review): RuntimeProfile must nevertheless be a complete type at the point of
// use below, because nested types such as RuntimeProfile::Counter are named in the
// class body; its definition is presumably reached through one of the included
// headers - confirm before removing any include.
class KrpcDataStreamMgr;
class MemTracker;
class RowBatch;
class RuntimeProfile;
class SortedRunMerger;
struct TransmitDataCtx;
class TransmitDataRequestPB;
class TransmitDataResponsePB;
| |
| /// Single receiver of an m:n data stream. |
| /// |
| /// KrpcDataStreamRecvr maintains one or more queues of row batches received by a |
| /// KrpcDataStreamMgr from one or more sender fragment instances. Receivers are created |
| /// via KrpcDataStreamMgr::CreateRecvr(). Ownership of a stream recvr is shared between |
| /// the KrpcDataStreamMgr that created it and the caller of |
| /// KrpcDataStreamMgr::CreateRecvr() (i.e. the exchange node). |
| /// |
| /// The is_merging_ member determines if the recvr merges input streams from different |
| /// sender fragment instances according to a specified sort order. |
| /// If is_merging_ is false : Only one batch queue is maintained for row batches from all |
| /// sender fragment instances. These row batches are returned one at a time via GetBatch() |
| /// If is_merging_ is true : One queue is created for the batches from each distinct |
| /// sender. A SortedRunMerger instance must be created via CreateMerger() prior to |
| /// retrieving any rows from the receiver. Rows are retrieved from the receiver via |
| /// GetNext(RowBatch* output_batch, int limit, bool eos). After the final call to |
| /// GetNext(), TransferAllResources() must be called to transfer resources from the input |
| /// batches from each sender to the caller's output batch. |
| /// The receiver sets deep_copy to false on the merger - resources are transferred from |
| /// the input batches from each sender queue to the merger to the output batch by the |
| /// merger itself as it processes each run. |
| /// |
| /// KrpcDataStreamRecvr::Close() must be called by the caller of CreateRecvr() to remove |
| /// the recvr instance from the tracking structure of its KrpcDataStreamMgr in all cases. |
| /// |
| /// Unless otherwise stated, class members belong to KrpcDataStreamRecvr. They are safe to |
| /// access from any threads as long as the caller obtained a shared_ptr to keep the |
| /// receiver alive. For class members not owned by the receiver, they must stay valid till |
| /// after Close() is called. Since a receiver is co-owned by an exchange node and the |
| /// singleton KrpcDataStreamMgr, it's possible that certain threads may race with Close() |
| /// called from the fragment execution thread. A receiver may also be cancelled at any |
| /// time due to query cancellation. To avoid resource leak, the following protocol is |
| /// followed: |
| /// - callers must obtain the target sender queue's lock and check if it's cancelled |
| /// - no new row batch or deferred RPCs should be added to a cancelled sender queue |
| /// - Cancel() will drain the deferred RPCs queue and the row batch queue |
| /// |
| class KrpcDataStreamRecvr { |
| public: |
| ~KrpcDataStreamRecvr(); |
| |
| /// Returns next row batch in data stream; blocks if there aren't any. |
| /// Retains ownership of the returned batch. The caller must call TransferAllResources() |
| /// to acquire the resources from the returned batch before the next call to GetBatch(). |
| /// A NULL returned batch indicated eos. Must only be called if is_merging_ is false. |
| /// Called from fragment instance execution threads only. |
| /// TODO: This is currently only exposed to the non-merging version of the exchange. |
| /// Refactor so both merging and non-merging exchange use GetNext(RowBatch*, bool* eos). |
| Status GetBatch(RowBatch** next_batch); |
| |
| /// Deregister from KrpcDataStreamMgr instance, which shares ownership of this instance. |
| /// Called from fragment instance execution threads only. |
| void Close(); |
| |
| /// Create a SortedRunMerger instance to merge rows from multiple sender according to |
| /// the specified row comparator. Fetches the first batches from the individual sender |
| /// queues. The exprs used in less_than must have already been prepared and opened. |
| /// Called from fragment instance execution threads only. |
| Status CreateMerger(const TupleRowComparator& less_than); |
| |
| /// Fill output_batch with the next batch of rows obtained by merging the per-sender |
| /// input streams. Must only be called if is_merging_ is true. Called from fragment |
| /// instance execution threads only. |
| Status GetNext(RowBatch* output_batch, bool* eos); |
| |
| /// Transfer all resources from the current batches being processed from each sender |
| /// queue to the specified batch. Called from fragment instance execution threads only. |
| void TransferAllResources(RowBatch* transfer_batch); |
| |
| /// Marks all sender queues as cancelled and notifies all waiting consumers of |
| /// the cancellation. |
| void CancelStream(); |
| |
| const TUniqueId& fragment_instance_id() const { return fragment_instance_id_; } |
| PlanNodeId dest_node_id() const { return dest_node_id_; } |
| const RowDescriptor* row_desc() const { return row_desc_; } |
| MemTracker* deferred_rpc_tracker() const { return deferred_rpc_tracker_.get(); } |
| MemTracker* parent_tracker() const { return parent_tracker_; } |
| BufferPool::ClientHandle* buffer_pool_client() const { return buffer_pool_client_; } |
| |
| private: |
| friend class KrpcDataStreamMgr; |
| class SenderQueue; |
| |
| KrpcDataStreamRecvr(KrpcDataStreamMgr* stream_mgr, MemTracker* parent_tracker, |
| const RowDescriptor* row_desc, const RuntimeState& runtime_state, |
| const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int num_senders, |
| bool is_merging, int64_t total_buffer_limit, RuntimeProfile* profile, |
| BufferPool::ClientHandle* client); |
| |
| /// Adds a new row batch to the appropriate sender queue. If the row batch can be |
| /// inserted, the RPC will be responded to before this function returns. If the batch |
| /// can't be added without exceeding the buffer limit, it is appended to a queue for |
| /// deferred processing. The RPC will be responded to when the row batch is deserialized |
| /// later. |
| void AddBatch(const TransmitDataRequestPB* request, TransmitDataResponsePB* response, |
| kudu::rpc::RpcContext* context); |
| |
| /// Tries adding the first entry of 'deferred_rpcs_' queue for the sender queue |
| /// identified by 'sender_id'. If is_merging_ is false, it always defaults to |
| /// queue 0; If is_merging_ is true, the sender queue is identified by 'sender_id_'. |
| /// Called from KrpcDataStreamMgr's deserialization threads only. |
| void ProcessDeferredRpc(int sender_id); |
| |
| /// Takes over the RPC state 'ctx' of an early sender for deferred processing and |
| /// kicks off a deserialization task to process it asynchronously. This makes sure |
| /// new incoming RPCs won't pass the early senders, leading to starvation. |
| /// Called from fragment instance execution threads only. |
| void TakeOverEarlySender(std::unique_ptr<TransmitDataCtx> ctx); |
| |
| /// Indicate that a particular sender is done. Delegated to the appropriate |
| /// sender queue. Called from KrpcDataStreamMgr. |
| void RemoveSender(int sender_id); |
| |
| /// Return true if the addition of a new batch of size 'batch_size' would exceed the |
| /// total buffer limit. |
| bool ExceedsLimit(int64_t batch_size) { |
| return num_buffered_bytes_.Load() + batch_size > total_buffer_limit_; |
| } |
| |
| /// Return the current number of deferred RPCs. |
| int64_t num_deferred_rpcs() const { return num_deferred_rpcs_.Load(); } |
| |
| /// KrpcDataStreamMgr instance used to create this recvr. Not owned. |
| KrpcDataStreamMgr* mgr_; |
| |
| /// The runtime state of the fragment instance which owns this receiver. Not owned. |
| const RuntimeState& runtime_state_; |
| |
| /// Fragment and node id of the destination exchange node this receiver is used by. |
| TUniqueId fragment_instance_id_; |
| PlanNodeId dest_node_id_; |
| |
| /// Soft upper limit on the total amount of buffering in bytes allowed for this stream |
| /// across all sender queues. We defer processing of incoming RPCs once the amount of |
| /// buffered data exceeds this value. |
| const int64_t total_buffer_limit_; |
| |
| /// Row schema. Not owned. |
| const RowDescriptor* row_desc_; |
| |
| /// True if this reciver merges incoming rows from different senders. Per-sender |
| /// row batch queues are maintained in this case. |
| const bool is_merging_; |
| |
| /// True if Close() has been called on this receiver already. Should only be accessed |
| /// from the fragment execution thread. |
| bool closed_; |
| |
| /// Current number of bytes held across all sender queues. |
| AtomicInt32 num_buffered_bytes_; |
| |
| /// Current number of outstanding deferred RPCs across all sender queues. |
| AtomicInt64 num_deferred_rpcs_; |
| |
| /// Memtracker for payloads of deferred Rpcs in the sender queue(s). This must be |
| /// accessed with a sender queue's lock held to avoid race with Close() of the queue. |
| boost::scoped_ptr<MemTracker> deferred_rpc_tracker_; |
| |
| /// The MemTracker of the exchange node which owns this receiver. Not owned. |
| /// This is the MemTracker which 'client_' below internally references. |
| MemTracker* parent_tracker_; |
| |
| /// The buffer pool client for allocating buffers of incoming row batches. Not owned. |
| BufferPool::ClientHandle* buffer_pool_client_; |
| |
| /// One or more queues of row batches received from senders. If is_merging_ is true, |
| /// there is one SenderQueue for each sender. Otherwise, row batches from all senders |
| /// are placed in the same SenderQueue. The SenderQueue instances are owned by the |
| /// receiver and placed in 'pool_'. |
| std::vector<SenderQueue*> sender_queues_; |
| |
| /// SortedRunMerger used to merge rows from different senders. |
| boost::scoped_ptr<SortedRunMerger> merger_; |
| |
| /// Pool which owns sender queues and the runtime profiles. |
| ObjectPool pool_; |
| |
| /// Runtime profile of the owning exchange node. It's the parent of |
| /// 'dequeue_profile_' and 'enqueue_profile_'. Not owned. |
| RuntimeProfile* profile_; |
| |
| /// Maintain two child profiles - receiver side measurements (from the GetBatch() path), |
| /// and sender side measurements (from AddBatch()). These two profiles own all counters |
| /// below unless otherwise noted. These profiles are owned by the receiver and placed |
| /// in 'pool_'. 'dequeue_profile_' and 'enqueue_profile_' must outlive 'profile_' |
| /// to prevent accessing freed memory during top-down traversal of 'profile_'. The |
| /// receiver is co-owned by the exchange node and the data stream manager so these two |
| /// profiles should outlive the exchange node which owns 'profile_'. |
| RuntimeProfile* dequeue_profile_; |
| RuntimeProfile* enqueue_profile_; |
| |
| /// Pointer to profile's inactive timer. Not owned. |
| /// Not directly shown in the profile and thus data_wait_time_ below. Used for |
| /// subtracting the wait time from the total time spent in exchange node. |
| RuntimeProfile::Counter* inactive_timer_; |
| |
| /// ------------------------------------------------------------------------------------ |
| /// Following counters belong to 'dequeue_profile_'. |
| |
| /// Number of bytes of deserialized row batches dequeued. |
| RuntimeProfile::Counter* bytes_dequeued_counter_; |
| |
| /// Time series of bytes of deserialized row batches, samples 'bytes_dequeued_counter_'. |
| RuntimeProfile::TimeSeriesCounter* bytes_dequeued_time_series_counter_; |
| |
| /// Total wall-clock time spent in SenderQueue::GetBatch(). |
| RuntimeProfile::Counter* queue_get_batch_timer_; |
| |
| /// Total wall-clock time spent waiting for data to be available in queues. |
| RuntimeProfile::Counter* data_wait_timer_; |
| |
| /// Wall-clock time spent waiting for the first batch arrival across all queues. |
| RuntimeProfile::Counter* first_batch_wait_total_timer_; |
| |
| /// ------------------------------------------------------------------------------------ |
| /// Following counters belong to 'enqueue_profile_'. |
| |
| /// Total number of bytes of serialized row batches received. |
| RuntimeProfile::Counter* bytes_received_counter_; |
| |
| /// Time series of number of bytes received, samples 'bytes_received_counter_'. |
| RuntimeProfile::TimeSeriesCounter* bytes_received_time_series_counter_; |
| |
| /// Total wall-clock time spent deserializing row batches. |
| RuntimeProfile::Counter* deserialize_row_batch_timer_; |
| |
| /// Total number of EOS received. |
| RuntimeProfile::Counter* total_eos_received_counter_; |
| |
| /// Total number of senders which arrive before the receiver is ready. |
| RuntimeProfile::Counter* total_early_senders_counter_; |
| |
| /// Total number of serialized row batches received. |
| RuntimeProfile::Counter* total_received_batches_counter_; |
| |
| /// Total number of deserialized row batches enqueued into the row batch queues. |
| RuntimeProfile::Counter* total_enqueued_batches_counter_; |
| |
| /// Total number of RPCs whose responses are deferred because of early senders or |
| /// full row batch queue. |
| RuntimeProfile::Counter* total_deferred_rpcs_counter_; |
| |
| /// Time series of number of deferred row batches, samples 'num_deferred_rpcs_'. |
| RuntimeProfile::TimeSeriesCounter* deferred_rpcs_time_series_counter_; |
| |
| /// Total wall-clock time in which the 'deferred_rpcs_' queues are not empty. |
| RuntimeProfile::Counter* total_has_deferred_rpcs_timer_; |
| |
| /// Summary stats of time which RPCs spent in KRPC service queue before |
| /// being dispatched to the RPC handlers. |
| RuntimeProfile::SummaryStatsCounter* dispatch_timer_; |
| }; |
| |
| } // namespace impala |
| |
| #endif // IMPALA_RUNTIME_KRPC_DATA_STREAM_RECVR_H |