blob: 7ff3c3c65fe46774926265b881ca29637fd880c0 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "vec/runtime/vdata_stream_mgr.h"

#include <gen_cpp/Types_types.h>
#include <gen_cpp/data.pb.h>
#include <gen_cpp/internal_service.pb.h>
#include <gen_cpp/types.pb.h>
#include <stddef.h>

#include <limits>
#include <memory>
#include <ostream>
#include <string>
#include <vector>

#include "common/logging.h"
#include "util/hash_util.hpp"
#include "vec/runtime/vdata_stream_recvr.h"
namespace doris {
#include "common/compile_check_begin.h"
namespace vectorized {
// Nothing to set up beyond default member initialization.
// TODO: metric
VDataStreamMgr::VDataStreamMgr() = default;
// Closes every receiver that is still registered when the manager is destroyed.
//
// close() has to be called explicitly because receivers check whether they were closed;
// skipping it crashes during graceful stop.
VDataStreamMgr::~VDataStreamMgr() {
    std::vector<std::shared_ptr<VDataStreamRecvr>> pending;
    {
        std::shared_lock guard(_lock);
        for (const auto& entry : _receiver_map) {
            pending.push_back(entry.second);
        }
    }
    // close() removes the receiver from _receiver_map, which would corrupt an in-progress
    // iteration over the map. So receivers are snapshotted first and closed afterwards,
    // outside the lock.
    for (const auto& receiver : pending) {
        receiver->close();
    }
}
// Computes the _receiver_map bucket key for (fragment_instance_id, node_id).
// Hashes lo (8 bytes), then hi (8 bytes), then node_id (4 bytes); each step is seeded with
// the result of the previous one, and the first with 0.
inline uint32_t VDataStreamMgr::get_hash_value(const TUniqueId& fragment_instance_id,
                                               PlanNodeId node_id) {
    uint32_t seed = 0;
    seed = HashUtil::hash(&fragment_instance_id.lo, 8, seed);
    seed = HashUtil::hash(&fragment_instance_id.hi, 8, seed);
    return HashUtil::hash(&node_id, 4, seed);
}
// Creates a receiver for (fragment_instance_id, dest_node_id) and registers it.
//
// The receiver is recorded in _fragment_stream_set, which cancel() scans by fragment, and in
// _receiver_map, which is keyed by get_hash_value() and probed by find_recvr() and
// deregister_recvr(). `profile` must be non-null. Returns the newly created receiver.
std::shared_ptr<VDataStreamRecvr> VDataStreamMgr::create_recvr(
        RuntimeState* state, RuntimeProfile::HighWaterMarkCounter* memory_used_counter,
        const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int num_senders,
        RuntimeProfile* profile, bool is_merging, size_t data_queue_capacity) {
    DCHECK(profile != nullptr);
    VLOG_FILE << "creating receiver for fragment=" << print_id(fragment_instance_id)
              << ", node=" << dest_node_id;
    auto receiver = std::shared_ptr<VDataStreamRecvr>(new VDataStreamRecvr(
            this, memory_used_counter, state, fragment_instance_id, dest_node_id, num_senders,
            is_merging, profile, data_queue_capacity));
    const uint32_t bucket = get_hash_value(fragment_instance_id, dest_node_id);

    // Both indexes are updated together under the exclusive lock.
    std::unique_lock write_guard(_lock);
    _fragment_stream_set.emplace(fragment_instance_id, dest_node_id);
    _receiver_map.emplace(bucket, receiver);
    return receiver;
}
// Looks up the receiver registered for (fragment_instance_id, node_id).
//
// @param res           receives the matching receiver on success; untouched otherwise.
// @param acquire_lock  when false the caller must already hold _lock. cancel() does this
//                      while it is iterating _fragment_stream_set.
// @return OK if found, otherwise InvalidArgument.
Status VDataStreamMgr::find_recvr(const TUniqueId& fragment_instance_id, PlanNodeId node_id,
                                  std::shared_ptr<VDataStreamRecvr>* res, bool acquire_lock) {
    VLOG_ROW << "looking up fragment_instance_id=" << print_id(fragment_instance_id)
             << ", node=" << node_id;
    uint32_t hash_value = get_hash_value(fragment_instance_id, node_id);
    // Create the guard without owning the lock; lock only if the caller does not hold it.
    std::shared_lock recvr_lock(_lock, std::defer_lock);
    if (acquire_lock) {
        recvr_lock.lock();
    }
    // Several receivers may share a hash bucket, so compare the full identity.
    auto [it, end] = _receiver_map.equal_range(hash_value);
    for (; it != end; ++it) {
        // Bind by reference. Copying the shared_ptr would cost an atomic refcount
        // increment and decrement for every candidate probed.
        const std::shared_ptr<VDataStreamRecvr>& recvr = it->second;
        if (recvr->fragment_instance_id() == fragment_instance_id &&
            recvr->dest_node_id() == node_id) {
            *res = recvr;
            return Status::OK();
        }
    }
    return Status::InvalidArgument("Could not find local receiver for node {} with instance {}",
                                   node_id, print_id(fragment_instance_id));
}
// Delivers one PTransmitDataParams request to the local receiver registered for
// (request->finst_id(), request->node_id()).
//
// Returns EndOfFile when the receiver cannot be found or its task execution context is
// gone. Per the comments below, this is treated as a benign teardown race rather than an
// error. Otherwise it propagates any error from add_blocks()/add_block() and returns OK.
//
// `done` is handed to the receiver. On the legacy single-block path it is handed over only
// when the request is not eos; nullptr is passed on eos.
// NOTE(review): the receiver presumably takes over running `done`; confirm in
// VDataStreamRecvr before changing how `done` is passed.
Status VDataStreamMgr::transmit_block(const PTransmitDataParams* request,
                                      ::google::protobuf::Closure** done,
                                      const int64_t wait_for_worker) {
    const PUniqueId& finst_id = request->finst_id();
    TUniqueId t_finst_id;
    t_finst_id.hi = finst_id.hi();
    t_finst_id.lo = finst_id.lo();
    std::shared_ptr<VDataStreamRecvr> recvr = nullptr;
    // CPU time consumed by this call is measured and passed to the receiver below.
    ThreadCpuStopWatch cpu_time_stop_watch;
    cpu_time_stop_watch.start();
    // The returned status is intentionally discarded; a missing receiver is detected by
    // the nullptr check below.
    static_cast<void>(find_recvr(t_finst_id, request->node_id(), &recvr));
    if (recvr == nullptr) {
        // The receiver may remove itself from the receiver map via deregister_recvr()
        // at any time without considering the remaining number of senders.
        // As a consequence, find_recvr() may return an innocuous NULL if a thread
        // calling deregister_recvr() beat the thread calling find_recvr()
        // in acquiring _lock.
        //
        // e.g. for broadcast join build side, only one instance will build the hash table,
        // all other instances don't need build side data and will close the data stream receiver.
        //
        // TODO: Rethink the lifecycle of DataStreamRecvr to distinguish
        // errors from receiver-initiated teardowns.
        return Status::EndOfFile("data stream receiver closed");
    }
    // Lock the fragment context to ensure the runtime state and other objects are not
    // destructed while this request is processed.
    auto ctx_lock = recvr->task_exec_ctx();
    if (ctx_lock == nullptr) {
        // Do not return internal error, because when query finished, the downstream node
        // may finish before upstream node. And the object maybe destructed. If return error
        // then the upstream node may report error status to FE, the query is failed.
        return Status::EndOfFile("data stream receiver is deconstructed");
    }
    // Read everything that is needed later out of `request` before the receiver call.
    // NOTE(review): presumably this is because the receiver may run `done` (completing the
    // RPC and releasing `request`) before add_blocks()/add_block() returns. Confirm this
    // before moving these reads below that call.
    bool eos = request->eos();
    // Status reported by the sender; OK when the request carries none.
    Status exec_status =
            request->has_exec_status() ? Status::create(request->exec_status()) : Status::OK();
    auto sender_id = request->sender_id();
    auto be_number = request->be_number();
    if (!request->blocks().empty()) {
        RETURN_IF_ERROR(recvr->add_blocks(request, done, wait_for_worker,
                                          cpu_time_stop_watch.elapsed_time()));
    } else if (request->has_block()) {
        // old logic, for compatibility: the request carries a single block, which is copied
        // into an owned PBlock. On eos, nullptr is passed instead of `done`.
        std::unique_ptr<PBlock> pblock_ptr = std::make_unique<PBlock>();
        pblock_ptr->CopyFrom(request->block());
        RETURN_IF_ERROR(recvr->add_block(std::move(pblock_ptr), sender_id, be_number,
                                         request->packet_seq(), eos ? nullptr : done,
                                         wait_for_worker, cpu_time_stop_watch.elapsed_time()));
    }
    // The sender has sent its last packet. Remove it from the receiver together with the
    // exec status it reported.
    if (eos) {
        recvr->remove_sender(sender_id, be_number, exec_status);
    }
    return Status::OK();
}
// Unregisters the receiver for (fragment_instance_id, node_id) and cancels its stream.
//
// The receiver is removed from both _fragment_stream_set and _receiver_map under the
// exclusive lock. cancel_stream(OK) then runs outside the lock because it may take a long
// time. Returns InternalError if no such receiver is registered.
Status VDataStreamMgr::deregister_recvr(const TUniqueId& fragment_instance_id, PlanNodeId node_id) {
    std::shared_ptr<VDataStreamRecvr> removed;
    VLOG_QUERY << "deregister_recvr(): fragment_instance_id=" << print_id(fragment_instance_id)
               << ", node=" << node_id;
    const uint32_t hash_value = get_hash_value(fragment_instance_id, node_id);
    {
        std::unique_lock write_guard(_lock);
        auto [it, last] = _receiver_map.equal_range(hash_value);
        for (; it != last; ++it) {
            const std::shared_ptr<VDataStreamRecvr>& candidate = it->second;
            const bool matches = candidate->fragment_instance_id() == fragment_instance_id &&
                                 candidate->dest_node_id() == node_id;
            if (!matches) {
                continue;
            }
            removed = candidate;
            _fragment_stream_set.erase(
                    std::make_pair(removed->fragment_instance_id(), removed->dest_node_id()));
            // `candidate` refers into the erased node; it is not touched after this point.
            _receiver_map.erase(it);
            break;
        }
    }
    if (!removed) {
        return Status::InternalError("unknown row receiver id: fragment_instance_id={}, node_id={}",
                                     print_id(fragment_instance_id), node_id);
    }
    // Notify concurrent add_data() requests that the stream has been terminated.
    // cancel_stream may take a long time, so it is handled outside the lock.
    removed->cancel_stream(Status::OK());
    return Status::OK();
}
void VDataStreamMgr::cancel(const TUniqueId& fragment_instance_id, Status exec_status) {
VLOG_QUERY << "cancelling all streams for fragment=" << print_id(fragment_instance_id);
std::vector<std::shared_ptr<VDataStreamRecvr>> recvrs;
{
std::shared_lock l(_lock);
FragmentStreamSet::iterator i =
_fragment_stream_set.lower_bound(std::make_pair(fragment_instance_id, 0));
while (i != _fragment_stream_set.end() && i->first == fragment_instance_id) {
std::shared_ptr<VDataStreamRecvr> recvr;
WARN_IF_ERROR(find_recvr(i->first, i->second, &recvr, false), "");
if (recvr == nullptr) {
// keep going but at least log it
std::stringstream err;
err << "cancel(): missing in stream_map: fragment=" << print_id(i->first)
<< " node=" << i->second;
LOG(ERROR) << err.str();
} else {
recvrs.push_back(recvr);
}
++i;
}
}
// cancel_stream maybe take a long time, so we handle it out of lock.
for (auto& it : recvrs) {
it->cancel_stream(exec_status);
}
}
} // namespace vectorized
} // namespace doris