blob: cb822057d86b38ce926523394166da158416f614 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <memory>
#include <mutex>
#include <string>
#include <vector>
#include "kudu/gutil/atomicops.h"
#include "kudu/gutil/macros.h"
#include "kudu/rpc/connection_id.h"
#include "kudu/rpc/outbound_call.h"
#include "kudu/rpc/response_callback.h"
#include "kudu/util/locks.h"
#include "kudu/util/net/net_util.h"
#include "kudu/util/status.h"
namespace google {
namespace protobuf {
class Message;
} // namespace protobuf
} // namespace google
namespace kudu {
class DnsResolver;
class Sockaddr;
namespace rpc {
class Messenger;
class RpcController;
class UserCredentials;
// Interface to send calls to a remote service.
//
// Proxy objects do not map one-to-one with TCP connections. The underlying TCP
// connection is not established until the first call, and may be torn down and
// re-established as necessary by the messenger. Additionally, the messenger is
// likely to multiplex many Proxy objects on the same connection.
//
// A proxy object can optionally specify the "network plane" it uses. This allows
// proxies of N services to be multiplexed on M TCP connections so that a higher priority
// service (e.g. a control channel) may use a different connection than other services,
// avoiding the chance of being blocked by traffic of other services.
//
// Proxy objects are thread-safe after initialization only.
// Setters on the Proxy are not thread-safe, and calling a setter after any RPC
// request has started will cause a fatal error.
//
// After initialization, multiple threads may make calls using the same proxy object.
class Proxy {
public:
Proxy(std::shared_ptr<Messenger> messenger,
const Sockaddr& remote,
std::string hostname,
std::string service_name);
// TODO(awong): consider a separate auto-resolving proxy class?
Proxy(std::shared_ptr<Messenger> messenger,
HostPort hp,
DnsResolver* dns_resolver,
std::string service_name);
~Proxy();
// If the proxy is configured for address re-resolution (by supplying a
// DnsResolver and HostPort in the constructor), performs an initial
// resolution of the address using the HostPort. If 'addr' is supplied, it is
// used instead of performing resolution (this is useful to initialize
// several proxies with a single external DNS resolution).
//
// Otherwise, this is a no-op.
//
// NOTE: it is always OK to skip calling this method -- if this proxy is
// configured for address re-resolution and this is skipped, the resolution
// will happen upon sending the first request.
void Init(Sockaddr addr = {});
// Call a remote method asynchronously.
//
// Typically, users will not call this directly, but rather through
// a generated Proxy subclass.
//
// method: the method name to invoke on the remote server.
//
// req: the request protobuf. This will be serialized immediately,
// so the caller may free or otherwise mutate 'req' safely.
//
// resp: the response protobuf. This protobuf will be mutated upon
// completion of the call. The RPC system does not take ownership
// of this storage.
//
// NOTE: 'req' and 'resp' should be the appropriate protocol buffer implementation
// class corresponding to the parameter and result types of the service method
// defined in the service's '.proto' file.
//
// controller: the RpcController to associate with this call. Each call
// must use a unique controller object. Does not take ownership.
//
// callback: the callback to invoke upon call completion. This callback may
// be invoked before AsyncRequest() itself returns, or any time
// thereafter. It may be invoked either on the caller's thread
// or by an RPC IO thread, and thus should take care to not
// block or perform any heavy CPU work.
void AsyncRequest(const std::string& method,
const google::protobuf::Message& req,
google::protobuf::Message* resp,
RpcController* controller,
const ResponseCallback& callback);
// The same as AsyncRequest(), except that the call blocks until the call
// finishes. If the call fails, returns a non-OK result.
Status SyncRequest(const std::string& method,
const google::protobuf::Message& req,
google::protobuf::Message* resp,
RpcController* controller);
// Set the user credentials which should be used to log in.
void set_user_credentials(UserCredentials user_credentials);
// Get the user credentials which should be used to log in.
const UserCredentials& user_credentials() const { return conn_id_.user_credentials(); }
// Set the network plane which this proxy uses.
void set_network_plane(std::string network_plane);
// Get the network plane which this proxy uses.
const std::string& network_plane() const { return conn_id_.network_plane(); }
std::string ToString() const;
private:
// Asynchronously refreshes the DNS, enqueueing the given request upon
// success, or failing the call and calling the callback upon failure.
void RefreshDnsAndEnqueueRequest(const std::string& method,
std::unique_ptr<RequestPayload> req_payload,
google::protobuf::Message* response,
RpcController* controller,
const ResponseCallback& callback);
// Queues the given request as an outbound call using the given messenger,
// controller, and response.
void EnqueueRequest(const std::string& method,
std::unique_ptr<RequestPayload> req_payload,
google::protobuf::Message* response,
RpcController* controller,
const ResponseCallback& callback,
OutboundCall::CallbackBehavior cb_behavior) const;
// Returns a single Sockaddr from the 'addrs', logging a warning if there is
// more than one to choose from.
Sockaddr* GetSingleSockaddr(std::vector<Sockaddr>* addrs) const;
ConnectionId conn_id() const {
std::lock_guard<simple_spinlock> l(lock_);
return conn_id_;
}
const std::string service_name_;
HostPort hp_;
DnsResolver* dns_resolver_;
std::shared_ptr<Messenger> messenger_;
// TODO(awong): consider implementing some lock-free list of ConnectionIds
// instead of taking a lock every time we want to get the "current"
// ConnectionId.
//
// Connection ID used by this proxy. Once the proxy has started sending
// requests, the connection ID may be updated in response to calls (e.g. if
// we re-resolved the physical address in response to an invalid DNS entry).
// As such, 'conn_id_' is protected by 'lock_', and should be copied and
// passed around, rather than used directly
mutable simple_spinlock lock_;
ConnectionId conn_id_;
mutable Atomic32 is_started_;
DISALLOW_COPY_AND_ASSIGN(Proxy);
};
} // namespace rpc
} // namespace kudu