blob: c1be06c691d65a7b4352c6fc1f8505951a7aee82 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <google/protobuf/stubs/common.h>
#include <atomic>
#include <utility>
#include "runtime/query_context.h"
#include "runtime/thread_context.h"
#include "service/brpc.h"
namespace doris {
template <typename Response>
class DummyBrpcCallback {
ENABLE_FACTORY_CREATOR(DummyBrpcCallback);
public:
using ResponseType = Response;
DummyBrpcCallback() {
cntl_ = std::make_shared<brpc::Controller>();
response_ = std::make_shared<Response>();
}
virtual ~DummyBrpcCallback() = default;
virtual void call() {}
virtual void join() { brpc::Join(cntl_->call_id()); }
// controller has to be the same lifecycle with the closure, because brpc may use
// it in any stage of the rpc.
std::shared_ptr<brpc::Controller> cntl_;
// We do not know if brpc will use request or response after brpc method returns.
// So that we need keep a shared ptr here to ensure that brpc could use req/rep
// at any stage.
std::shared_ptr<Response> response_;
};
// The closure will be deleted after callback.
// It could only be created by using shared ptr or unique ptr.
// It will hold a weak ptr of T and call run of T
// Callback() {
// xxxx;
// public
// void run() {
// logxxx
// }
// }
//
// std::shared_ptr<Callback> b;
//
// std::unique_ptr<AutoReleaseClosure> a(b);
// brpc_call(a.release());
template <typename T>
concept HasStatus = requires(T* response) { response->status(); };
template <typename Request, typename Callback>
class AutoReleaseClosure : public google::protobuf::Closure {
using Weak = typename std::shared_ptr<Callback>::weak_type;
using ResponseType = typename Callback::ResponseType;
ENABLE_FACTORY_CREATOR(AutoReleaseClosure);
public:
AutoReleaseClosure(std::shared_ptr<Request> req, std::shared_ptr<Callback> callback,
std::weak_ptr<QueryContext> context = {}, std::string_view error_msg = {})
: request_(req), callback_(callback), context_(std::move(context)) {
this->cntl_ = callback->cntl_;
this->response_ = callback->response_;
}
~AutoReleaseClosure() override = default;
// Will delete itself
void Run() override {
Defer defer {[&]() { delete this; }};
// If lock failed, it means the callback object is deconstructed, then no need
// to deal with the callback any more.
if (auto tmp = callback_.lock()) {
tmp->call();
}
if (cntl_->Failed()) {
_process_if_rpc_failed();
} else {
_process_status<ResponseType>(response_.get());
}
}
// controller has to be the same lifecycle with the closure, because brpc may use
// it in any stage of the rpc.
std::shared_ptr<brpc::Controller> cntl_;
// We do not know if brpc will use request or response after brpc method returns.
// So that we need keep a shared ptr here to ensure that brpc could use req/rep
// at any stage.
std::shared_ptr<Request> request_;
std::shared_ptr<ResponseType> response_;
std::string error_msg_;
protected:
virtual void _process_if_rpc_failed() {
std::string error_msg =
fmt::format("RPC meet failed: {} {}", cntl_->ErrorText(), error_msg_);
if (auto ctx = context_.lock(); ctx) {
ctx->cancel(Status::NetworkError(error_msg));
} else {
LOG(WARNING) << error_msg;
}
}
virtual void _process_if_meet_error_status(const Status& status) {
if (status.is<ErrorCode::END_OF_FILE>()) {
// no need to log END_OF_FILE, reduce the unlessful log
return;
}
if (auto ctx = context_.lock(); ctx) {
ctx->cancel(status);
} else {
LOG(WARNING) << "RPC meet error status: " << status;
}
}
private:
template <typename Response>
void _process_status(Response* response) {}
template <HasStatus Response>
void _process_status(Response* response) {
if (Status status = Status::create(response->status()); !status.ok()) {
_process_if_meet_error_status(status);
}
}
// Use a weak ptr to keep the callback, so that the callback can be deleted if the main
// thread is freed.
Weak callback_;
std::weak_ptr<QueryContext> context_;
};
} // namespace doris