blob: 747c5b50ece6a52191d7c67d8299fd7ad89a6409 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <functional>
#include <memory>
#include <string>
#include <utility>
#include "kudu/gutil/macros.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/rpc/rpc_controller.h"
#include "kudu/util/monotime.h"
#include "kudu/util/status.h"
namespace kudu {
namespace rpc {
class Rpc;
class Messenger;

// Returns an exponential backoff delay for the given number of attempts. The
// delay includes jitter anchored between 10ms and 20ms, and is bounded above
// by a value between 2.5s and 5s.
//
// NOTE(review): 'num_attempts' is presumably 1-based like
// RpcRetrier::attempt_num(); confirm against the implementation.
MonoDelta ComputeExponentialBackoff(int num_attempts);
// Outcome of a retriable Rpc: a classification ('result') paired with the
// underlying Status ('status').
//
// TODO(todd): Consider merging this with ScanRpcStatus.
struct RetriableRpcStatus {
  enum Result {
    // The Rpc succeeded; there was no error.
    OK,

    // The Rpc failed and the error must not be retried.
    NON_RETRIABLE_ERROR,

    // The server could not be reached, e.g. a network error while contacting
    // the replica, or a DNS resolution failure.
    SERVER_NOT_ACCESSIBLE,

    // The server received the request but could not serve it right away, for
    // example because it was too busy or lacked the resources or information
    // needed. The server expects to be ready soon, so retrying the request
    // later is worthwhile.
    SERVICE_UNAVAILABLE,

    // For rpc's meant only for the leader of a shared resource: the server we
    // contacted is not that leader.
    REPLICA_NOT_LEADER,

    // The server does not know the resource we are interacting with, e.g. a
    // TabletServer that is not part of the config of the tablet being written.
    RESOURCE_NOT_FOUND,

    // The server rejected the authentication token sent with the operation,
    // most likely because it expired. If so, obtain a fresh token using the
    // client credentials and retry the operation with it.
    INVALID_AUTHENTICATION_TOKEN,

    // Same as INVALID_AUTHENTICATION_TOKEN, but for authorization tokens.
    INVALID_AUTHORIZATION_TOKEN,
  };

  // Classification of the outcome.
  Result result;
  // The Status associated with the outcome.
  Status status;
};
// Picks a server among the possible set of servers serving a given resource.
//
// TODO: Currently this only picks the leader, though it wouldn't be infeasible
// to give it an enum so that it can pick any server.
template <class Server>
class ServerPicker : public RefCountedThreadSafe<ServerPicker<Server>> {
 public:
  virtual ~ServerPicker() = default;

  // Invoked with the outcome of a pick: a Status and the picked server (or
  // nullptr on failure).
  using ServerPickedCallback = std::function<void(const Status&, Server*)>;

  // Picks the leader among the replicas serving a resource.
  // If the leader was found, it calls the callback with Status::OK() and
  // with 'server' set to the current leader, otherwise calls the callback
  // with 'status' set to the failure reason, and 'server' set to nullptr.
  // If picking a leader takes longer than 'deadline' the callback is called
  // with Status::TimedOut().
  virtual void PickLeader(const ServerPickedCallback& callback,
                          const MonoTime& deadline) = 0;

  // Marks a server as failed/inaccessible. 'status' presumably carries the
  // error observed when contacting 'server'.
  virtual void MarkServerFailed(Server* server, const Status& status) = 0;

  // Marks a server as not the leader of the config serving the resource we're
  // trying to interact with.
  virtual void MarkReplicaNotLeader(Server* replica) = 0;

  // Marks a server as not serving the resource we want.
  virtual void MarkResourceNotFound(Server* replica) = 0;
};
// Selects how long to wait between attempts when an RPC is retried.
enum class BackoffType {
  // Wait a small, jittered amount that grows roughly linearly with the number
  // of RPC attempts.
  LINEAR = 0,

  // Wait an amount that grows exponentially with the number of RPC attempts,
  // subject to an upper bound.
  EXPONENTIAL = 1,
};
// Provides utilities for retrying failed RPCs. How long to wait before each
// retry is determined by the BackoffType supplied at construction time.
class RpcRetrier {
 public:
  // 'deadline' may be uninitialized, in which case no deadline is set on the
  // controller. 'messenger' is the Messenger to use when sending the RPC, and
  // 'backoff' is the type of backoff this retrier should employ.
  RpcRetrier(MonoTime deadline, std::shared_ptr<rpc::Messenger> messenger,
             BackoffType backoff)
      : attempt_num_(1),
        deadline_(deadline),
        messenger_(std::move(messenger)),
        backoff_(backoff) {
    if (deadline_.Initialized()) {
      controller_.set_deadline(deadline_);
    }
    // NOTE(review): Reset() runs after set_deadline(); this presumably relies
    // on RpcController::Reset() leaving the deadline intact. Confirm against
    // rpc_controller.h before reordering these calls.
    controller_.Reset();
  }

  // Computes an appropriate backoff time for the given attempt.
  // NOTE(review): presumably dispatches on 'backoff_'; confirm in the .cc.
  MonoDelta ComputeBackoff(int num_attempts) const;

  // Retries an RPC at some point in the near future. If 'why_status' is not
  // OK, records it as the most recent error causing the RPC to retry. This is
  // reported to the caller eventually if the RPC never succeeds.
  //
  // If the RPC's deadline expires, the callback will fire with a timeout
  // error when the RPC comes up for retrying. This is true even if the
  // deadline has already expired at the time that DelayedRetry() was called.
  //
  // Callers should ensure that 'rpc' remains alive.
  void DelayedRetry(Rpc* rpc, const Status& why_status);

  // Accessors for the RPC controller used when sending the RPC.
  RpcController* mutable_controller() { return &controller_; }
  const RpcController& controller() const { return controller_; }

  // The RPC's deadline. May be uninitialized.
  const MonoTime& deadline() const { return deadline_; }

  // The Messenger to use when sending the RPC.
  const std::shared_ptr<Messenger>& messenger() const {
    return messenger_;
  }

  // The attempt number of the next sent rpc, indexed from 1.
  int attempt_num() const { return attempt_num_; }

  // Called when an RPC comes up for retrying. Actually sends the RPC.
  void DelayedRetryCb(Rpc* rpc, const Status& status);

 private:
  // The next sent rpc will be the nth attempt (indexed from 1).
  int attempt_num_;

  // If the remote end is busy, the RPC will be retried (with a small
  // delay) until this deadline is reached.
  //
  // May be uninitialized.
  MonoTime deadline_;

  // Messenger to use when sending the RPC.
  std::shared_ptr<Messenger> messenger_;

  // In case any retries have already happened, remembers the last error.
  // Errors from the server take precedence over timeout errors.
  Status last_error_;

  // RPC controller to use when sending the RPC.
  RpcController controller_;

  // The type of backoff this retrier should employ.
  const BackoffType backoff_;

  DISALLOW_COPY_AND_ASSIGN(RpcRetrier);
};
// Encapsulates an in-flight remote procedure call to some server, employing
// the provided backoff type to retry when necessary.
class Rpc {
public:
Rpc(const MonoTime& deadline,
std::shared_ptr<rpc::Messenger> messenger,
BackoffType backoff)
: retrier_(deadline, std::move(messenger), backoff) {}
virtual ~Rpc() {}
// Asynchronously sends the RPC to the remote end.
//
// Subclasses should use SendRpcCb() below as the callback function.
virtual void SendRpc() = 0;
// Returns a string representation of the RPC.
virtual std::string ToString() const = 0;
// Returns the number of times this RPC has been sent. Should always be at
// least one.
int num_attempts() const { return retrier().attempt_num(); }
protected:
// Used to retry some failed RPCs.
const RpcRetrier& retrier() const { return retrier_; }
RpcRetrier* mutable_retrier() { return &retrier_; }
private:
friend class RpcRetrier;
// Callback for SendRpc(). If 'status' is not OK, something failed before the
// RPC was sent.
virtual void SendRpcCb(const Status& status) = 0;
// Used to retry some failed RPCs.
RpcRetrier retrier_;
DISALLOW_COPY_AND_ASSIGN(Rpc);
};
} // namespace rpc
} // namespace kudu