// blob: 68dfdfd63813ada5db690290f1644ffce1e87a4c
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "service/internal_service.h"
#include "common/config.h"
#include "gen_cpp/BackendService.h"
#include "runtime/exec_env.h"
#include "runtime/data_stream_mgr.h"
#include "runtime/fragment_mgr.h"
#include "runtime/load_channel_mgr.h"
#include "service/brpc.h"
#include "util/uid_util.h"
#include "util/thrift_util.h"
#include "runtime/buffer_control_block.h"
#include "runtime/result_buffer_mgr.h"
#include "runtime/routine_load/routine_load_task_executor.h"
namespace doris {
// Builds the service on top of the given execution environment.
//
// The tablet-writer worker pool is constructed with
// config::number_tablet_writer_threads and 10240 as its two arguments
// (presumably thread count and queue capacity -- confirm against the pool's
// declaration in the header).
template <typename T>
PInternalServiceImpl<T>::PInternalServiceImpl(ExecEnv* exec_env)
        : _exec_env(exec_env),
          _tablet_worker_pool(config::number_tablet_writer_threads, 10240) {}
// No explicit cleanup is performed here; members are torn down by their own
// destructors.
template <typename T>
PInternalServiceImpl<T>::~PInternalServiceImpl() = default;
// RPC handler: hands an incoming data batch to the data stream manager.
//
// `done` is passed by address so the stream manager can modify the pointer
// (presumably it nulls it out when it takes over completion of the RPC --
// confirm against DataStreamMgr::transmit_data). If the pointer is still set
// after the call, the closure is run here.
//
// NOTE(review): whatever transmit_data() returns is discarded and `response`
// is never written by this handler -- confirm that is intended.
template <typename T>
void PInternalServiceImpl<T>::transmit_data(google::protobuf::RpcController* cntl_base,
                                            const PTransmitDataParams* request,
                                            PTransmitDataResult* response,
                                            google::protobuf::Closure* done) {
    VLOG_ROW << "transmit data: fragment_instance_id=" << print_id(request->finst_id())
             << " node=" << request->node_id();

    _exec_env->stream_mgr()->transmit_data(request, &done);

    // Nothing left to do when the closure pointer has been cleared by the callee.
    if (done == nullptr) {
        return;
    }
    done->Run();
}
// RPC handler: opens a load channel for the tablet writer described by
// `request` and reports the resulting status in `response`.
//
// `done` is owned by a brpc::ClosureGuard for the whole function, so it is run
// when the function returns.
template <typename T>
void PInternalServiceImpl<T>::tablet_writer_open(google::protobuf::RpcController* controller,
                                                 const PTabletWriterOpenRequest* request,
                                                 PTabletWriterOpenResult* response,
                                                 google::protobuf::Closure* done) {
    VLOG_RPC << "tablet writer open, id=" << request->id()
             << ", index_id=" << request->index_id() << ", txn_id=" << request->txn_id();

    brpc::ClosureGuard closure_guard(done);

    auto open_status = _exec_env->load_channel_mgr()->open(*request);
    const bool open_failed = !open_status.ok();
    if (open_failed) {
        LOG(WARNING) << "load channel open failed, message=" << open_status.get_error_msg()
                     << ", id=" << request->id()
                     << ", index_id=" << request->index_id()
                     << ", txn_id=" << request->txn_id();
    }

    // The status is reported to the caller whether or not the open succeeded.
    open_status.to_protobuf(response->mutable_status());
}
// RPC handler: executes the plan fragment carried by this call and writes the
// outcome into `response`.
//
// The fragment parameters are read from the controller (see
// _exec_plan_fragment), not from `request`. `done` is run by the ClosureGuard
// when this function returns.
template <typename T>
void PInternalServiceImpl<T>::exec_plan_fragment(google::protobuf::RpcController* cntl_base,
                                                 const PExecPlanFragmentRequest* request,
                                                 PExecPlanFragmentResult* response,
                                                 google::protobuf::Closure* done) {
    brpc::ClosureGuard closure_guard(done);

    Status exec_status = _exec_plan_fragment(static_cast<brpc::Controller*>(cntl_base));
    if (!exec_status.ok()) {
        LOG(WARNING) << "exec plan fragment failed, errmsg=" << exec_status.get_error_msg();
    }
    exec_status.to_protobuf(response->mutable_status());
}
// RPC handler: adds a batch of rows to an open tablet writer.
//
// The work is not done on the calling thread. It is queued on
// _tablet_worker_pool, and the queued task fills `response` (status, tablet
// vector, execution time and lock-wait time in microseconds) and then runs
// `done` through a brpc::ClosureGuard.
//
// If the pool refuses the task, the task body never runs, so this function
// completes the RPC itself with an error status. Without that, `done` would
// never be invoked and the caller would wait until its RPC timeout.
template <typename T>
void PInternalServiceImpl<T>::tablet_writer_add_batch(google::protobuf::RpcController* controller,
                                                      const PTabletWriterAddBatchRequest* request,
                                                      PTabletWriterAddBatchResult* response,
                                                      google::protobuf::Closure* done) {
    VLOG_RPC << "tablet writer add batch, id=" << request->id()
             << ", index_id=" << request->index_id()
             << ", sender_id=" << request->sender_id();
    // Adding a batch may take a long time, and this callback thread would be
    // held for the duration. That would influence query execution, because the
    // pthreads under bthread may be exhausted, so the work is handed to a
    // local thread pool instead.
    // NOTE(review): assumes offer() returns false when the task was not
    // accepted -- confirm against the pool's declaration.
    const bool submitted = _tablet_worker_pool.offer(
            [request, response, done, this]() {
                brpc::ClosureGuard closure_guard(done);
                int64_t execution_time_ns = 0;
                int64_t wait_lock_time_ns = 0;
                {
                    SCOPED_RAW_TIMER(&execution_time_ns);
                    auto st = _exec_env->load_channel_mgr()->add_batch(
                            *request, response->mutable_tablet_vec(), &wait_lock_time_ns);
                    if (!st.ok()) {
                        LOG(WARNING) << "tablet writer add batch failed, message=" << st.get_error_msg()
                                     << ", id=" << request->id()
                                     << ", index_id=" << request->index_id()
                                     << ", sender_id=" << request->sender_id();
                    }
                    st.to_protobuf(response->mutable_status());
                }
                // Timers are collected in nanoseconds; the response fields are in microseconds.
                response->set_execution_time_us(execution_time_ns / 1000);
                response->set_wait_lock_time_us(wait_lock_time_ns / 1000);
            });

    // On success the queued task owns `done` and may already have finished the
    // RPC, so request/response must not be touched past this point. They are
    // only used below when the task was rejected and will therefore never run.
    if (!submitted) {
        brpc::ClosureGuard closure_guard(done);
        LOG(WARNING) << "tablet writer add batch failed, message=fail to submit task to tablet worker pool"
                     << ", id=" << request->id()
                     << ", index_id=" << request->index_id()
                     << ", sender_id=" << request->sender_id();
        Status::InternalError("fail to submit add batch task to tablet worker pool")
                .to_protobuf(response->mutable_status());
    }
}
// RPC handler: cancels the tablet writer identified by `request`.
//
// A failed cancel is only logged; `response` is not written by this handler.
// `done` is run by the ClosureGuard when this function returns.
template <typename T>
void PInternalServiceImpl<T>::tablet_writer_cancel(google::protobuf::RpcController* controller,
                                                   const PTabletWriterCancelRequest* request,
                                                   PTabletWriterCancelResult* response,
                                                   google::protobuf::Closure* done) {
    VLOG_RPC << "tablet writer cancel, id=" << request->id()
             << ", index_id=" << request->index_id()
             << ", sender_id=" << request->sender_id();
    brpc::ClosureGuard closure_guard(done);
    auto st = _exec_env->load_channel_mgr()->cancel(*request);
    if (!st.ok()) {
        // Include the error message, as the open/add_batch handlers do, so the
        // log explains why the cancel failed.
        LOG(WARNING) << "tablet writer cancel failed, message=" << st.get_error_msg()
                     << ", id=" << request->id()
                     << ", index_id=" << request->index_id()
                     << ", sender_id=" << request->sender_id();
    }
}
// Decodes the thrift-serialized TExecPlanFragmentParams carried in the
// request attachment of `cntl` and passes it to the fragment manager.
//
// Returns the deserialization error if decoding fails (via RETURN_IF_ERROR),
// otherwise whatever FragmentMgr::exec_plan_fragment returns.
template <typename T>
Status PInternalServiceImpl<T>::_exec_plan_fragment(brpc::Controller* cntl) {
    // Copy the attachment into a string so the deserializer gets a plain byte
    // pointer and length.
    auto serialized = cntl->request_attachment().to_string();

    TExecPlanFragmentParams t_request;
    const uint8_t* payload = reinterpret_cast<const uint8_t*>(serialized.data());
    uint32_t payload_len = static_cast<uint32_t>(serialized.size());
    RETURN_IF_ERROR(deserialize_thrift_msg(payload, &payload_len, false, &t_request));

    LOG(INFO) << "exec plan fragment, fragment_instance_id="
              << print_id(t_request.params.fragment_instance_id)
              << ", coord=" << t_request.coord
              << ", backend=" << t_request.backend_num;

    return _exec_env->fragment_mgr()->exec_plan_fragment(t_request);
}
// RPC handler: cancels the fragment instance named by request->finst_id().
//
// The protobuf id is copied into a TUniqueId (hi/lo). When the request carries
// a cancel reason it is passed to the fragment manager; otherwise the
// single-argument cancel() overload is used. The resulting status is written
// to `result`, and `done` is run by the ClosureGuard on return.
template <typename T>
void PInternalServiceImpl<T>::cancel_plan_fragment(
        google::protobuf::RpcController* cntl_base,
        const PCancelPlanFragmentRequest* request,
        PCancelPlanFragmentResult* result,
        google::protobuf::Closure* done) {
    brpc::ClosureGuard closure_guard(done);
    TUniqueId tid;
    tid.__set_hi(request->finst_id().hi());
    tid.__set_lo(request->finst_id().lo());
    Status st;
    if (request->has_cancel_reason()) {
        LOG(INFO) << "cancel fragment, fragment_instance_id=" << print_id(tid)
                  << ", reason: " << request->cancel_reason();
        st = _exec_env->fragment_mgr()->cancel(tid, request->cancel_reason());
    } else {
        LOG(INFO) << "cancel fragment, fragment_instance_id=" << print_id(tid);
        st = _exec_env->fragment_mgr()->cancel(tid);
    }
    if (!st.ok()) {
        LOG(WARNING) << "cancel plan fragment failed, errmsg=" << st.get_error_msg();
    }
    st.to_protobuf(result->mutable_status());
}
// RPC handler: requests result data for the fragment instance named by
// request->finst_id().
//
// The controller, the result message and `done` are bundled into a
// heap-allocated GetResultBatchCtx that is handed to the result manager. This
// function neither runs `done` nor frees the context (presumably the result
// manager / context is responsible for both -- confirm against
// ResultBufferMgr::fetch_data).
template <typename T>
void PInternalServiceImpl<T>::fetch_data(google::protobuf::RpcController* cntl_base,
                                         const PFetchDataRequest* request,
                                         PFetchDataResult* result,
                                         google::protobuf::Closure* done) {
    auto* brpc_cntl = static_cast<brpc::Controller*>(cntl_base);
    _exec_env->result_mgr()->fetch_data(request->finst_id(),
                                        new GetResultBatchCtx(brpc_cntl, result, done));
}
// RPC handler: asks the fragment manager to trigger a profile report for
// `request` and stores the returned status in `result`.
//
// `done` is run by the ClosureGuard when this function returns.
template <typename T>
void PInternalServiceImpl<T>::trigger_profile_report(google::protobuf::RpcController* controller,
                                                     const PTriggerProfileReportRequest* request,
                                                     PTriggerProfileReportResult* result,
                                                     google::protobuf::Closure* done) {
    brpc::ClosureGuard closure_guard(done);
    _exec_env->fragment_mgr()
            ->trigger_profile_report(request)
            .to_protobuf(result->mutable_status());
}
// RPC handler: proxy-style information request.
//
// Only kafka metadata requests are handled: the partition ids are fetched
// through the routine load task executor and, on success, copied into the
// kafka meta result of `response`. The fetch status is always written to
// `response`. A request without a kafka meta request gets an OK status.
// `done` is run by the ClosureGuard when this function returns.
template <typename T>
void PInternalServiceImpl<T>::get_info(google::protobuf::RpcController* controller,
                                       const PProxyRequest* request,
                                       PProxyResult* response,
                                       google::protobuf::Closure* done) {
    brpc::ClosureGuard closure_guard(done);

    // Nothing to look up: report success and leave the rest of the response empty.
    if (!request->has_kafka_meta_request()) {
        Status::OK().to_protobuf(response->mutable_status());
        return;
    }

    std::vector<int32_t> partition_ids;
    Status meta_status = _exec_env->routine_load_task_executor()->get_kafka_partition_meta(
            request->kafka_meta_request(), &partition_ids);
    if (meta_status.ok()) {
        PKafkaMetaProxyResult* kafka_result = response->mutable_kafka_meta_result();
        for (int32_t partition_id : partition_ids) {
            kafka_result->add_partition_ids(partition_id);
        }
    }
    meta_status.to_protobuf(response->mutable_status());
}
// Explicit instantiations: the member function templates above are defined in
// this translation unit, so the two service types the template is used with
// are instantiated here.
template class PInternalServiceImpl<PBackendService>;
template class PInternalServiceImpl<palo::PInternalService>;
}