blob: 9d4fdeb110a6f729a45a345d20555ac3e6882908 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <memory>
#include <string>
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/rpc/request_tracker.h"
#include "kudu/rpc/rpc.h"
#include "kudu/rpc/rpc_header.pb.h"
#include "kudu/rpc/messenger.h"
#include "kudu/util/monotime.h"
namespace kudu {
namespace rpc {
namespace internal {
// Shorthand for the sequence-number type handed out by the RequestTracker.
using SequenceNumber = rpc::RequestTracker::SequenceNumber;
}  // namespace internal
// A base class for retriable RPCs that handles replica picking and retry logic.
//
// The 'Server' template parameter refers to the type of the server that will be looked up
// and passed to the derived classes on Try(). For instance in the case of WriteRpc it's
// RemoteTabletServer.
//
// TODO merge RpcRetrier into this class? Can't be done right now as the retrier is used
// independently elsewhere, but likely possible when all replicated RPCs have a ReplicaPicker.
//
// TODO allow to target replicas other than the leader, if needed.
//
// TODO once we have retry handling on all the RPCs, merge this with rpc::Rpc.
template <class Server, class RequestPB, class ResponsePB>
class RetriableRpc : public Rpc {
 public:
  // Builds a retriable RPC.
  //
  // 'server_picker' is used to look up the server to send to and is informed of failures.
  // 'request_tracker' hands out, and later retires, this RPC's sequence number.
  // 'deadline' and 'messenger' are forwarded to the Rpc base class.
  RetriableRpc(const scoped_refptr<ServerPicker<Server>>& server_picker,
               const scoped_refptr<RequestTracker>& request_tracker,
               const MonoTime& deadline,
               const std::shared_ptr<Messenger>& messenger)
      : Rpc(deadline, messenger),
        server_picker_(server_picker),
        request_tracker_(request_tracker),
        sequence_number_(RequestTracker::NO_SEQ_NO),
        num_attempts_(0),
        // No server has been picked yet. SendRpcCb() tests this pointer, so it must never
        // hold an indeterminate value.
        current_(nullptr) {}

  // In debug builds, verifies that the sequence number was released (FinishInternal()
  // resets it to NO_SEQ_NO) before the RPC is destroyed.
  virtual ~RetriableRpc() {
    DCHECK_EQ(sequence_number_, RequestTracker::NO_SEQ_NO);
  }

  // Performs server lookup/initialization.
  // If/when the server is looked up and initialized successfully RetriableRpc will call
  // Try() to actually send the request.
  void SendRpc() override;

 protected:
  // Subclasses implement this method to actually try the RPC.
  // The server has been looked up and is ready to be used. 'callback' is supplied by
  // RetriableRpc and is bound to SendRpcCb().
  virtual void Try(Server* replica, const ResponseCallback& callback) = 0;

  // Subclasses implement this method to analyze 'status', the controller status or
  // the response and return a RetriableRpcStatus which will then be used
  // to decide how to proceed (retry or give up).
  virtual RetriableRpcStatus AnalyzeResponse(const Status& status) = 0;

  // Subclasses implement this method to perform cleanup and/or final steps.
  // After this is called the RPC will be no longer retried.
  virtual void Finish(const Status& status) = 0;

  // Request body.
  RequestPB req_;

  // Response body. Cleared by RetryIfNeeded() before each retry.
  ResponsePB resp_;

 private:
  friend class CalculatorServiceRpc;

  // Decides whether to retry the RPC, based on the result of AnalyzeResponse() and retries
  // if that is the case.
  // Returns true if the RPC was retried or false otherwise.
  bool RetryIfNeeded(const RetriableRpcStatus& result, Server* server);

  // Called when the replica has been looked up.
  void ReplicaFoundCb(const Status& status, Server* server);

  // Called after the RPC was performed.
  void SendRpcCb(const Status& status) override;

  // Performs final cleanup, after the RPC is done (independently of success).
  void FinishInternal();

  // Picks the server to send to and records failures reported against servers.
  scoped_refptr<ServerPicker<Server>> server_picker_;

  // Source of this RPC's sequence number; also notified when the RPC completes.
  scoped_refptr<RequestTracker> request_tracker_;

  // NOTE(review): the two members below are not set by the constructor (they are
  // default-constructed) and nothing in this header reads them; the constructor's
  // 'deadline' and 'messenger' arguments go to the Rpc base class instead. They look
  // like leftovers -- confirm no friend depends on them before removing.
  const MonoTime deadline_;
  std::shared_ptr<Messenger> messenger_;

  // The sequence number for this RPC.
  internal::SequenceNumber sequence_number_;

  // The number of times this RPC has been attempted
  int32 num_attempts_;

  // Keeps track of the replica the RPCs were sent to. Null until a replica has been
  // found, and reset to null whenever a retry is scheduled.
  // TODO Remove this and pass the used replica around. For now we need to keep this as
  // the retrier calls the SendRpcCb directly and doesn't know the replica that was
  // being written to.
  Server* current_;
};
// Starts an attempt: makes sure this RPC holds a sequence number, then asks the server
// picker for the leader. ReplicaFoundCb() receives the outcome of that lookup.
template <class Server, class RequestPB, class ResponsePB>
void RetriableRpc<Server, RequestPB, ResponsePB>::SendRpc() {
  // Only allocate a sequence number when none is currently held, so a call made while
  // one is already assigned keeps using it.
  const bool needs_seq_no = (sequence_number_ == RequestTracker::NO_SEQ_NO);
  if (needs_seq_no) {
    CHECK_OK(request_tracker_->NewSeqNo(&sequence_number_));
  }

  // 'this' is passed unretained: the callback does not keep the RPC alive.
  auto on_replica_found = Bind(&RetriableRpc::ReplicaFoundCb, Unretained(this));
  server_picker_->PickLeader(on_replica_found, retrier().deadline());
}
// Inspects the analyzed outcome of an attempt and, when it is one of the retriable kinds,
// reports the problem to the server picker and schedules a delayed retry.
//
// 'result' is the value produced by AnalyzeResponse(); 'server' is the server the outcome
// refers to. Returns true when a retry was scheduled, false when the caller must finish
// the RPC itself (the OK and NON_RETRIABLE_ERROR cases).
template <class Server, class RequestPB, class ResponsePB>
bool RetriableRpc<Server, RequestPB, ResponsePB>::RetryIfNeeded(const RetriableRpcStatus& result,
                                                                Server* server) {
  const auto outcome = result.result;

  if (outcome == RetriableRpcStatus::SERVER_BUSY) {
    // For writes, always retry a TOO_BUSY error on the same server: there is nothing to
    // report to the picker.
  } else if (outcome == RetriableRpcStatus::SERVER_NOT_ACCESSIBLE) {
    VLOG(1) << "Failing " << ToString() << " to a new target: " << result.status.ToString();
    server_picker_->MarkServerFailed(server, result.status);
  } else if (outcome == RetriableRpcStatus::RESOURCE_NOT_FOUND) {
    // The TabletServer was not part of the config serving the tablet.
    // We mark our tablet cache as stale, forcing a master lookup on the next attempt.
    // TODO: Don't backoff the first time we hit this error (see KUDU-1314).
    server_picker_->MarkResourceNotFound(server);
  } else if (outcome == RetriableRpcStatus::REPLICA_NOT_LEADER) {
    // The TabletServer was not the leader of the quorum.
    server_picker_->MarkReplicaNotLeader(server);
  } else {
    // For the OK and NON_RETRIABLE_ERROR cases we can't/won't retry.
    return false;
  }

  // Every retriable outcome ends up here: discard the stale response, forget which server
  // was used, and let the retrier schedule the next attempt.
  resp_.Clear();
  current_ = nullptr;
  mutable_retrier()->DelayedRetry(this, result.status);
  return true;
}
// Final bookkeeping once the RPC will not be attempted again: tells the request tracker
// that the sequence number is done and releases it locally.
template <class Server, class RequestPB, class ResponsePB>
void RetriableRpc<Server, RequestPB, ResponsePB>::FinishInternal() {
  // Report completion while the sequence number is still held.
  const internal::SequenceNumber finished_seq_no = sequence_number_;
  request_tracker_->RpcCompleted(finished_seq_no);

  // The destructor DCHECKs that the number has been reset to NO_SEQ_NO.
  sequence_number_ = RequestTracker::NO_SEQ_NO;
}
// Callback for the server lookup started by SendRpc().
//
// 'status' is the outcome of the lookup and 'server' is the server that was picked. The
// status is run through AnalyzeResponse(); depending on the verdict the RPC is retried,
// finished with an error, or sent to 'server' through Try().
template <class Server, class RequestPB, class ResponsePB>
void RetriableRpc<Server, RequestPB, ResponsePB>::ReplicaFoundCb(const Status& status,
                                                                 Server* server) {
  const RetriableRpcStatus lookup_result = AnalyzeResponse(status);

  // A retriable failure has already been rescheduled by RetryIfNeeded().
  if (RetryIfNeeded(lookup_result, server)) {
    return;
  }

  // A failure that cannot be retried ends the RPC here.
  if (lookup_result.result == RetriableRpcStatus::NON_RETRIABLE_ERROR) {
    FinishInternal();
    Finish(lookup_result.status);
    return;
  }

  // We successfully found a replica, so prepare the RequestIdPB before we send out the call.
  std::unique_ptr<RequestIdPB> id_pb(new RequestIdPB());
  id_pb->set_client_id(request_tracker_->client_id());
  id_pb->set_seq_no(sequence_number_);
  id_pb->set_first_incomplete_seq_no(request_tracker_->FirstIncomplete());
  // Stamp the current attempt number, then advance the counter for the next attempt.
  id_pb->set_attempt_no(num_attempts_);
  ++num_attempts_;
  mutable_retrier()->mutable_controller()->SetRequestIdPB(std::move(id_pb));

  // Only the OK outcome can reach this point.
  DCHECK_EQ(lookup_result.result, RetriableRpcStatus::OK);

  // Remember the target so SendRpcCb() knows which server the attempt went to.
  current_ = server;
  Try(server, boost::bind(&RetriableRpc::SendRpcCb, this, Status::OK()));
}
// Called after an attempt of the RPC was performed.
//
// 'status' is analyzed with AnalyzeResponse(). If the outcome is retriable the RPC is
// rescheduled and this returns immediately; otherwise the RPC is completed and Finish() is
// called with the final status. A failing final status is prefixed with a message naming
// the server the attempt was sent to, when one is known.
template <class Server, class RequestPB, class ResponsePB>
void RetriableRpc<Server, RequestPB, ResponsePB>::SendRpcCb(const Status& status) {
  RetriableRpcStatus result = AnalyzeResponse(status);
  if (RetryIfNeeded(result, current_)) return;

  FinishInternal();

  // From here on out the rpc has either succeeded or suffered a non-retriable
  // failure.
  Status final_status = result.status;
  if (!final_status.ok()) {
    // Fully qualified: a header must not rely on a 'using std::string' leaking in from
    // some other include.
    std::string error_string;
    if (current_) {
      error_string = strings::Substitute("Failed to write to server: $0", current_->ToString());
    } else {
      // No server is recorded for this attempt.
      error_string = "Failed to write to server: (no server available)";
    }
    final_status = final_status.CloneAndPrepend(error_string);
  }
  Finish(final_status);
}
} // namespace rpc
} // namespace kudu