blob: 49bd527989e080c7528ddd2b3f043ad0f7a5563e [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <cstdint>
#include <memory>
#include <unordered_set>
#include <vector>
#include "kudu/gutil/macros.h"
#include "kudu/util/locks.h"
#include "kudu/util/monotime.h"
#include "kudu/util/status.h"
namespace google {
namespace protobuf {
class Message;
} // namespace protobuf
} // namespace google
namespace kudu {
class Slice;
namespace rpc {
class ErrorStatusPB;
class Messenger;
class OutboundCall;
class RequestIdPB;
class RequestPayload;
class RpcSidecar;
// Authentication credentials policy for outbound RPCs. Some RPC methods
// (e.g. MasterService::ConnectToMaster) behave differently depending on the
// type of credentials used for authentication when establishing the connection.
// The client expecting some particular results from the call should specify
// the required policy on a per-call basis using RpcController. By default,
// RpcController uses ANY_CREDENTIALS.
enum class CredentialsPolicy {
// It's acceptable to use authentication credentials of any type, primary or
// secondary ones.
ANY_CREDENTIALS,
// Only primary credentials are acceptable. Primary credentials are Kerberos
// tickets, TLS certificate. Secondary credentials are authentication tokens:
// they are 'derived' in the sense that it's possible to acquire them using
// 'primary' credentials.
PRIMARY_CREDENTIALS,
};
// Controller for managing properties of a single RPC call, on the client side.
//
// An RpcController maps to exactly one call and is not thread-safe. The client
// may use this class prior to sending an RPC in order to set properties such
// as the call's timeout.
//
// After the call has been sent (e.g using Proxy::AsyncRequest()) the user
// may invoke methods on the RpcController object in order to probe the status
// of the call.
class RpcController {
public:
RpcController();
~RpcController();
// Swap the state of the controller (including ownership of sidecars, buffers,
// etc) with another one.
void Swap(RpcController* other);
// Reset this controller so it may be used with another call.
// Note that this resets the required server features.
void Reset();
// Return true if the call has finished.
// A call is finished if the server has responded, or if the call
// has timed out.
bool finished() const;
// Whether the call failed due to connection negotiation error.
bool negotiation_failed() const;
// Return the current status of a call.
//
// A call is "OK" status until it finishes, at which point it may
// either remain in "OK" status (if the call was successful), or
// change to an error status. Error status indicates that there was
// some RPC-layer issue with making the call, for example, one of:
//
// * failed to establish a connection to the server
// * the server was too busy to handle the request
// * the server was unable to interpret the request (eg due to a version
// mismatch)
// * a network error occurred which caused the connection to be torn
// down
// * the call timed out
Status status() const;
// If status() returns a RemoteError object, then this function returns
// the error response provided by the server. Service implementors may
// use protobuf Extensions to add application-specific data to this PB.
//
// If Status was not a RemoteError, this returns NULL.
// The returned pointer is only valid as long as the controller object.
const ErrorStatusPB* error_response() const;
// Set the timeout for the call to be made with this RPC controller.
//
// The configured timeout applies to the entire time period between
// the AsyncRequest() method call and getting a response. For example,
// if it takes too long to establish a connection to the remote host,
// or to DNS-resolve the remote host, those will be accounted as part
// of the timeout period.
//
// Timeouts must be set prior to making the request -- the timeout may
// not currently be adjusted for an already-sent call.
//
// Using an uninitialized timeout will result in a call which never
// times out (not recommended!)
void set_timeout(const MonoDelta& timeout);
// Like a timeout, but based on a fixed point in time instead of a delta.
//
// Using an uninitialized deadline means the call won't time out.
void set_deadline(const MonoTime& deadline);
// Allows setting the request id for the next request sent to the server.
// A request id allows the server to identify each request sent by the client uniquely,
// in some cases even when sent to multiple servers, enabling exactly once semantics.
void SetRequestIdPB(std::unique_ptr<RequestIdPB> request_id);
// Releases the outbound sidecars added to this controller. This is useful if
// callers want to create a request payload for a request.
std::vector<std::unique_ptr<RpcSidecar>> ReleaseOutboundSidecars();
// Frees the outbound sidecars added to this controller. OutboundCalls may
// call this before running the user-provided callback to ensure the state is
// freed before the callback is run. However, certain usages of OutboundCall
// warrants delaying the freeing until during the callback.
void FreeOutboundSidecars();
// Releases the request payload owned by this controller. This is useful if
// callers want to reuse a request payload in another attempt of a call, as
// it allows callers to transfer ownership to a new outbound call.
std::unique_ptr<RequestPayload> ReleaseRequestPayload();
// Returns whether a request id has been set on RPC header.
bool has_request_id() const;
// Returns the currently set request id.
// When the request is sent to the server, it gets "moved" from RpcController
// so an absence of a request after send doesn't mean one wasn't sent.
// REQUIRES: the controller has a request ID set.
const RequestIdPB& request_id() const;
// Add a requirement that the server side must support a feature with the
// given identifier. The set of required features is sent to the server
// with the RPC call, and if any required feature is not supported, the
// call will fail with a NotSupported() status.
//
// This can be used when an RPC call changes in a way that is protobuf-compatible,
// but for which it would not be appropriate for the server to simply ignore
// an added field. For example, consider an API call like:
//
// message DeleteAccount {
// optional string username = 1;
// optional bool dry_run = 2; // ADDED LATER!
// }
//
// In this case, if a new client which supports the 'dry_run' flag sends the RPC
// to an old server, the old server will simply ignore the unrecognized parameter,
// with highly problematic results. To solve this problem, the new version can
// add a feature flag:
//
// In .proto file
// ----------------
// enum MyFeatureFlags {
// UNKNOWN = 0;
// DELETE_ACCOUNT_SUPPORTS_DRY_RUN = 1;
// }
//
// In client code:
// ---------------
// if (dry_run) {
// rpc.RequireServerFeature(DELETE_ACCOUNT_SUPPORTS_DRY_RUN);
// req.set_dry_run(true);
// }
//
// This has the effect of (a) maintaining compatibility when dry_run is not specified
// and (b) rejecting the RPC with a "NotSupported" error when it is.
//
// NOTE: 'feature' is an int rather than an enum type because each service
// must define its own enum of supported features, and protobuf doesn't support
// any ability to 'extend' enum types. Implementers should define an enum in the
// service's protobuf definition as shown above.
void RequireServerFeature(uint32_t feature);
// Executes the provided function with a reference to the required server
// features.
const std::unordered_set<uint32_t>& required_server_features() const {
return required_server_features_;
}
// Return the configured timeout.
MonoDelta timeout() const;
CredentialsPolicy credentials_policy() const {
return credentials_policy_;
}
void set_credentials_policy(CredentialsPolicy policy) {
credentials_policy_ = policy;
}
// Returns the call_id of this RPC call.
// Should only be called after the call's Response has been received (that is, when
// Connection::HandleCallResponse() has been executed over 'call_'), but the
// controller has not been Reset().
int32_t call_id() const;
// Fills the 'sidecar' parameter with the slice pointing to the i-th
// sidecar upon success.
//
// Should only be called if the call's finished, but the controller has not
// been Reset().
//
// May fail if index is invalid.
Status GetInboundSidecar(int idx, Slice* sidecar) const;
// Adds a sidecar to the outbound request. The index of the sidecar is written to
// 'idx'. Returns an error if TransferLimits::kMaxSidecars have already been added
// to this request. Also returns an error if the total size of all sidecars would
// exceed TransferLimits::kMaxTotalSidecarBytes.
Status AddOutboundSidecar(std::unique_ptr<RpcSidecar> car, int* idx);
// Cancel the call associated with the RpcController. This function should only be
// called when there is an outstanding outbound call. It's always safe to call
// Cancel() after you've sent a call, so long as you haven't called Reset() yet.
// Caller is not responsible for synchronization between cancellation and the
// callback. (i.e. the callback may or may not be invoked yet when Cancel()
// is called).
//
// Cancellation is "best effort" - i.e. it's still possible the callback passed
// to the call will be fired with a success status. If cancellation succeeds,
// the callback will be invoked with a Aborted status. Cancellation is asynchronous
// so the callback will still be invoked from the reactor thread.
void Cancel();
private:
friend class OutboundCall;
friend class Proxy;
// Set the outbound call_'s request parameter, and transfer ownership of
// outbound_sidecars_ to call_ in preparation for serialization.
void SetRequestParam(const google::protobuf::Message& req);
// Set the messenger which contains the reactor thread handling the outbound call.
void SetMessenger(Messenger* messenger) { messenger_ = messenger; }
MonoDelta timeout_;
std::unordered_set<uint32_t> required_server_features_;
// RPC authentication policy for outbound calls.
CredentialsPolicy credentials_policy_;
mutable simple_spinlock lock_;
// The id of this request.
// Ownership is transferred to OutboundCall once the call is sent.
std::unique_ptr<RequestIdPB> request_id_;
// The messenger which contains the reactor thread for 'call_'.
// Set only when 'call_' is set.
Messenger* messenger_;
// Once the call is sent, it is tracked here.
std::shared_ptr<OutboundCall> call_;
// Owned by the controller until released and taken by a call.
std::vector<std::unique_ptr<RpcSidecar>> outbound_sidecars_;
// Total size of sidecars in outbound_sidecars_. This is limited to a maximum
// of TransferLimits::kMaxTotalSidecarBytes.
int32_t outbound_sidecars_total_bytes_ = 0;
DISALLOW_COPY_AND_ASSIGN(RpcController);
};
} // namespace rpc
} // namespace kudu