| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "kudu/consensus/consensus.proxy.h" |
| |
| #include <algorithm> |
| #include <boost/bind.hpp> |
| #include <gflags/gflags.h> |
| #include <glog/logging.h> |
| #include <mutex> |
| #include <string> |
| #include <utility> |
| #include <vector> |
| |
| #include "kudu/common/wire_protocol.h" |
| #include "kudu/consensus/consensus_peers.h" |
| #include "kudu/consensus/consensus_queue.h" |
| #include "kudu/consensus/log.h" |
| #include "kudu/gutil/map-util.h" |
| #include "kudu/gutil/stl_util.h" |
| #include "kudu/gutil/strings/substitute.h" |
| #include "kudu/util/fault_injection.h" |
| #include "kudu/util/flag_tags.h" |
| #include "kudu/util/logging.h" |
| #include "kudu/util/monotime.h" |
| #include "kudu/util/net/net_util.h" |
| #include "kudu/util/threadpool.h" |
| |
| DEFINE_int32(consensus_rpc_timeout_ms, 1000, |
| "Timeout used for all consensus internal RPC communications."); |
| TAG_FLAG(consensus_rpc_timeout_ms, advanced); |
| |
| DEFINE_int32(raft_get_node_instance_timeout_ms, 30000, |
| "Timeout for retrieving node instance data over RPC."); |
| TAG_FLAG(consensus_rpc_timeout_ms, hidden); |
| |
| DECLARE_int32(raft_heartbeat_interval_ms); |
| |
| DEFINE_double(fault_crash_on_leader_request_fraction, 0.0, |
| "Fraction of the time when the leader will crash just before sending an " |
| "UpdateConsensus RPC. (For testing only!)"); |
| |
| DEFINE_double(fault_crash_after_leader_request_fraction, 0.0, |
| "Fraction of the time when the leader will crash on getting a response for an " |
| "UpdateConsensus RPC. (For testing only!)"); |
| |
| TAG_FLAG(fault_crash_on_leader_request_fraction, unsafe); |
| |
| |
| // Allow for disabling Tablet Copy in unit tests where we want to test |
| // certain scenarios without triggering bootstrap of a remote peer. |
| DEFINE_bool(enable_tablet_copy, true, |
| "Whether Tablet Copy will be initiated by the leader when it " |
| "detects that a follower is out of date or does not have a tablet " |
| "replica. For testing purposes only."); |
| TAG_FLAG(enable_tablet_copy, unsafe); |
| |
| namespace kudu { |
| namespace consensus { |
| |
| using log::Log; |
| using std::shared_ptr; |
| using rpc::Messenger; |
| using rpc::RpcController; |
| using strings::Substitute; |
| using tserver::TabletServerErrorPB; |
| |
| Status Peer::NewRemotePeer(const RaftPeerPB& peer_pb, |
| const string& tablet_id, |
| const string& leader_uuid, |
| PeerMessageQueue* queue, |
| ThreadPool* thread_pool, |
| gscoped_ptr<PeerProxy> proxy, |
| gscoped_ptr<Peer>* peer) { |
| |
| gscoped_ptr<Peer> new_peer(new Peer(peer_pb, |
| tablet_id, |
| leader_uuid, |
| std::move(proxy), |
| queue, |
| thread_pool)); |
| RETURN_NOT_OK(new_peer->Init()); |
| peer->reset(new_peer.release()); |
| return Status::OK(); |
| } |
| |
| Peer::Peer(const RaftPeerPB& peer_pb, string tablet_id, string leader_uuid, |
| gscoped_ptr<PeerProxy> proxy, PeerMessageQueue* queue, |
| ThreadPool* thread_pool) |
| : tablet_id_(std::move(tablet_id)), |
| leader_uuid_(std::move(leader_uuid)), |
| peer_pb_(peer_pb), |
| proxy_(std::move(proxy)), |
| queue_(queue), |
| failed_attempts_(0), |
| sem_(1), |
| heartbeater_( |
| peer_pb.permanent_uuid(), |
| MonoDelta::FromMilliseconds(FLAGS_raft_heartbeat_interval_ms), |
| boost::bind(&Peer::SignalRequest, this, true)), |
| thread_pool_(thread_pool), |
| state_(kPeerCreated) {} |
| |
| void Peer::SetTermForTest(int term) { |
| response_.set_responder_term(term); |
| } |
| |
| Status Peer::Init() { |
| std::lock_guard<simple_spinlock> lock(peer_lock_); |
| queue_->TrackPeer(peer_pb_.permanent_uuid()); |
| RETURN_NOT_OK(heartbeater_.Start()); |
| state_ = kPeerStarted; |
| return Status::OK(); |
| } |
| |
| Status Peer::SignalRequest(bool even_if_queue_empty) { |
| // If the peer is currently sending, return Status::OK(). |
| // If there are new requests in the queue we'll get them on ProcessResponse(). |
| if (!sem_.TryAcquire()) { |
| return Status::OK(); |
| } |
| { |
| std::lock_guard<simple_spinlock> l(peer_lock_); |
| |
| if (PREDICT_FALSE(state_ == kPeerClosed)) { |
| sem_.Release(); |
| return Status::IllegalState("Peer was closed."); |
| } |
| |
| // For the first request sent by the peer, we send it even if the queue is empty, |
| // which it will always appear to be for the first request, since this is the |
| // negotiation round. |
| if (PREDICT_FALSE(state_ == kPeerStarted)) { |
| even_if_queue_empty = true; |
| state_ = kPeerRunning; |
| } |
| DCHECK_EQ(state_, kPeerRunning); |
| |
| // If our last request generated an error, and this is not a normal |
| // heartbeat request, then don't send the "per-RPC" request. Instead, |
| // we'll wait for the heartbeat. |
| // |
| // TODO: we could consider looking at the number of consecutive failed |
| // attempts, and instead of ignoring the signal, ask the heartbeater |
| // to "expedite" the next heartbeat in order to achieve something like |
| // exponential backoff after an error. As it is implemented today, any |
| // transient error will result in a latency blip as long as the heartbeat |
| // period. |
| if (failed_attempts_ > 0 && !even_if_queue_empty) { |
| sem_.Release(); |
| return Status::OK(); |
| } |
| } |
| |
| |
| RETURN_NOT_OK(thread_pool_->SubmitClosure( |
| Bind(&Peer::SendNextRequest, Unretained(this), even_if_queue_empty))); |
| return Status::OK(); |
| } |
| |
| void Peer::SendNextRequest(bool even_if_queue_empty) { |
| // The peer has no pending request nor is sending: send the request. |
| bool needs_tablet_copy = false; |
| int64_t commit_index_before = request_.has_committed_index() ? |
| request_.committed_index().index() : kMinimumOpIdIndex; |
| Status s = queue_->RequestForPeer(peer_pb_.permanent_uuid(), &request_, |
| &replicate_msg_refs_, &needs_tablet_copy); |
| int64_t commit_index_after = request_.has_committed_index() ? |
| request_.committed_index().index() : kMinimumOpIdIndex; |
| |
| if (PREDICT_FALSE(!s.ok())) { |
| LOG_WITH_PREFIX_UNLOCKED(INFO) << "Could not obtain request from queue for peer: " |
| << peer_pb_.permanent_uuid() << ". Status: " << s.ToString(); |
| sem_.Release(); |
| return; |
| } |
| |
| if (PREDICT_FALSE(needs_tablet_copy)) { |
| Status s = SendTabletCopyRequest(); |
| if (!s.ok()) { |
| LOG_WITH_PREFIX_UNLOCKED(WARNING) << "Unable to generate Tablet Copy request for peer: " |
| << s.ToString(); |
| sem_.Release(); |
| } |
| return; |
| } |
| |
| request_.set_tablet_id(tablet_id_); |
| request_.set_caller_uuid(leader_uuid_); |
| request_.set_dest_uuid(peer_pb_.permanent_uuid()); |
| |
| bool req_has_ops = request_.ops_size() > 0 || (commit_index_after > commit_index_before); |
| // If the queue is empty, check if we were told to send a status-only |
| // message, if not just return. |
| if (PREDICT_FALSE(!req_has_ops && !even_if_queue_empty)) { |
| sem_.Release(); |
| return; |
| } |
| |
| // If we're actually sending ops there's no need to heartbeat for a while, |
| // reset the heartbeater |
| if (req_has_ops) { |
| heartbeater_.Reset(); |
| } |
| |
| MAYBE_FAULT(FLAGS_fault_crash_on_leader_request_fraction); |
| |
| |
| VLOG_WITH_PREFIX_UNLOCKED(2) << "Sending to peer " << peer_pb().permanent_uuid() << ": " |
| << request_.ShortDebugString(); |
| controller_.Reset(); |
| |
| proxy_->UpdateAsync(&request_, &response_, &controller_, |
| boost::bind(&Peer::ProcessResponse, this)); |
| } |
| |
| void Peer::ProcessResponse() { |
| // Note: This method runs on the reactor thread. |
| |
| DCHECK_EQ(0, sem_.GetValue()) |
| << "Got a response when nothing was pending"; |
| |
| MAYBE_FAULT(FLAGS_fault_crash_after_leader_request_fraction); |
| |
| if (!controller_.status().ok()) { |
| if (controller_.status().IsRemoteError()) { |
| // Most controller errors are caused by network issues or corner cases |
| // like shutdown and failure to serialize a protobuf. Therefore, we |
| // generally consider these errors to indicate an unreachable peer. |
| // However, a RemoteError wraps some other error propagated from the |
| // remote peer, so we know the remote is alive. Therefore, we will let |
| // the queue know that the remote is responsive. |
| queue_->NotifyPeerIsResponsiveDespiteError(peer_pb_.permanent_uuid()); |
| } |
| ProcessResponseError(controller_.status()); |
| return; |
| } |
| |
| // Pass through errors we can respond to, like not found, since in that case |
| // we will need to start a Tablet Copy. TODO: Handle DELETED response once implemented. |
| if ((response_.has_error() && |
| response_.error().code() != TabletServerErrorPB::TABLET_NOT_FOUND) || |
| (response_.status().has_error() && |
| response_.status().error().code() == consensus::ConsensusErrorPB::CANNOT_PREPARE)) { |
| // Again, let the queue know that the remote is still responsive, since we |
| // will not be sending this error response through to the queue. |
| queue_->NotifyPeerIsResponsiveDespiteError(peer_pb_.permanent_uuid()); |
| ProcessResponseError(StatusFromPB(response_.error().status())); |
| return; |
| } |
| |
| // The queue's handling of the peer response may generate IO (reads against |
| // the WAL) and SendNextRequest() may do the same thing. So we run the rest |
| // of the response handling logic on our thread pool and not on the reactor |
| // thread. |
| Status s = thread_pool_->SubmitClosure(Bind(&Peer::DoProcessResponse, Unretained(this))); |
| if (PREDICT_FALSE(!s.ok())) { |
| LOG_WITH_PREFIX_UNLOCKED(WARNING) << "Unable to process peer response: " << s.ToString() |
| << ": " << response_.ShortDebugString(); |
| sem_.Release(); |
| } |
| } |
| |
| void Peer::DoProcessResponse() { |
| failed_attempts_ = 0; |
| |
| VLOG_WITH_PREFIX_UNLOCKED(2) << "Response from peer " << peer_pb().permanent_uuid() << ": " |
| << response_.ShortDebugString(); |
| |
| bool more_pending; |
| queue_->ResponseFromPeer(peer_pb_.permanent_uuid(), response_, &more_pending); |
| |
| // We're OK to read the state_ without a lock here -- if we get a race, |
| // the worst thing that could happen is that we'll make one more request before |
| // noticing a close. |
| if (more_pending && ANNOTATE_UNPROTECTED_READ(state_) != kPeerClosed) { |
| SendNextRequest(true); |
| } else { |
| sem_.Release(); |
| } |
| } |
| |
| Status Peer::SendTabletCopyRequest() { |
| if (!FLAGS_enable_tablet_copy) { |
| failed_attempts_++; |
| return Status::NotSupported("Tablet Copy is disabled"); |
| } |
| |
| LOG_WITH_PREFIX_UNLOCKED(INFO) << "Sending request to start Tablet Copy"; |
| RETURN_NOT_OK(queue_->GetTabletCopyRequestForPeer(peer_pb_.permanent_uuid(), &tc_request_)); |
| controller_.Reset(); |
| proxy_->StartTabletCopy(&tc_request_, &tc_response_, &controller_, |
| boost::bind(&Peer::ProcessTabletCopyResponse, this)); |
| return Status::OK(); |
| } |
| |
| void Peer::ProcessTabletCopyResponse() { |
| if (controller_.status().ok() && tc_response_.has_error()) { |
| // ALREADY_INPROGRESS is expected, so we do not log this error. |
| if (tc_response_.error().code() == |
| TabletServerErrorPB::TabletServerErrorPB::ALREADY_INPROGRESS) { |
| queue_->NotifyPeerIsResponsiveDespiteError(peer_pb_.permanent_uuid()); |
| } else { |
| LOG_WITH_PREFIX_UNLOCKED(WARNING) << "Unable to begin Tablet Copy on peer: " |
| << tc_response_.ShortDebugString(); |
| } |
| } |
| sem_.Release(); |
| } |
| |
| void Peer::ProcessResponseError(const Status& status) { |
| failed_attempts_++; |
| string resp_err_info; |
| if (response_.has_error()) { |
| resp_err_info = Substitute(" Error code: $0 ($1).", |
| TabletServerErrorPB::Code_Name(response_.error().code()), |
| response_.error().code()); |
| } |
| LOG_WITH_PREFIX_UNLOCKED(WARNING) << "Couldn't send request to peer " << peer_pb_.permanent_uuid() |
| << " for tablet " << tablet_id_ << "." |
| << resp_err_info |
| << " Status: " << status.ToString() << "." |
| << " Retrying in the next heartbeat period." |
| << " Already tried " << failed_attempts_ << " times."; |
| sem_.Release(); |
| } |
| |
| string Peer::LogPrefixUnlocked() const { |
| return Substitute("T $0 P $1 -> Peer $2 ($3:$4): ", |
| tablet_id_, leader_uuid_, peer_pb_.permanent_uuid(), |
| peer_pb_.last_known_addr().host(), peer_pb_.last_known_addr().port()); |
| } |
| |
| void Peer::Close() { |
| WARN_NOT_OK(heartbeater_.Stop(), "Could not stop heartbeater"); |
| |
| // If the peer is already closed return. |
| { |
| std::lock_guard<simple_spinlock> lock(peer_lock_); |
| if (state_ == kPeerClosed) return; |
| DCHECK(state_ == kPeerRunning || state_ == kPeerStarted) << "Unexpected state: " << state_; |
| state_ = kPeerClosed; |
| } |
| LOG_WITH_PREFIX_UNLOCKED(INFO) << "Closing peer: " << peer_pb_.permanent_uuid(); |
| |
| // Acquire the semaphore to wait for any concurrent request to finish. |
| // They will see the state_ == kPeerClosed and not start any new requests, |
| // but we can't currently cancel the already-sent ones. (see KUDU-699) |
| std::lock_guard<Semaphore> l(sem_); |
| queue_->UntrackPeer(peer_pb_.permanent_uuid()); |
| // We don't own the ops (the queue does). |
| request_.mutable_ops()->ExtractSubrange(0, request_.ops_size(), nullptr); |
| } |
| |
| Peer::~Peer() { |
| Close(); |
| } |
| |
| |
| RpcPeerProxy::RpcPeerProxy(gscoped_ptr<HostPort> hostport, |
| gscoped_ptr<ConsensusServiceProxy> consensus_proxy) |
| : hostport_(std::move(hostport)), |
| consensus_proxy_(std::move(consensus_proxy)) { |
| } |
| |
| void RpcPeerProxy::UpdateAsync(const ConsensusRequestPB* request, |
| ConsensusResponsePB* response, |
| rpc::RpcController* controller, |
| const rpc::ResponseCallback& callback) { |
| controller->set_timeout(MonoDelta::FromMilliseconds(FLAGS_consensus_rpc_timeout_ms)); |
| consensus_proxy_->UpdateConsensusAsync(*request, response, controller, callback); |
| } |
| |
| void RpcPeerProxy::RequestConsensusVoteAsync(const VoteRequestPB* request, |
| VoteResponsePB* response, |
| rpc::RpcController* controller, |
| const rpc::ResponseCallback& callback) { |
| consensus_proxy_->RequestConsensusVoteAsync(*request, response, controller, callback); |
| } |
| |
| void RpcPeerProxy::StartTabletCopy(const StartTabletCopyRequestPB* request, |
| StartTabletCopyResponsePB* response, |
| rpc::RpcController* controller, |
| const rpc::ResponseCallback& callback) { |
| consensus_proxy_->StartTabletCopyAsync(*request, response, controller, callback); |
| } |
| |
| RpcPeerProxy::~RpcPeerProxy() {} |
| |
| namespace { |
| |
| Status CreateConsensusServiceProxyForHost(const shared_ptr<Messenger>& messenger, |
| const HostPort& hostport, |
| gscoped_ptr<ConsensusServiceProxy>* new_proxy) { |
| vector<Sockaddr> addrs; |
| RETURN_NOT_OK(hostport.ResolveAddresses(&addrs)); |
| if (addrs.size() > 1) { |
| LOG(WARNING)<< "Peer address '" << hostport.ToString() << "' " |
| << "resolves to " << addrs.size() << " different addresses. Using " |
| << addrs[0].ToString(); |
| } |
| new_proxy->reset(new ConsensusServiceProxy(messenger, addrs[0])); |
| return Status::OK(); |
| } |
| |
| } // anonymous namespace |
| |
| RpcPeerProxyFactory::RpcPeerProxyFactory(shared_ptr<Messenger> messenger) |
| : messenger_(std::move(messenger)) {} |
| |
| Status RpcPeerProxyFactory::NewProxy(const RaftPeerPB& peer_pb, |
| gscoped_ptr<PeerProxy>* proxy) { |
| gscoped_ptr<HostPort> hostport(new HostPort); |
| RETURN_NOT_OK(HostPortFromPB(peer_pb.last_known_addr(), hostport.get())); |
| gscoped_ptr<ConsensusServiceProxy> new_proxy; |
| RETURN_NOT_OK(CreateConsensusServiceProxyForHost(messenger_, *hostport, &new_proxy)); |
| proxy->reset(new RpcPeerProxy(std::move(hostport), std::move(new_proxy))); |
| return Status::OK(); |
| } |
| |
| RpcPeerProxyFactory::~RpcPeerProxyFactory() {} |
| |
| Status SetPermanentUuidForRemotePeer(const shared_ptr<Messenger>& messenger, |
| RaftPeerPB* remote_peer) { |
| DCHECK(!remote_peer->has_permanent_uuid()); |
| HostPort hostport; |
| RETURN_NOT_OK(HostPortFromPB(remote_peer->last_known_addr(), &hostport)); |
| gscoped_ptr<ConsensusServiceProxy> proxy; |
| RETURN_NOT_OK(CreateConsensusServiceProxyForHost(messenger, hostport, &proxy)); |
| GetNodeInstanceRequestPB req; |
| GetNodeInstanceResponsePB resp; |
| rpc::RpcController controller; |
| |
| // TODO generalize this exponential backoff algorithm, as we do the |
| // same thing in catalog_manager.cc |
| // (AsyncTabletRequestTask::RpcCallBack). |
| MonoTime deadline = MonoTime::Now(MonoTime::FINE); |
| deadline.AddDelta(MonoDelta::FromMilliseconds(FLAGS_raft_get_node_instance_timeout_ms)); |
| int attempt = 1; |
| while (true) { |
| VLOG(2) << "Getting uuid from remote peer. Request: " << req.ShortDebugString(); |
| |
| controller.Reset(); |
| Status s = proxy->GetNodeInstance(req, &resp, &controller); |
| if (s.ok()) { |
| if (controller.status().ok()) { |
| break; |
| } |
| s = controller.status(); |
| } |
| |
| LOG(WARNING) << "Error getting permanent uuid from config peer " << hostport.ToString() << ": " |
| << s.ToString(); |
| MonoTime now = MonoTime::Now(MonoTime::FINE); |
| if (now.ComesBefore(deadline)) { |
| int64_t remaining_ms = deadline.GetDeltaSince(now).ToMilliseconds(); |
| int64_t base_delay_ms = 1 << (attempt + 3); // 1st retry delayed 2^4 ms, 2nd 2^5, etc.. |
| int64_t jitter_ms = rand() % 50; // Add up to 50ms of additional random delay. |
| int64_t delay_ms = std::min<int64_t>(base_delay_ms + jitter_ms, remaining_ms); |
| VLOG(1) << "Sleeping " << delay_ms << " ms. before retrying to get uuid from remote peer..."; |
| SleepFor(MonoDelta::FromMilliseconds(delay_ms)); |
| LOG(INFO) << "Retrying to get permanent uuid for remote peer: " |
| << remote_peer->ShortDebugString() << " attempt: " << attempt++; |
| } else { |
| s = Status::TimedOut(Substitute("Getting permanent uuid from $0 timed out after $1 ms.", |
| hostport.ToString(), |
| FLAGS_raft_get_node_instance_timeout_ms), |
| s.ToString()); |
| return s; |
| } |
| } |
| remote_peer->set_permanent_uuid(resp.node_instance().permanent_uuid()); |
| return Status::OK(); |
| } |
| |
| } // namespace consensus |
| } // namespace kudu |