blob: 972925c76367d0ce252635f6bce9ff3c7457354d [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "runtime/krpc-data-stream-sender.h"
#include <boost/bind.hpp>
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <thrift/protocol/TDebugProtocol.h>
#include "common/logging.h"
#include "codegen/codegen-anyval.h"
#include "codegen/llvm-codegen.h"
#include "exec/kudu-util.h"
#include "exprs/scalar-expr.h"
#include "exprs/scalar-expr-evaluator.h"
#include "gutil/strings/substitute.h"
#include "kudu/rpc/rpc_controller.h"
#include "kudu/rpc/rpc_sidecar.h"
#include "kudu/util/monotime.h"
#include "kudu/util/status.h"
#include "rpc/rpc-mgr.h"
#include "runtime/descriptors.h"
#include "runtime/exec-env.h"
#include "runtime/mem-tracker.h"
#include "runtime/raw-value.inline.h"
#include "runtime/row-batch.h"
#include "runtime/runtime-state.h"
#include "runtime/tuple-row.h"
#include "service/data-stream-service.h"
#include "util/aligned-new.h"
#include "util/debug-util.h"
#include "util/network-util.h"
#include "util/pretty-printer.h"
#include "gen-cpp/data_stream_service.pb.h"
#include "gen-cpp/data_stream_service.proxy.h"
#include "gen-cpp/Types_types.h"
#include "common/names.h"
using std::condition_variable_any;
using namespace apache::thrift;
using kudu::rpc::RpcController;
using kudu::rpc::RpcSidecar;
using kudu::MonoDelta;
DECLARE_int64(impala_slow_rpc_threshold_ms);
DECLARE_int32(rpc_retry_interval_ms);
namespace impala {
// Symbol name for KrpcDataStreamSender::HashRow(TupleRow*).
// NOTE(review): this looks like the Itanium-mangled form of that member function with
// the leading "_ZN6impala" prefix omitted; how it is matched is not visible here —
// confirm against its users.
const char* KrpcDataStreamSender::HASH_ROW_SYMBOL =
"KrpcDataStreamSender7HashRowEPNS_8TupleRowE";
// LLVM struct type name of this class. CodegenHashRow() uses it to build the pointer
// type of the generated function's 'this' argument.
const char* KrpcDataStreamSender::LLVM_CLASS_NAME = "class.impala::KrpcDataStreamSender";
// Name of the runtime-profile counter holding the total number of bytes sent.
const char* KrpcDataStreamSender::TOTAL_BYTES_SENT_COUNTER = "TotalBytesSent";
// A datastream sender may send row batches to multiple destinations. There is one
// channel for each destination.
//
// Clients can call TransmitData() to directly send a serialized row batch to the
// destination or it can call AddRow() to accumulate rows in an internal row batch
// to certain capacity before sending it. The underlying RPC layer is implemented
// with KRPC, which provides interfaces for asynchronous RPC calls. Normally, the
// calls above will return before the RPC has completed but they may block if there
// is already an in-flight RPC.
//
// Each channel internally has two OutboundRowBatch to serialize to. They are reused
// across multiple RPC calls. Having two OutboundRowBatch allows client to serialize
// the next row batch while the current row batch is being sent. Upon completion of
// an RPC, the callback TransmitDataCompleteCb() is invoked. If the RPC fails due to the
// remote service's queue being full, TransmitDataCompleteCb() will schedule the retry
// callback RetryCb() after a delay of 'FLAGS_rpc_retry_interval_ms' milliseconds.
//
// When a data stream sender is shut down, it will call Teardown() on all channels to
// release resources. Teardown() will cancel any in-flight RPC and wait for the
// completion callback to be called before returning. The execution thread is expected
// to flush all buffered row batches and send the end-of-stream message (by calling
// FlushBatches(), SendEosAsync() and WaitForRpc()) before closing the data stream
// sender.
//
// Note that the RPC payloads are owned solely by the channel and the KRPC layer will
// relinquish references to them before the completion callback is invoked, so it's
// safe to free them once the callback has been invoked.
//
// Note that due to KUDU-2011, timeout cannot be used with outbound sidecars. The client
// has no idea when it is safe to reclaim the sidecar buffer (~RpcSidecar() should be the
// right place, except that's currently called too early). RpcController::Cancel() ensures
// that the callback is called only after the RPC layer no longer references the sidecar
// buffers.
class KrpcDataStreamSender::Channel : public CacheLineAligned {
 public:
  // Creates a channel to send data to a particular ipaddress/port/fragment instance
  // id/node combination. 'buffer_size' is specified in bytes and is intended as a soft
  // limit on how much tuple data is accumulated before being sent; it only applies when
  // data is added via AddRow() and not sent directly via SerializeAndSendBatch() or
  // TransmitData().
  // NOTE(review): the constructor does not store 'buffer_size'; Init() sizes 'batch_'
  // from the parent's 'per_channel_buffer_size_' instead.
  Channel(KrpcDataStreamSender* parent, const RowDescriptor* row_desc,
      const std::string& hostname, const TNetworkAddress& destination,
      const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int buffer_size)
    : parent_(parent),
      row_desc_(row_desc),
      hostname_(hostname),
      address_(destination),
      fragment_instance_id_(fragment_instance_id),
      dest_node_id_(dest_node_id) {
    // 'address_' is expected to be an already-resolved address (debug-checked only).
    DCHECK(IsResolvedAddress(address_));
  }

  // Initializes the channel: allocates the row batch used by AddRow() and creates the
  // DataStreamService proxy to the destination.
  // Returns OK if successful, error indication otherwise.
  Status Init(RuntimeState* state);

  // Serializes the given row batch and sends it to the destination. If the preceding
  // RPC is in progress, this function may block until the previous RPC finishes.
  // Returns error status if serialization or the preceding RPC failed. Returns OK
  // otherwise.
  Status SerializeAndSendBatch(RowBatch* batch);

  // Transmits the serialized row batch 'outbound_batch'. This function may block if the
  // preceding RPC is still in-flight. This is expected to be called from the fragment
  // instance execution thread. Returns error status if initialization of the RPC request
  // parameters failed or if the preceding RPC failed. Returns OK otherwise.
  Status TransmitData(const OutboundRowBatch* outbound_batch);

  // Copies a single row into this channel's row batch and flushes the row batch once
  // it reaches capacity. This call may block if the row batch's capacity is reached
  // and the preceding RPC is still in progress. Returns error status if serialization
  // failed or if the preceding RPC failed. Returns OK otherwise.
  Status ALWAYS_INLINE AddRow(TupleRow* row);

  // Shuts down the channel and frees the row batch allocation. Any in-flight RPC will
  // be cancelled. It's expected that clients normally call FlushBatches(), SendEosAsync()
  // and WaitForRpc() before calling Teardown() to flush all buffered row batches to
  // destinations. Teardown() may be called without flushing the channel in cases such
  // as cancellation or error.
  void Teardown(RuntimeState* state);

  // Flushes any buffered row batches. Returns error status if the TransmitData() RPC
  // fails. The RPC is sent asynchronously. WaitForRpc() must be called to wait
  // for the RPC. This should be only called from a fragment executor thread.
  Status FlushBatches();

  // Sends the EOS RPC to close the channel. Returns error status if sending the EOS RPC
  // failed. The RPC is sent asynchronously. WaitForRpc() must be called to wait for
  // the RPC. This should be only called from a fragment executor thread.
  Status SendEosAsync();

  // Waits for the preceding RPC to complete. Returns error status if the preceding
  // RPC fails. Returns CANCELLED if the parent sender is cancelled or shut down.
  // Returns OK otherwise. This should be only called from a fragment executor thread.
  Status WaitForRpc();

  // The type for a RPC worker function: (re)initializes the request parameters and
  // issues the async RPC, returning an error if the RPC could not be issued.
  typedef boost::function<Status()> DoRpcFn;

 private:
  // The parent data stream sender owning this channel. Not owned.
  KrpcDataStreamSender* parent_;

  // The descriptor of the accumulated rows in 'batch_' below. Used for computing
  // the capacity of 'batch_' and also when adding a row in AddRow().
  const RowDescriptor* row_desc_;

  // The triplet of IP-address:port/finst-id/node-id uniquely identifies the receiver.
  const std::string hostname_;
  const TNetworkAddress address_;
  const TUniqueId fragment_instance_id_;
  const PlanNodeId dest_node_id_;

  // The row batch for accumulating rows copied from AddRow().
  // Only used if the partitioning scheme is "KUDU" or "HASH_PARTITIONED".
  scoped_ptr<RowBatch> batch_;

  // The outbound row batches are double-buffered so that we can serialize the next
  // batch while the other is still referenced by the in-flight RPC. Each entry contains
  // a RowBatchHeaderPB and the buffers for the serialized tuple offsets and data.
  //
  // TODO: replace this with an actual queue. Schedule another RPC callback in the
  // completion callback if the queue is not empty.
  // TODO: rethink whether to keep per-channel buffers vs having all buffers in the
  // datastream sender and sharing them across all channels. These buffers are not used in
  // "UNPARTITIONED" scheme.
  OutboundRowBatch outbound_batches_[NUM_OUTBOUND_BATCHES];

  // Index into 'outbound_batches_' for the next available OutboundRowBatch to serialize
  // into. This is read and written by the main execution thread.
  int next_batch_idx_ = 0;

  // Synchronizes accesses to the following fields between the main execution thread and
  // the KRPC reactor thread. Note that there should be only one reactor thread invoking
  // the callbacks for a channel so there should be no races between multiple reactor
  // threads. Protects all subsequent fields.
  SpinLock lock_;

  // 'lock_' needs to be held when accessing the following fields.
  // The client interface for making RPC calls to the remote DataStreamService.
  std::unique_ptr<DataStreamServiceProxy> proxy_;

  // Controller for managing properties of a single RPC call (such as features required
  // in the remote servers) and passing the payloads to the actual OutboundCall object.
  RpcController rpc_controller_;

  // Protobuf response buffer for TransmitData() RPC.
  TransmitDataResponsePB resp_;

  // Protobuf response buffer for EndDataStream() RPC.
  EndDataStreamResponsePB eos_resp_;

  // Signaled when the in-flight RPC completes.
  condition_variable_any rpc_done_cv_;

  // Status of the most recently completed RPC.
  Status rpc_status_;

  // The pointer to the current serialized row batch being sent.
  const OutboundRowBatch* rpc_in_flight_batch_ = nullptr;

  // The monotonic time in nanoseconds of when current RPC started. Reset to 0 by
  // MarkDone().
  int64_t rpc_start_time_ns_ = 0;

  // True if there is an in-flight RPC.
  bool rpc_in_flight_ = false;

  // True if the channel is being shut down or shut down already.
  bool shutdown_ = false;

  // True if the remote receiver is closed already. In which case, all rows would
  // be dropped silently.
  // TODO: Fix IMPALA-3990
  bool remote_recvr_closed_ = false;

  // Returns true if the channel should terminate because the parent sender
  // has been closed or cancelled.
  bool ShouldTerminate() const { return shutdown_ || parent_->state_->is_cancelled(); }

  // Sends the rows accumulated in the internal row batch. This will serialize the
  // internal row batch before sending them to the destination. This may block if
  // the preceding RPC is still in progress. Returns error status if serialization
  // fails or if the preceding RPC fails.
  Status SendCurrentBatch();

  // Called when an RPC failed. If it turns out that the RPC failed because the
  // remote server is too busy, this function will schedule RetryCb() to be called
  // after FLAGS_rpc_retry_interval_ms milliseconds, which in turn re-invokes the RPC.
  // Otherwise, it will call MarkDone() to mark the RPC as done and failed.
  // 'controller_status' is a Kudu status returned from the KRPC layer.
  // 'rpc_fn' is a worker function which initializes the RPC parameters and invokes
  // the actual RPC when the RPC is rescheduled.
  // 'err_msg' is an error message to be prepended to the status converted from the
  // Kudu status 'controller_status'.
  void HandleFailedRPC(const DoRpcFn& rpc_fn, const kudu::Status& controller_status,
      const string& err_msg);

  // Same as WaitForRpc() except expects to be called with 'lock_' held and
  // may drop the lock while waiting for the RPC to complete.
  Status WaitForRpcLocked(std::unique_lock<SpinLock>* lock);

  // A callback function called from KRPC reactor thread to retry an RPC which failed
  // previously due to remote server being too busy. This will re-arm the request
  // parameters of the RPC. The retry may not happen if the callback has been aborted
  // internally by KRPC code (e.g. the reactor thread was being shut down) or if the
  // parent sender has been cancelled or closed since the scheduling of this callback.
  // In which case, MarkDone() will be called with the error status and the RPC is
  // considered complete. 'status' is the error status passed by KRPC code in case the
  // callback was aborted.
  void RetryCb(DoRpcFn rpc_fn, const kudu::Status& status);

  // A callback function called from KRPC reactor threads upon completion of an in-flight
  // TransmitData() RPC. This is called when the remote server responds to the RPC or
  // when the RPC ends prematurely due to various reasons (e.g. cancellation). Upon a
  // successful KRPC call, MarkDone() is called to update 'rpc_status_' based on the
  // response. HandleFailedRPC() is called to handle failed KRPC call. The RPC may be
  // rescheduled if it's due to remote server being too busy.
  void TransmitDataCompleteCb();

  // Initializes the parameters for TransmitData() RPC and invokes the async RPC call.
  // It will add 'tuple_offsets_' and 'tuple_data_' in 'rpc_in_flight_batch_' as sidecars
  // to the RpcController and store the sidecars' indices to TransmitDataRequestPB sent as
  // part of the RPC. Returns error status if adding sidecars to the RpcController failed.
  Status DoTransmitDataRpc();

  // A callback function called from KRPC reactor threads upon completion of an in-flight
  // EndDataStream() RPC. This is called when the remote server responds to the RPC or
  // when the RPC ends prematurely due to various reasons (e.g. cancellation). Upon a
  // successful KRPC call, MarkDone() is called to update 'rpc_status_' based on the
  // response. HandleFailedRPC() is called to handle failed KRPC calls. The RPC may be
  // rescheduled if it's due to remote server being too busy.
  void EndDataStreamCompleteCb();

  // Initializes the parameters for EndDataStream() RPC and invokes the async RPC call.
  Status DoEndDataStreamRpc();

  // Marks the in-flight RPC as completed, updates 'rpc_status_' with the status of the
  // RPC (indicated in parameter 'status') and notifies any thread waiting for RPC
  // completion. Expects to be called with 'lock_' held. Typically called in the context
  // of a reactor thread.
  void MarkDone(const Status& status);

  // Returns true if the RPC exceeds the slow RPC threshold and should be logged.
  inline bool IsSlowRpc(int64_t total_time_ns) {
    int64_t total_time_ms = total_time_ns / NANOS_PER_MICRO / MICROS_PER_MILLI;
    return total_time_ms > FLAGS_impala_slow_rpc_threshold_ms;
  }

  // Logs a slow RPC that took 'total_time_ns'. resp.receiver_latency_ns() is used to
  // distinguish processing time on the receiver from network time.
  template <typename ResponsePBType>
  void LogSlowRpc(
      const char* rpc_name, int64_t total_time_ns, const ResponsePBType& resp);

  // Logs a slow RPC that took 'total_time_ns' and failed with 'error'.
  void LogSlowFailedRpc(
      const char* rpc_name, int64_t total_time_ns, const kudu::Status& err);
};
Status KrpcDataStreamSender::Channel::Init(RuntimeState* state) {
  // Size 'batch_' so that it holds about 'per_channel_buffer_size_' bytes of fixed-length
  // row data, but never fewer than one row (and never divide by a zero row size).
  // TODO: take into account of var-len data at runtime.
  const int row_size = max(row_desc_->GetRowSize(), 1);
  const int capacity = max(1, parent_->per_channel_buffer_size_ / row_size);
  batch_.reset(new RowBatch(row_desc_, capacity, parent_->mem_tracker()));

  // Create a DataStreamService proxy to the destination.
  RETURN_IF_ERROR(DataStreamService::GetProxy(address_, hostname_, &proxy_));
  return Status::OK();
}
void KrpcDataStreamSender::Channel::MarkDone(const Status& status) {
  // Only failed RPCs contribute to the failure counter.
  if (UNLIKELY(!status.ok())) COUNTER_ADD(parent_->rpc_failure_counter_, 1);

  // Clear the in-flight bookkeeping and record the outcome. The caller holds 'lock_',
  // so a woken waiter cannot observe these fields until the lock is released.
  rpc_in_flight_ = false;
  rpc_in_flight_batch_ = nullptr;
  rpc_start_time_ns_ = 0;
  rpc_status_ = status;

  // Wake up the thread blocked in WaitForRpcLocked() or Teardown(), if any.
  rpc_done_cv_.notify_one();
}
// Logs a slow RPC that took 'total_time_ns' in total. The receiver-side processing time
// is taken from 'resp' (the response of the RPC being logged); the remainder is
// attributed to the network.
template <typename ResponsePBType>
void KrpcDataStreamSender::Channel::LogSlowRpc(
    const char* rpc_name, int64_t total_time_ns, const ResponsePBType& resp) {
  // Read the latency from the 'resp' argument, not the member 'resp_': 'resp_' is the
  // TransmitData() response buffer and would report a stale/unrelated receiver time
  // when logging an EndDataStream() RPC.
  const int64_t receiver_time_ns = resp.receiver_latency_ns();
  const int64_t network_time_ns = total_time_ns - receiver_time_ns;
  LOG(INFO) << "Slow " << rpc_name << " RPC to " << TNetworkAddressToString(address_)
            << " (fragment_instance_id=" << PrintId(fragment_instance_id_) << "): "
            << "took " << PrettyPrinter::Print(total_time_ns, TUnit::TIME_NS) << ". "
            << "Receiver time: "
            << PrettyPrinter::Print(receiver_time_ns, TUnit::TIME_NS)
            << " Network time: " << PrettyPrinter::Print(network_time_ns, TUnit::TIME_NS);
}
// Emits an INFO log line for an RPC named 'rpc_name' that failed with 'err' after
// 'total_time_ns' nanoseconds.
void KrpcDataStreamSender::Channel::LogSlowFailedRpc(
    const char* rpc_name, int64_t total_time_ns, const kudu::Status& err) {
  const std::string elapsed = PrettyPrinter::Print(total_time_ns, TUnit::TIME_NS);
  const std::string error_text = err.ToString();
  LOG(INFO) << "Slow " << rpc_name << " RPC to " << TNetworkAddressToString(address_)
            << " (fragment_instance_id=" << PrintId(fragment_instance_id_) << "): "
            << "took " << elapsed << ". "
            << "Error: " << error_text;
}
Status KrpcDataStreamSender::Channel::WaitForRpc() {
  // Take 'lock_' and delegate; WaitForRpcLocked() may release it while it waits.
  std::unique_lock<SpinLock> channel_lock(lock_);
  return WaitForRpcLocked(&channel_lock);
}
Status KrpcDataStreamSender::Channel::WaitForRpcLocked(std::unique_lock<SpinLock>* lock) {
  DCHECK(lock != nullptr);
  DCHECK(lock->owns_lock());

  // Time spent waiting is charged both to the sender's inactive timer and to the
  // fragment instance's total network send timer.
  ScopedTimer<MonotonicStopWatch> wait_timer(parent_->profile()->inactive_timer(),
      parent_->state_->total_network_send_timer());

  // Wait in 50ms slices: cancellation of the parent's RuntimeState does not signal
  // 'rpc_done_cv_', so the termination condition has to be re-polled.
  const auto poll_interval = std::chrono::milliseconds(50);
  for (;;) {
    if (!rpc_in_flight_ || ShouldTerminate()) break;
    rpc_done_cv_.wait_for(*lock, poll_interval);
  }

  const int64_t waited_ns = wait_timer.ElapsedTime();
  if (IsSlowRpc(waited_ns)) {
    LOG(INFO) << "Long delay waiting for RPC to " << TNetworkAddressToString(address_)
              << " (fragment_instance_id=" << PrintId(fragment_instance_id_) << "): "
              << "took " << PrettyPrinter::Print(waited_ns, TUnit::TIME_NS);
  }

  if (UNLIKELY(ShouldTerminate())) {
    // DSS is single-threaded so it's impossible for shutdown_ to be true here.
    DCHECK(!shutdown_);
    return Status::CANCELLED;
  }

  DCHECK(!rpc_in_flight_);
  if (LIKELY(rpc_status_.ok())) return Status::OK();

  LOG(ERROR) << "channel send to " << TNetworkAddressToString(address_) << " failed: "
             << "(fragment_instance_id=" << PrintId(fragment_instance_id_) << "): "
             << rpc_status_.GetDetail();
  return rpc_status_;
}
void KrpcDataStreamSender::Channel::RetryCb(
    DoRpcFn rpc_fn, const kudu::Status& cb_status) {
  COUNTER_ADD(parent_->rpc_retry_counter_, 1);
  std::unique_lock<SpinLock> l(lock_);
  DCHECK(rpc_in_flight_);

  // Decide the outcome of this retry attempt; every path except a successfully
  // re-issued RPC completes the in-flight RPC via MarkDone().
  Status retry_status;
  if (UNLIKELY(!cb_status.ok())) {
    // Aborted by KRPC layer as reactor thread was being shut down.
    retry_status = FromKuduStatus(cb_status, "KRPC retry failed");
  } else if (UNLIKELY(ShouldTerminate())) {
    // Parent datastream sender has been closed or cancelled.
    retry_status = Status::CANCELLED;
  } else {
    // Re-issue the RPC. Once issued, its completion callback takes over.
    retry_status = rpc_fn();
    if (LIKELY(retry_status.ok())) return;
  }
  MarkDone(retry_status);
}
void KrpcDataStreamSender::Channel::HandleFailedRPC(const DoRpcFn& rpc_fn,
    const kudu::Status& controller_status, const string& prepend) {
  // Any failure other than the remote server being too busy is final.
  // TODO: IMPALA-6159. Handle 'connection reset by peer' due to stale connections.
  if (!RpcMgr::IsServerTooBusy(rpc_controller_)) {
    MarkDone(FromKuduStatus(controller_status, prepend));
    return;
  }

  // The destination is busy: retry later. ShouldTerminate() is not consulted here
  // because RetryCb() checks it before re-issuing the RPC. RetryCb() runs in a reactor
  // context after FLAGS_rpc_retry_interval_ms milliseconds.
  const MonoDelta retry_delay = MonoDelta::FromMilliseconds(FLAGS_rpc_retry_interval_ms);
  ExecEnv::GetInstance()->rpc_mgr()->messenger()->ScheduleOnReactor(
      boost::bind(&KrpcDataStreamSender::Channel::RetryCb, this, rpc_fn, _1),
      retry_delay);
}
void KrpcDataStreamSender::Channel::TransmitDataCompleteCb() {
  // 'rpc_start_time_ns_' is one of the fields guarded by 'lock_' (DoTransmitDataRpc()
  // writes it with 'lock_' held), so it is read only after acquiring the lock, the same
  // way EndDataStreamCompleteCb() does it.
  std::unique_lock<SpinLock> l(lock_);
  DCHECK(rpc_in_flight_);
  DCHECK_NE(rpc_start_time_ns_, 0);
  const int64_t total_time = MonotonicNanos() - rpc_start_time_ns_;
  const kudu::Status controller_status = rpc_controller_.status();
  if (LIKELY(controller_status.ok())) {
    DCHECK(rpc_in_flight_batch_ != nullptr);
    const int64_t row_batch_size = RowBatch::GetSerializedSize(*rpc_in_flight_batch_);
    const int64_t receiver_time = resp_.receiver_latency_ns();
    const int64_t network_time = total_time - receiver_time;
    COUNTER_ADD(parent_->bytes_sent_counter_, row_batch_size);
    if (LIKELY(network_time > 0)) {
      // 'row_batch_size' is bounded by FLAGS_rpc_max_message_size which shouldn't exceed
      // max 32-bit signed value so multiplication below should not overflow.
      DCHECK_LE(row_batch_size, numeric_limits<int32_t>::max());
      const int64_t network_throughput = row_batch_size * NANOS_PER_SEC / network_time;
      parent_->network_throughput_counter_->UpdateCounter(network_throughput);
      parent_->network_time_stats_->UpdateCounter(network_time);
    }
    parent_->recvr_time_stats_->UpdateCounter(receiver_time);
    if (IsSlowRpc(total_time)) LogSlowRpc("TransmitData", total_time, resp_);
    // A closed remote receiver is not treated as an error: remember it so that later
    // TransmitData()/SendEosAsync() calls return early without sending anything.
    Status rpc_status = Status::OK();
    const int32_t status_code = resp_.status().status_code();
    if (status_code == TErrorCode::DATASTREAM_RECVR_CLOSED) {
      remote_recvr_closed_ = true;
    } else {
      rpc_status = Status(resp_.status());
    }
    MarkDone(rpc_status);
  } else {
    if (IsSlowRpc(total_time)) {
      LogSlowFailedRpc("TransmitData", total_time, controller_status);
    }
    // HandleFailedRPC() either schedules a retry of DoTransmitDataRpc() or completes the
    // RPC with an error status.
    DoRpcFn rpc_fn =
        boost::bind(&KrpcDataStreamSender::Channel::DoTransmitDataRpc, this);
    const string prepend =
        Substitute("TransmitData() to $0 failed", TNetworkAddressToString(address_));
    HandleFailedRPC(rpc_fn, controller_status, prepend);
  }
}
Status KrpcDataStreamSender::Channel::DoTransmitDataRpc() {
  DCHECK(rpc_in_flight_batch_ != nullptr);
  DCHECK(rpc_in_flight_batch_->IsInitialized());

  // Initialize some constant fields in the request protobuf.
  TransmitDataRequestPB req;
  UniqueIdPB* finstance_id_pb = req.mutable_dest_fragment_instance_id();
  finstance_id_pb->set_lo(fragment_instance_id_.lo);
  finstance_id_pb->set_hi(fragment_instance_id_.hi);
  req.set_sender_id(parent_->sender_id_);
  req.set_dest_node_id(dest_node_id_);

  rpc_controller_.Reset();
  int sidecar_idx = -1;
  // Add 'tuple_offsets_' as sidecar.
  KUDU_RETURN_IF_ERROR(rpc_controller_.AddOutboundSidecar(RpcSidecar::FromSlice(
      rpc_in_flight_batch_->TupleOffsetsAsSlice()), &sidecar_idx),
      "Unable to add tuple offsets to sidecar");
  req.set_tuple_offsets_sidecar_idx(sidecar_idx);

  // Add 'tuple_data_' as sidecar.
  KUDU_RETURN_IF_ERROR(rpc_controller_.AddOutboundSidecar(
      RpcSidecar::FromSlice(rpc_in_flight_batch_->TupleDataAsSlice()), &sidecar_idx),
      "Unable to add tuple data to sidecar");
  req.set_tuple_data_sidecar_idx(sidecar_idx);

  // Lend the RowBatchHeader, which is owned by 'rpc_in_flight_batch_', to the request
  // only after every fallible step above has succeeded. If it were attached before the
  // KUDU_RETURN_IF_ERROR early returns, ~TransmitDataRequestPB() would delete a header
  // it does not really own.
  req.set_allocated_row_batch_header(
      const_cast<RowBatchHeaderPB*>(rpc_in_flight_batch_->header()));
  resp_.Clear();
  // Start the clock immediately before issuing the RPC.
  rpc_start_time_ns_ = MonotonicNanos();
  proxy_->TransmitDataAsync(req, &resp_, &rpc_controller_,
      boost::bind(&KrpcDataStreamSender::Channel::TransmitDataCompleteCb, this));
  // 'req' took ownership of 'header'. Need to release its ownership or 'header' will be
  // deleted by destructor.
  req.release_row_batch_header();
  return Status::OK();
}
Status KrpcDataStreamSender::Channel::TransmitData(
    const OutboundRowBatch* outbound_batch) {
  VLOG_ROW << "Channel::TransmitData() fragment_instance_id="
           << PrintId(fragment_instance_id_) << " dest_node=" << dest_node_id_
           << " #rows=" << outbound_batch->header()->num_rows();
  std::unique_lock<SpinLock> l(lock_);
  RETURN_IF_ERROR(WaitForRpcLocked(&l));
  DCHECK(!rpc_in_flight_);
  DCHECK(rpc_in_flight_batch_ == nullptr);
  // If the remote receiver is closed already, there is no point in sending anything.
  // TODO: Needs better solution for IMPALA-3990 in the long run.
  if (UNLIKELY(remote_recvr_closed_)) return Status::OK();
  rpc_in_flight_ = true;
  rpc_in_flight_batch_ = outbound_batch;
  const Status status = DoTransmitDataRpc();
  if (UNLIKELY(!status.ok())) {
    // DoTransmitDataRpc() only fails before the async RPC is issued, so no completion
    // callback will ever clear the in-flight state. Complete it here, as RetryCb() does
    // for a failed retry; otherwise Teardown() would wait forever on 'rpc_in_flight_'.
    MarkDone(status);
    return status;
  }
  return Status::OK();
}
Status KrpcDataStreamSender::Channel::SerializeAndSendBatch(RowBatch* batch) {
  // Serialize into the double buffer that the in-flight RPC is not referencing.
  OutboundRowBatch* const target = outbound_batches_ + next_batch_idx_;
  DCHECK(target != rpc_in_flight_batch_);
  RETURN_IF_ERROR(parent_->SerializeBatch(batch, target));
  RETURN_IF_ERROR(TransmitData(target));
  // Rotate to the other buffer only after TransmitData() succeeded.
  next_batch_idx_ = (next_batch_idx_ + 1) % NUM_OUTBOUND_BATCHES;
  return Status::OK();
}
Status KrpcDataStreamSender::Channel::SendCurrentBatch() {
  // Ship the accumulated rows; 'batch_' is recycled only if that succeeded.
  RowBatch* const current = batch_.get();
  RETURN_IF_ERROR(SerializeAndSendBatch(current));
  current->Reset();
  return Status::OK();
}
inline Status KrpcDataStreamSender::Channel::AddRow(TupleRow* row) {
  // Make room first: a full 'batch_' is sent before the new row is appended.
  if (batch_->AtCapacity()) RETURN_IF_ERROR(SendCurrentBatch());

  TupleRow* const dest = batch_->GetRow(batch_->AddRow());
  const vector<TupleDescriptor*>& tuple_descs = row_desc_->tuple_descriptors();
  const int num_tuples = tuple_descs.size();
  for (int tuple_idx = 0; tuple_idx < num_tuples; ++tuple_idx) {
    // Deep-copy each non-null tuple into the tuple data pool of 'batch_'; null tuples
    // stay null.
    if (LIKELY(row->GetTuple(tuple_idx) != nullptr)) {
      dest->SetTuple(tuple_idx, row->GetTuple(tuple_idx)->DeepCopy(
          *tuple_descs[tuple_idx], batch_->tuple_data_pool()));
    } else {
      dest->SetTuple(tuple_idx, nullptr);
    }
  }
  batch_->CommitLastRow();
  return Status::OK();
}
void KrpcDataStreamSender::Channel::EndDataStreamCompleteCb() {
  std::unique_lock<SpinLock> l(lock_);
  DCHECK(rpc_in_flight_);
  DCHECK_NE(rpc_start_time_ns_, 0);
  const int64_t total_time_ns = MonotonicNanos() - rpc_start_time_ns_;
  const kudu::Status controller_status = rpc_controller_.status();
  if (LIKELY(controller_status.ok())) {
    // The receiver's processing time must come from the EndDataStream() response
    // 'eos_resp_'; 'resp_' holds the last, unrelated TransmitData() response.
    const int64_t receiver_time_ns = eos_resp_.receiver_latency_ns();
    const int64_t network_time_ns = total_time_ns - receiver_time_ns;
    parent_->network_time_stats_->UpdateCounter(network_time_ns);
    parent_->recvr_time_stats_->UpdateCounter(receiver_time_ns);
    if (IsSlowRpc(total_time_ns)) LogSlowRpc("EndDataStream", total_time_ns, eos_resp_);
    MarkDone(Status(eos_resp_.status()));
  } else {
    if (IsSlowRpc(total_time_ns)) {
      LogSlowFailedRpc("EndDataStream", total_time_ns, controller_status);
    }
    // HandleFailedRPC() either schedules a retry of DoEndDataStreamRpc() or completes
    // the RPC with an error status.
    DoRpcFn rpc_fn =
        boost::bind(&KrpcDataStreamSender::Channel::DoEndDataStreamRpc, this);
    const string prepend =
        Substitute("EndDataStream() to $0 failed", TNetworkAddressToString(address_));
    HandleFailedRPC(rpc_fn, controller_status, prepend);
  }
}
Status KrpcDataStreamSender::Channel::DoEndDataStreamRpc() {
  DCHECK(rpc_in_flight_);
  rpc_controller_.Reset();

  // Build the request identifying this sender and the receiving fragment instance/node.
  EndDataStreamRequestPB eos_req;
  UniqueIdPB* const dest_finst_id = eos_req.mutable_dest_fragment_instance_id();
  dest_finst_id->set_lo(fragment_instance_id_.lo);
  dest_finst_id->set_hi(fragment_instance_id_.hi);
  eos_req.set_sender_id(parent_->sender_id_);
  eos_req.set_dest_node_id(dest_node_id_);

  // Reset the response buffer and start timing right before issuing the async RPC;
  // EndDataStreamCompleteCb() runs when it finishes.
  eos_resp_.Clear();
  rpc_start_time_ns_ = MonotonicNanos();
  proxy_->EndDataStreamAsync(eos_req, &eos_resp_, &rpc_controller_,
      boost::bind(&KrpcDataStreamSender::Channel::EndDataStreamCompleteCb, this));
  return Status::OK();
}
Status KrpcDataStreamSender::Channel::FlushBatches() {
  const int buffered_rows = batch_->num_rows();
  VLOG_RPC << "Channel::FlushBatches() fragment_instance_id="
           << PrintId(fragment_instance_id_) << " dest_node=" << dest_node_id_
           << " #rows= " << buffered_rows;
  // Nothing buffered means nothing to send.
  if (buffered_rows == 0) return Status::OK();
  // An error can be returned here without sending the EOS RPC: it reaches the
  // coordinator, which then cancels all remote fragments, including the receiver.
  RETURN_IF_ERROR(SendCurrentBatch());
  return Status::OK();
}
Status KrpcDataStreamSender::Channel::SendEosAsync() {
  VLOG_RPC << "Channel::SendEosAsync() fragment_instance_id="
           << PrintId(fragment_instance_id_) << " dest_node=" << dest_node_id_
           << " #rows= " << batch_->num_rows();
  DCHECK_EQ(0, batch_->num_rows()) << "Batches must be flushed";
  std::unique_lock<SpinLock> l(lock_);
  DCHECK(!rpc_in_flight_);
  DCHECK(rpc_status_.ok());
  // Nothing to close if the remote receiver is already gone.
  if (UNLIKELY(remote_recvr_closed_)) return Status::OK();
  VLOG_RPC << "calling EndDataStream() to terminate channel. fragment_instance_id="
           << PrintId(fragment_instance_id_);
  rpc_in_flight_ = true;
  COUNTER_ADD(parent_->eos_sent_counter_, 1);
  const Status status = DoEndDataStreamRpc();
  if (UNLIKELY(!status.ok())) {
    // No RPC was issued, so no completion callback will clear the in-flight state.
    // Complete it here; otherwise Teardown() would wait forever on 'rpc_in_flight_'.
    MarkDone(status);
    return status;
  }
  return Status::OK();
}
void KrpcDataStreamSender::Channel::Teardown(RuntimeState* state) {
  // Normally the channel has been flushed already, so nothing is pending. If the
  // fragment was closed or cancelled, an RPC may still be in flight and row batches may
  // still be buffered; the former is cancelled and the latter simply dropped.
  std::unique_lock<SpinLock> l(lock_);
  shutdown_ = true;
  if (rpc_in_flight_) {
    // Cancel the RPC and block until MarkDone() (run from a completion or retry
    // callback) has cleared 'rpc_in_flight_'.
    rpc_controller_.Cancel();
    rpc_done_cv_.wait(l, [this]() { return !rpc_in_flight_; });
  }
  batch_.reset();
}
KrpcDataStreamSender::KrpcDataStreamSender(TDataSinkId sink_id, int sender_id,
    const RowDescriptor* row_desc, const TDataStreamSink& sink,
    const vector<TPlanFragmentDestination>& destinations, int per_channel_buffer_size,
    RuntimeState* state)
  : DataSink(sink_id, row_desc,
        Substitute("KrpcDataStreamSender (dst_id=$0)", sink.dest_node_id), state),
    sender_id_(sender_id),
    partition_type_(sink.output_partition.type),
    per_channel_buffer_size_(per_channel_buffer_size),
    dest_node_id_(sink.dest_node_id),
    next_unknown_partition_(0) {
  DCHECK_GT(destinations.size(), 0);
  DCHECK(sink.output_partition.type == TPartitionType::UNPARTITIONED
      || sink.output_partition.type == TPartitionType::HASH_PARTITIONED
      || sink.output_partition.type == TPartitionType::RANDOM
      || sink.output_partition.type == TPartitionType::KUDU);

  // One channel per destination fragment instance.
  for (const TPlanFragmentDestination& dest : destinations) {
    channels_.push_back(new Channel(this, row_desc, dest.thrift_backend.hostname,
        dest.krpc_backend, dest.fragment_instance_id, sink.dest_node_id,
        per_channel_buffer_size));
  }

  // Randomize the order we open/transmit to channels to avoid thundering herd problems.
  const bool shuffle_channels = partition_type_ == TPartitionType::UNPARTITIONED
      || partition_type_ == TPartitionType::RANDOM;
  if (shuffle_channels) random_shuffle(channels_.begin(), channels_.end());
}
KrpcDataStreamSender::~KrpcDataStreamSender() {
  // TODO: check that sender was either already closed() or there was an error
  // on some channel
  // The channels were allocated in the constructor and are owned by this sender.
  for (auto* channel : channels_) delete channel;
}
Status KrpcDataStreamSender::Init(const vector<TExpr>& thrift_output_exprs,
    const TDataSink& tsink, RuntimeState* state) {
  SCOPED_TIMER(profile_->total_time_counter());
  DCHECK(tsink.__isset.stream_sink);
  // Partition expressions are only created for HASH_PARTITIONED and KUDU partitioning.
  const bool needs_partition_exprs = partition_type_ == TPartitionType::HASH_PARTITIONED
      || partition_type_ == TPartitionType::KUDU;
  if (!needs_partition_exprs) return Status::OK();
  RETURN_IF_ERROR(ScalarExpr::Create(tsink.stream_sink.output_partition.partition_exprs,
      *row_desc_, state, &partition_exprs_));
  return Status::OK();
}
// Prepares the sender: creates the partition expression evaluators, registers all
// profile counters (in display order) and initializes every channel.
Status KrpcDataStreamSender::Prepare(
    RuntimeState* state, MemTracker* parent_mem_tracker) {
  RETURN_IF_ERROR(DataSink::Prepare(state, parent_mem_tracker));
  state_ = state;
  SCOPED_TIMER(profile_->total_time_counter());
  RETURN_IF_ERROR(ScalarExprEvaluator::Create(partition_exprs_, state,
      state->obj_pool(), expr_perm_pool_.get(), expr_results_pool_.get(),
      &partition_expr_evals_));
  serialize_batch_timer_ = ADD_TIMER(profile(), "SerializeBatchTime");
  rpc_retry_counter_ = ADD_COUNTER(profile(), "RpcRetry", TUnit::UNIT);
  rpc_failure_counter_ = ADD_COUNTER(profile(), "RpcFailure", TUnit::UNIT);
  // Use the class constant rather than repeating the "TotalBytesSent" literal, so the
  // registered name and TOTAL_BYTES_SENT_COUNTER cannot drift apart.
  bytes_sent_counter_ = ADD_COUNTER(profile(), TOTAL_BYTES_SENT_COUNTER, TUnit::BYTES);
  state->AddBytesSentCounter(bytes_sent_counter_);
  bytes_sent_time_series_counter_ =
      ADD_TIME_SERIES_COUNTER(profile(), "BytesSent", bytes_sent_counter_);
  network_throughput_counter_ =
      ADD_SUMMARY_STATS_COUNTER(profile(), "NetworkThroughput", TUnit::BYTES_PER_SECOND);
  network_time_stats_ =
      ADD_SUMMARY_STATS_COUNTER(profile(), "RpcNetworkTime", TUnit::TIME_NS);
  recvr_time_stats_ =
      ADD_SUMMARY_STATS_COUNTER(profile(), "RpcRecvrTime", TUnit::TIME_NS);
  eos_sent_counter_ = ADD_COUNTER(profile(), "EosSent", TUnit::UNIT);
  uncompressed_bytes_counter_ =
      ADD_COUNTER(profile(), "UncompressedRowBatchSize", TUnit::BYTES);
  total_sent_rows_counter_ = ADD_COUNTER(profile(), "RowsSent", TUnit::UNIT);
  for (auto* channel : channels_) {
    RETURN_IF_ERROR(channel->Init(state));
  }
  state->CheckAndAddCodegenDisabledMessage(profile());
  return Status::OK();
}
Status KrpcDataStreamSender::Open(RuntimeState* state) {
  SCOPED_TIMER(profile_->total_time_counter());
  // Open the partition expression evaluators created in Prepare().
  RETURN_IF_ERROR(ScalarExprEvaluator::Open(partition_expr_evals_, state));
  return Status::OK();
}
// Generates an IR function "KrpcDataStreamSenderHashRow" with the signature
//   i64 (KrpcDataStreamSender* this, TupleRow* row)
// that folds every partition expression's value for 'row' into a 64-bit hash. The
// loop over 'partition_exprs_' is unrolled at codegen time. For expression i the
// generated code:
//   1. fetches the i-th evaluator through GetPartitionExprEvaluator(i)
//      (IRFunction::KRPC_DSS_GET_PART_EXPR_EVAL),
//   2. calls the expression's codegen'd compute function on 'row',
//   3. selects a null pointer if the value is NULL; otherwise it selects a pointer to
//      the native value, which is saved on the stack (see the 'alloca' below), and
//   4. calls RawValue::GetHashValueFastHash() with that pointer, a global-constant
//      ColumnType and the running hash. The running hash starts at
//      EXCHANGE_HASH_SEED.
// Codegen() substitutes this function for the HashRow() call inside HashAndAddRows().
// NOTE(review): the seed and hash function mirror the interpreted HashRow(). Keep the
// two in sync so that codegen'd and interpreted senders route rows identically.
//
// Returns any error from an expression's GetCodegendComputeFn() as-is. On success
// '*fn' holds the finalized function. If LLVM verification fails, '*fn' is set to
// nullptr and an error Status is returned.
//
// An example of generated code with int type.
//
// define i64 @KrpcDataStreamSenderHashRow(%"class.impala::KrpcDataStreamSender"* %this,
// %"class.impala::TupleRow"* %row) #46 {
// entry:
// %0 = alloca i32
// %1 = call %"class.impala::ScalarExprEvaluator"*
// @_ZN6impala20KrpcDataStreamSender25GetPartitionExprEvaluatorEi(
// %"class.impala::KrpcDataStreamSender"* %this, i32 0)
// %partition_val = call i64 @GetSlotRef(
// %"class.impala::ScalarExprEvaluator"* %1, %"class.impala::TupleRow"* %row)
// %is_null = trunc i64 %partition_val to i1
// br i1 %is_null, label %is_null_block, label %not_null_block
//
// is_null_block: ; preds = %entry
// br label %hash_val_block
//
// not_null_block: ; preds = %entry
// %2 = ashr i64 %partition_val, 32
// %3 = trunc i64 %2 to i32
// store i32 %3, i32* %0
// %native_ptr = bitcast i32* %0 to i8*
// br label %hash_val_block
//
// hash_val_block: ; preds = %not_null_block, %is_null_block
// %val_ptr_phi = phi i8* [ %native_ptr, %not_null_block ], [ null, %is_null_block ]
// %hash_val = call i64
// @_ZN6impala8RawValue20GetHashValueFastHashEPKvRKNS_10ColumnTypeEm(
// i8* %val_ptr_phi, %"struct.impala::ColumnType"* @expr_type_arg,
// i64 7403188670037225271)
// ret i64 %hash_val
// }
Status KrpcDataStreamSender::CodegenHashRow(LlvmCodeGen* codegen, llvm::Function** fn) {
  llvm::LLVMContext& context = codegen->context();
  LlvmBuilder builder(context);
  // Prototype: i64 KrpcDataStreamSenderHashRow(this, row).
  LlvmCodeGen::FnPrototype prototype(
      codegen, "KrpcDataStreamSenderHashRow", codegen->i64_type());
  prototype.AddArgument(
      LlvmCodeGen::NamedVariable("this", codegen->GetNamedPtrType(LLVM_CLASS_NAME)));
  prototype.AddArgument(
      LlvmCodeGen::NamedVariable("row", codegen->GetStructPtrType<TupleRow>()));
  llvm::Value* args[2];
  llvm::Function* hash_row_fn = prototype.GeneratePrototype(&builder, args);
  llvm::Value* this_arg = args[0];
  llvm::Value* row_arg = args[1];
  // Store the initial seed to hash_val
  llvm::Value* hash_val = codegen->GetI64Constant(EXCHANGE_HASH_SEED);
  // Unroll the loop and codegen each of the partition expressions. Every iteration
  // appends three new basic blocks and leaves the builder positioned at the end of
  // the last one ('hash_val_block'), where the next iteration continues.
  for (int i = 0; i < partition_exprs_.size(); ++i) {
    llvm::Function* compute_fn;
    RETURN_IF_ERROR(
        partition_exprs_[i]->GetCodegendComputeFn(codegen, false, &compute_fn));
    // Load the expression evaluator for the i-th partition expression
    llvm::Function* get_expr_eval_fn =
        codegen->GetFunction(IRFunction::KRPC_DSS_GET_PART_EXPR_EVAL, false);
    DCHECK(get_expr_eval_fn != nullptr);
    llvm::Value* expr_eval_arg =
        builder.CreateCall(get_expr_eval_fn, {this_arg, codegen->GetI32Constant(i)});
    // Compute the value against the i-th partition expression
    llvm::Value* compute_fn_args[] = {expr_eval_arg, row_arg};
    CodegenAnyVal partition_val = CodegenAnyVal::CreateCallWrapped(codegen, &builder,
        partition_exprs_[i]->type(), compute_fn, compute_fn_args, "partition_val");
    llvm::BasicBlock* is_null_block =
        llvm::BasicBlock::Create(context, "is_null_block", hash_row_fn);
    llvm::BasicBlock* not_null_block =
        llvm::BasicBlock::Create(context, "not_null_block", hash_row_fn);
    llvm::BasicBlock* hash_val_block =
        llvm::BasicBlock::Create(context, "hash_val_block", hash_row_fn);
    // Check if 'partition_val' is NULL
    llvm::Value* val_is_null = partition_val.GetIsNull();
    builder.CreateCondBr(val_is_null, is_null_block, not_null_block);
    // Set the pointer to NULL in case 'partition_val' evaluates to NULL
    builder.SetInsertPoint(is_null_block);
    llvm::Value* null_ptr = codegen->null_ptr_value();
    builder.CreateBr(hash_val_block);
    // Saves 'partition_val' on the stack and passes a pointer to it to the hash function
    builder.SetInsertPoint(not_null_block);
    llvm::Value* native_ptr = partition_val.ToNativePtr();
    native_ptr = builder.CreatePointerCast(native_ptr, codegen->ptr_type(), "native_ptr");
    builder.CreateBr(hash_val_block);
    // Picks the input value to hash function: the PHI merges the two predecessor
    // blocks, yielding 'native_ptr' or null depending on which path was taken.
    builder.SetInsertPoint(hash_val_block);
    llvm::PHINode* val_ptr_phi = builder.CreatePHI(codegen->ptr_type(), 2, "val_ptr_phi");
    val_ptr_phi->addIncoming(native_ptr, not_null_block);
    val_ptr_phi->addIncoming(null_ptr, is_null_block);
    // Creates a global constant of the partition expression's ColumnType. It has to be a
    // constant for constant propagation and dead code elimination in 'get_hash_value_fn'
    llvm::Type* col_type = codegen->GetStructType<ColumnType>();
    llvm::Constant* expr_type_arg = codegen->ConstantToGVPtr(
        col_type, partition_exprs_[i]->type().ToIR(codegen), "expr_type_arg");
    // Update 'hash_val' with the new 'partition-val'. The previous hash value is the
    // seed for this expression, chaining the expressions together.
    llvm::Value* get_hash_value_args[] = {val_ptr_phi, expr_type_arg, hash_val};
    llvm::Function* get_hash_value_fn =
        codegen->GetFunction(IRFunction::RAW_VALUE_GET_HASH_VALUE_FAST_HASH, false);
    DCHECK(get_hash_value_fn != nullptr);
    hash_val = builder.CreateCall(get_hash_value_fn, get_hash_value_args, "hash_val");
  }
  builder.CreateRet(hash_val);
  // FinalizeFunction() returns nullptr when the generated IR fails verification.
  *fn = codegen->FinalizeFunction(hash_row_fn);
  if (*fn == nullptr) {
    return Status("Codegen'd KrpcDataStreamSenderHashRow() fails verification. See log");
  }
  return Status::OK();
}
// Returns a human-readable label for 'partition_type_'. Codegen() uses it to build
// the profile label (e.g. "Hash Partitioned Sender"). An unknown partition type
// triggers a DCHECK failure in debug builds and yields an empty string otherwise.
string KrpcDataStreamSender::PartitionTypeName() const {
  string label;
  switch (partition_type_) {
    case TPartitionType::UNPARTITIONED:
      label = "Unpartitioned";
      break;
    case TPartitionType::HASH_PARTITIONED:
      label = "Hash Partitioned";
      break;
    case TPartitionType::RANDOM:
      label = "Random Partitioned";
      break;
    case TPartitionType::KUDU:
      label = "Kudu Partitioned";
      break;
    default:
      DCHECK(false) << partition_type_;
      break;
  }
  return label;
}
// Codegens the hash-partitioning hot path. It specializes HashAndAddRows() by:
//  - replacing its GetNumChannels() call with a constant channel count, and
//  - replacing its HashRow() call with the IR produced by CodegenHashRow().
// It then JIT-registers the result into 'hash_and_add_rows_fn_'. Partition types other
// than HASH_PARTITIONED are not codegen'd. In every case the outcome is recorded in
// the profile under "<partition type> Sender".
void KrpcDataStreamSender::Codegen(LlvmCodeGen* codegen) {
  const string sender_name = PartitionTypeName() + " Sender";

  // UNPARTITIONED and RANDOM senders do no per-row hashing in Send(), so codegen is
  // not needed. KUDU routing is simply not codegen'd.
  if (partition_type_ != TPartitionType::HASH_PARTITIONED) {
    const char* const reason =
        partition_type_ == TPartitionType::KUDU ? "supported" : "needed";
    profile()->AddCodegenMsg(false, Substitute("not $0", reason), sender_name);
    return;
  }

  llvm::Function* hash_row_fn = nullptr;
  Status codegen_status = CodegenHashRow(codegen, &hash_row_fn);
  if (codegen_status.ok()) {
    llvm::Function* hash_and_add_rows_fn =
        codegen->GetFunction(IRFunction::KRPC_DSS_HASH_AND_ADD_ROWS, true);
    DCHECK(hash_and_add_rows_fn != nullptr);

    // Bake the channel count into the IR as a constant.
    const int num_channel_calls_replaced = codegen->ReplaceCallSitesWithValue(
        hash_and_add_rows_fn, codegen->GetI32Constant(GetNumChannels()),
        "GetNumChannels");
    DCHECK_EQ(num_channel_calls_replaced, 1);

    // Swap the interpreted HashRow() for the handcrafted IR function.
    const int num_hash_calls_replaced =
        codegen->ReplaceCallSites(hash_and_add_rows_fn, hash_row_fn, HASH_ROW_SYMBOL);
    DCHECK_EQ(num_hash_calls_replaced, 1);

    llvm::Function* const finalized_fn = codegen->FinalizeFunction(hash_and_add_rows_fn);
    if (finalized_fn != nullptr) {
      codegen->AddFunctionToJit(
          finalized_fn, reinterpret_cast<void**>(&hash_and_add_rows_fn_));
    } else {
      codegen_status =
          Status("Codegen'd HashAndAddRows() failed verification. See log");
    }
  }
  profile()->AddCodegenMsg(codegen_status.ok(), codegen_status, sender_name);
}
// Appends 'row' to the channel at index 'channel_id' and returns that channel's
// AddRow() Status. No bounds check is done here; callers are expected to pass an index
// in [0, channels_.size()).
Status KrpcDataStreamSender::AddRowToChannel(const int channel_id, TupleRow* row) {
  Channel* const target = channels_[channel_id];
  return target->AddRow(row);
}
// Interpreted row hash used for hash partitioning. It chains the value of each
// partition expression through RawValue::GetHashValueFastHash(), starting from
// EXCHANGE_HASH_SEED; each step's output seeds the next expression's hash.
uint64_t KrpcDataStreamSender::HashRow(TupleRow* row) {
  uint64_t running_hash = EXCHANGE_HASH_SEED;
  for (ScalarExprEvaluator* const partition_eval : partition_expr_evals_) {
    void* const value = partition_eval->GetValue(row);
    // FastHash instead of the crc hash: crc does not give uncorrelated hashes when
    // the seed changes, which this seed chaining relies on.
    // TODO: fix crc hash/GetHashValue()
    running_hash = RawValue::GetHashValueFastHash(
        value, partition_eval->root().type(), running_hash);
  }
  return running_hash;
}
// Routes 'batch' to the receivers according to 'partition_type_':
//  - UNPARTITIONED: serializes the batch once into the next OutboundRowBatch and
//    transmits that same serialized batch on every channel.
//  - RANDOM, or only one channel: round-robins whole batches across the channels.
//  - KUDU: sends each row to the channel chosen by the single Kudu partition
//    expression. Rows with a negative partition are spread round-robin.
//  - HASH_PARTITIONED: uses the codegen'd HashAndAddRows() when it is available,
//    otherwise the interpreted HashAndAddRows().
// Afterwards it adds the row count to RowsSent and clears the per-batch expression
// results pool. It then returns any error reported by state->CheckQueryState().
// Empty batches are ignored.
Status KrpcDataStreamSender::Send(RuntimeState* state, RowBatch* batch) {
  SCOPED_TIMER(profile()->total_time_counter());
  DCHECK(!closed_);
  DCHECK(!flushed_);
  if (batch->num_rows() == 0) return Status::OK();
  if (partition_type_ == TPartitionType::UNPARTITIONED) {
    OutboundRowBatch* outbound_batch = &outbound_batches_[next_batch_idx_];
    RETURN_IF_ERROR(SerializeBatch(batch, outbound_batch, channels_.size()));
    // TransmitData() will block if there are still in-flight rpcs (and those will
    // reference the previously written serialized batch).
    for (Channel* channel : channels_) {
      RETURN_IF_ERROR(channel->TransmitData(outbound_batch));
    }
    next_batch_idx_ = (next_batch_idx_ + 1) % NUM_OUTBOUND_BATCHES;
  } else if (partition_type_ == TPartitionType::RANDOM || channels_.size() == 1) {
    // Round-robin batches among channels. Wait for the current channel to finish its
    // rpc before overwriting its batch.
    Channel* current_channel = channels_[current_channel_idx_];
    RETURN_IF_ERROR(current_channel->SerializeAndSendBatch(batch));
    current_channel_idx_ = (current_channel_idx_ + 1) % channels_.size();
  } else if (partition_type_ == TPartitionType::KUDU) {
    DCHECK_EQ(partition_expr_evals_.size(), 1);
    const int num_channels = channels_.size();
    const int num_rows = batch->num_rows();
    const int hash_batch_size = RowBatch::HASH_BATCH_SIZE;
    int channel_ids[hash_batch_size];
    // Work in windows of 'hash_batch_size' rows: first compute every row's channel,
    // then add the rows to their channels.
    for (int batch_start = 0; batch_start < num_rows; batch_start += hash_batch_size) {
      int batch_window_size = min(num_rows - batch_start, hash_batch_size);
      for (int i = 0; i < batch_window_size; ++i) {
        TupleRow* row = batch->GetRow(i + batch_start);
        int32_t partition =
            *reinterpret_cast<int32_t*>(partition_expr_evals_[0]->GetValue(row));
        if (partition < 0) {
          // This row doesn't correspond to a partition,
          // e.g. it's outside the given ranges. Spread such rows round-robin.
          // The cursor is kept reduced modulo 'num_channels'. An unbounded '++'
          // would eventually overflow (undefined behavior) and could then yield a
          // negative channel index. The resulting channel sequence is identical.
          partition = next_unknown_partition_;
          next_unknown_partition_ = (next_unknown_partition_ + 1) % num_channels;
        }
        channel_ids[i] = partition % num_channels;
      }
      for (int i = 0; i < batch_window_size; ++i) {
        TupleRow* row = batch->GetRow(i + batch_start);
        RETURN_IF_ERROR(channels_[channel_ids[i]]->AddRow(row));
      }
    }
  } else {
    DCHECK_EQ(partition_type_, TPartitionType::HASH_PARTITIONED);
    if (hash_and_add_rows_fn_ != nullptr) {
      RETURN_IF_ERROR(hash_and_add_rows_fn_(this, batch));
    } else {
      RETURN_IF_ERROR(HashAndAddRows(batch));
    }
  }
  COUNTER_ADD(total_sent_rows_counter_, batch->num_rows());
  expr_results_pool_->Clear();
  RETURN_IF_ERROR(state->CheckQueryState());
  return Status::OK();
}
// Completes the stream. It sends each channel's remaining row batch and then an EOS
// signal, overlapping the sends across channels. Each phase is issued on every
// channel before any of them is waited on. May only be called once, before Close().
Status KrpcDataStreamSender::FlushFinal(RuntimeState* state) {
  SCOPED_TIMER(profile()->total_time_counter());
  DCHECK(!flushed_);
  DCHECK(!closed_);
  flushed_ = true;
  // On error we return immediately without closing the remaining channels. The error
  // propagates to the coordinator, which cancels the query, and that cancellation
  // closes whichever channels are still open.

  // Phase 1: push out the final row batches.
  for (Channel* const ch : channels_) {
    RETURN_IF_ERROR(ch->FlushBatches());
  }
  // Phase 2: wait for those row-batch rpcs.
  for (Channel* const ch : channels_) {
    RETURN_IF_ERROR(ch->WaitForRpc());
  }
  // Phase 3: send EOS on every channel.
  for (Channel* const ch : channels_) {
    RETURN_IF_ERROR(ch->SendEosAsync());
  }
  // Phase 4: wait for the EOS rpcs.
  for (Channel* const ch : channels_) {
    RETURN_IF_ERROR(ch->WaitForRpc());
  }
  return Status::OK();
}
// Releases the sender's resources. It tears down every channel, closes the partition
// expression evaluators and expressions, stops the profile's periodic counters, and
// finally closes the base DataSink. Returns immediately if 'closed_' is already set.
// NOTE(review): 'closed_' is presumably set by DataSink::Close(); confirm.
void KrpcDataStreamSender::Close(RuntimeState* state) {
  SCOPED_TIMER(profile()->total_time_counter());
  if (closed_) return;
  for (Channel* const ch : channels_) {
    ch->Teardown(state);
  }
  ScalarExprEvaluator::Close(partition_expr_evals_, state);
  ScalarExpr::Close(partition_exprs_);
  profile()->StopPeriodicCounters();
  DataSink::Close(state);
}
// Serializes 'src' into 'dest', timing the work with SerializeBatchTime. It then adds
// the batch's deserialized size, multiplied by 'num_receivers', to
// UncompressedRowBatchSize, because each receiver gets its own copy of the batch.
// Returns the serialization error, if any.
Status KrpcDataStreamSender::SerializeBatch(
    RowBatch* src, OutboundRowBatch* dest, int num_receivers) {
  VLOG_ROW << "serializing " << src->num_rows() << " rows";
  int64_t bytes_per_receiver;
  {
    SCOPED_TIMER(serialize_batch_timer_);
    RETURN_IF_ERROR(src->Serialize(dest));
    bytes_per_receiver = RowBatch::GetDeserializedSize(*dest);
  }
  COUNTER_ADD(uncompressed_bytes_counter_, bytes_per_receiver * num_receivers);
  return Status::OK();
}
// Returns the current value of the TotalBytesSent counter that Prepare() registered.
int64_t KrpcDataStreamSender::GetNumDataBytesSent() const {
  const int64_t total_bytes = bytes_sent_counter_->value();
  return total_bytes;
}
} // namespace impala