// blob: d14d6ea09c17329291d35ce1707f69f5e2a5ed44 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
// This module is internal to the client and not a public API.
#include "kudu/master/master_rpc.h"
#include <boost/bind.hpp>
#include <mutex>
#include "kudu/common/wire_protocol.h"
#include "kudu/common/wire_protocol.pb.h"
#include "kudu/gutil/bind.h"
#include "kudu/gutil/strings/join.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/master/master.proxy.h"
#include "kudu/util/net/net_util.h"
#include "kudu/util/scoped_cleanup.h"
using std::shared_ptr;
using std::string;
using std::vector;
using kudu::consensus::RaftPeerPB;
using kudu::rpc::Messenger;
using kudu::rpc::Rpc;
namespace kudu {
namespace master {
////////////////////////////////////////////////////////////
// GetMasterRegistrationRpc
////////////////////////////////////////////////////////////
// Constructs an RPC that fetches the registration of the master at 'addr'.
// The reply is written into caller-owned 'out' (must be non-NULL and outlive
// the RPC); 'user_cb' is invoked exactly once when the call finishes.
GetMasterRegistrationRpc::GetMasterRegistrationRpc(StatusCallback user_cb,
                                                   Sockaddr addr,
                                                   const MonoTime& deadline,
                                                   const shared_ptr<Messenger>& messenger,
                                                   ServerEntryPB* out)
    : Rpc(deadline, messenger),
      user_cb_(std::move(user_cb)),
      addr_(std::move(addr)),
      out_(DCHECK_NOTNULL(out)) {
}
GetMasterRegistrationRpc::~GetMasterRegistrationRpc() = default;
// Kicks off the async GetMasterRegistration call; SendRpcCb() runs when the
// reply (or a transport-level error) arrives.
void GetMasterRegistrationRpc::SendRpc() {
  MasterServiceProxy proxy(retrier().messenger(), addr_);
  GetMasterRegistrationRequestPB req;
  proxy.GetMasterRegistrationAsync(
      req, &resp_, mutable_retrier()->mutable_controller(),
      boost::bind(&GetMasterRegistrationRpc::SendRpcCb, this, Status::OK()));
}
// Human-readable description used in logs and error messages.
string GetMasterRegistrationRpc::ToString() const {
  return strings::Substitute(
      "GetMasterRegistrationRpc(address: $0, num_attempts: $1)",
      addr_.ToString(), num_attempts());
}
// Completion callback. NOTE: this object owns itself and is destroyed when
// this method returns, unless the retrier schedules another attempt.
void GetMasterRegistrationRpc::SendRpcCb(const Status& status) {
  // Take ownership of 'this'; unless released below, the RPC is deleted on
  // every exit path from this function.
  gscoped_ptr<GetMasterRegistrationRpc> deleter(this);
  Status new_status = status;
  if (new_status.ok() && mutable_retrier()->HandleResponse(this, &new_status)) {
    // The retrier scheduled a retry, so 'this' must stay alive for the next
    // attempt: release ownership without deleting.
    ignore_result(deleter.release());
    return;
  }
  if (new_status.ok() && resp_.has_error()) {
    if (resp_.error().code() == MasterErrorPB::CATALOG_MANAGER_NOT_INITIALIZED) {
      // If CatalogManager is not initialized, treat the node as a
      // FOLLOWER for the time being, as currently this RPC is only
      // used for the purposes of finding the leader master.
      resp_.set_role(RaftPeerPB::FOLLOWER);
      new_status = Status::OK();
    } else {
      // Application-level error: surface it both in the caller-owned output
      // struct and in the status passed to the user callback.
      out_->mutable_error()->CopyFrom(resp_.error().status());
      new_status = StatusFromPB(resp_.error().status());
    }
  }
  if (new_status.ok()) {
    // Success: copy the registration details into the caller-owned output.
    out_->mutable_instance_id()->CopyFrom(resp_.instance_id());
    out_->mutable_registration()->CopyFrom(resp_.registration());
    out_->set_role(resp_.role());
  }
  user_cb_.Run(new_status);
}
////////////////////////////////////////////////////////////
// GetLeaderMasterRpc
////////////////////////////////////////////////////////////
// Constructs an RPC that probes all masters in 'addrs' in parallel to locate
// the current leader. 'user_cb' is invoked once with the outcome; each
// per-master probe uses 'rpc_timeout', bounded by the overall 'deadline'.
GetLeaderMasterRpc::GetLeaderMasterRpc(LeaderCallback user_cb,
                                       vector<Sockaddr> addrs,
                                       MonoTime deadline,
                                       MonoDelta rpc_timeout,
                                       shared_ptr<Messenger> messenger)
    : Rpc(std::move(deadline), std::move(messenger)),
      user_cb_(std::move(user_cb)),
      addrs_(std::move(addrs)),
      rpc_timeout_(std::move(rpc_timeout)),
      pending_responses_(0),
      completed_(false) {
  // 'deadline' was moved into the Rpc base above, so read it back through the
  // retrier rather than touching the moved-from parameter.
  DCHECK(retrier().deadline().Initialized());
  // Using resize instead of reserve to explicitly initialize the values.
  responses_.resize(addrs_.size());
}
GetLeaderMasterRpc::~GetLeaderMasterRpc() = default;
// Human-readable description used in logs and error messages.
string GetLeaderMasterRpc::ToString() const {
  vector<string> addr_strs;
  addr_strs.reserve(addrs_.size());
  for (const Sockaddr& a : addrs_) {
    addr_strs.push_back(a.ToString());
  }
  return strings::Substitute("GetLeaderMasterRpc(addrs: $0, num_attempts: $1)",
                             JoinStrings(addr_strs, ","),
                             num_attempts());
}
// Fans out one GetMasterRegistrationRpc to every master in 'addrs_'.
void GetLeaderMasterRpc::SendRpc() {
  // Compute the actual deadline to use for each RPC: the per-attempt timeout,
  // clamped to the overall deadline tracked by the retrier.
  MonoTime rpc_deadline = MonoTime::Now(MonoTime::FINE);
  rpc_deadline.AddDelta(rpc_timeout_);
  MonoTime actual_deadline = MonoTime::Earliest(retrier().deadline(),
                                                rpc_deadline);
  // Guard pending_responses_ against responses that may arrive on reactor
  // threads while we are still sending to the remaining masters.
  std::lock_guard<simple_spinlock> l(lock_);
  for (int i = 0; i < addrs_.size(); i++) {
    // Each GetMasterRegistrationRpc deletes itself when its own SendRpcCb
    // completes, so the raw 'new' here has no matching delete.
    GetMasterRegistrationRpc* rpc = new GetMasterRegistrationRpc(
        Bind(&GetLeaderMasterRpc::GetMasterRegistrationRpcCbForNode,
             this, ConstRef(addrs_[i]), ConstRef(responses_[i])),
        addrs_[i],
        actual_deadline,
        retrier().messenger(),
        &responses_[i]);
    rpc->SendRpc();
    ++pending_responses_;
  }
}
// Invoked once a leader has been found, or once all per-master probes have
// completed without finding one. Either retries the whole fan-out or reports
// the final result to 'user_cb_'.
void GetLeaderMasterRpc::SendRpcCb(const Status& status) {
  // To safely retry, we must reset completed_ so that it can be reused in the
  // next round of RPCs.
  //
  // The SendRpcCb invariant (see GetMasterRegistrationRpcCbForNode comments)
  // implies that if we're to retry, we must be the last response. Thus, it is
  // safe to reset completed_ in this case; there's no danger of a late
  // response reading it and entering SendRpcCb inadvertently.
  auto undo_completed = MakeScopedCleanup([&]() {
    std::lock_guard<simple_spinlock> l(lock_);
    completed_ = false;
  });
  // If we've received replies from all of the nodes without finding
  // the leader, or if there were network errors talking to all of the
  // nodes the error is retriable and we can perform a delayed retry.
  if (status.IsNetworkError() || status.IsNotFound()) {
    mutable_retrier()->DelayedRetry(this, status);
    return;
  }
  // If our replies timed out but the deadline hasn't passed, retry.
  if (status.IsTimedOut() &&
      MonoTime::Now(MonoTime::FINE).ComesBefore(retrier().deadline())) {
    mutable_retrier()->DelayedRetry(this, status);
    return;
  }
  // We are not retrying: leave completed_ set so that any straggling
  // per-master responses are ignored, and report the outcome.
  undo_completed.cancel();
  user_cb_.Run(status, leader_master_);
}
// Per-master completion callback: records whether the master at 'node_addr'
// (whose reply is in 'resp') claims to be the leader. Calls SendRpcCb() once
// a leader is found or once this is the last outstanding response.
void GetLeaderMasterRpc::GetMasterRegistrationRpcCbForNode(const Sockaddr& node_addr,
                                                           const ServerEntryPB& resp,
                                                           const Status& status) {
  // TODO: handle the situation where one Master is partitioned from
  // the rest of the Master consensus configuration, all are reachable by the client,
  // and the partitioned node "thinks" it's the leader.
  //
  // The proper way to do so is to add term/index to the responses
  // from the Master, wait for majority of the Masters to respond, and
  // pick the one with the highest term/index as the leader.
  Status new_status = status;
  {
    // Lock protects completed_, pending_responses_, and leader_master_.
    std::lock_guard<simple_spinlock> lock(lock_);
    if (completed_) {
      // If 'user_cb_' has been invoked (see SendRpcCb above), we can
      // stop.
      return;
    }
    if (new_status.ok()) {
      if (resp.role() != RaftPeerPB::LEADER) {
        // Use a Status::NotFound() to indicate that the node is not
        // the leader: this way, we can handle the case where we've
        // received a reply from all of the nodes in the cluster (no
        // network or other errors encountered), but haven't found a
        // leader (which means that SendRpcCb() above can perform a
        // delayed retry).
        new_status = Status::NotFound("no leader found: " + ToString());
      } else {
        // We've found a leader.
        leader_master_ = HostPort(node_addr);
        completed_ = true;
      }
    }
    --pending_responses_;
    if (!new_status.ok()) {
      if (pending_responses_ > 0) {
        // Don't call SendRpcCb() on error unless we're the last
        // outstanding response: calling SendRpcCb() will trigger
        // a delayed re-try, which don't need to do unless we've
        // been unable to find a leader so far.
        return;
      } else {
        completed_ = true;
      }
    }
  }
  // Called if the leader has been determined, or if we've received
  // all of the responses.
  SendRpcCb(new_status);
}
} // namespace master
} // namespace kudu