blob: 9501a388a963be82ba15a8254749c0f22b493559 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/rpc/proxy.h"
#include <functional>
#include <iostream>
#include <memory>
#include <utility>
#include <vector>
#include <glog/logging.h>
#include "kudu/gutil/port.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/rpc/messenger.h"
#include "kudu/rpc/outbound_call.h"
#include "kudu/rpc/remote_method.h"
#include "kudu/rpc/response_callback.h"
#include "kudu/rpc/rpc_controller.h"
#include "kudu/rpc/user_credentials.h"
#include "kudu/util/kernel_stack_watchdog.h"
#include "kudu/util/logging.h"
#include "kudu/util/net/dns_resolver.h"
#include "kudu/util/net/net_util.h"
#include "kudu/util/net/sockaddr.h"
#include "kudu/util/notification.h"
#include "kudu/util/status.h"
#include "kudu/util/user.h"
using google::protobuf::Message;
using std::string;
using std::shared_ptr;
using std::unique_ptr;
using std::vector;
using strings::Substitute;
namespace kudu {
namespace rpc {
Proxy::Proxy(std::shared_ptr<Messenger> messenger,
const Sockaddr& remote,
string hostname,
string service_name)
: service_name_(std::move(service_name)),
dns_resolver_(nullptr),
messenger_(std::move(messenger)),
is_started_(false) {
CHECK(messenger_ != nullptr);
DCHECK(!service_name_.empty()) << "Proxy service name must not be blank";
DCHECK(remote.is_initialized());
// By default, we set the real user to the currently logged-in user.
// Effective user and password remain blank.
string real_user;
Status s = GetLoggedInUser(&real_user);
if (!s.ok()) {
LOG(WARNING) << "Proxy for " << service_name_ << ": Unable to get logged-in user name: "
<< s.ToString() << " before connecting to remote: " << remote.ToString();
}
UserCredentials creds;
creds.set_real_user(std::move(real_user));
conn_id_ = ConnectionId(remote, std::move(hostname), std::move(creds));
}
Proxy::Proxy(std::shared_ptr<Messenger> messenger,
HostPort hp,
DnsResolver* dns_resolver,
string service_name)
: service_name_(std::move(service_name)),
hp_(std::move(hp)),
dns_resolver_(dns_resolver),
messenger_(std::move(messenger)),
is_started_(false) {
CHECK(messenger_ != nullptr);
DCHECK(!service_name_.empty()) << "Proxy service name must not be blank";
DCHECK(hp_.Initialized());
}
Sockaddr* Proxy::GetSingleSockaddr(std::vector<Sockaddr>* addrs) const {
DCHECK(!addrs->empty());
if (PREDICT_FALSE(addrs->size() > 1)) {
LOG(WARNING) << Substitute(
"$0 proxy host/port $1 resolves to $2 different addresses. Using $3",
service_name_, hp_.ToString(), addrs->size(), (*addrs)[0].ToString());
}
return &(*addrs)[0];
}
void Proxy::Init(Sockaddr addr) {
if (!dns_resolver_) {
return;
}
// By default, we set the real user to the currently logged-in user.
// Effective user and password remain blank.
string real_user;
Status s = GetLoggedInUser(&real_user);
if (!s.ok()) {
LOG(WARNING) << "Proxy for " << service_name_ << ": Unable to get logged-in user name: "
<< s.ToString() << " before connecting to host/port: " << hp_.ToString();
}
vector<Sockaddr> addrs;
if (!addr.is_initialized()) {
s = dns_resolver_->ResolveAddresses(hp_, &addrs);
if (PREDICT_TRUE(s.ok() && !addrs.empty())) {
addr = *GetSingleSockaddr(&addrs);
DCHECK(addr.is_initialized());
addr.set_port(hp_.port());
// NOTE: it's ok to proceed on failure -- the address will remain
// uninitialized and be re-resolved when sending the next request.
}
}
UserCredentials creds;
creds.set_real_user(std::move(real_user));
conn_id_ = ConnectionId(addr, hp_.host(), std::move(creds));
}
Proxy::~Proxy() {
}
void Proxy::EnqueueRequest(const string& method,
unique_ptr<RequestPayload> req_payload,
google::protobuf::Message* response,
RpcController* controller,
const ResponseCallback& callback,
OutboundCall::CallbackBehavior cb_behavior) const {
ConnectionId connection = conn_id();
DCHECK(connection.remote().is_initialized());
controller->call_.reset(
new OutboundCall(connection, {service_name_, method}, std::move(req_payload),
cb_behavior, response, controller, callback));
controller->SetMessenger(messenger_.get());
// If this fails to queue, the callback will get called immediately
// and the controller will be in an ERROR state.
messenger_->QueueOutboundCall(controller->call_);
}
void Proxy::RefreshDnsAndEnqueueRequest(const std::string& method,
unique_ptr<RequestPayload> req_payload,
google::protobuf::Message* response,
RpcController* controller,
const ResponseCallback& callback) {
DCHECK(!controller->call_);
vector<Sockaddr>* addrs = new vector<Sockaddr>();
DCHECK_NOTNULL(dns_resolver_)->RefreshAddressesAsync(hp_, addrs,
[this, req_raw = req_payload.release(),
&method, callback, response, controller, addrs] (const Status& s) mutable {
unique_ptr<RequestPayload> req_payload(req_raw);
unique_ptr<vector<Sockaddr>> unique_addrs(addrs);
// If we fail to resolve the address, treat the call as failed.
if (!s.ok() || addrs->empty()) {
DCHECK(!controller->call_);
// NOTE: we need to keep a reference here because the callback may end up
// destructing the controller and the outbound call, _while_ the callback
// is running from within the call!
auto shared_call = std::make_shared<OutboundCall>(
conn_id(), RemoteMethod{service_name_, method}, response, controller, callback);
controller->call_ = shared_call;
controller->call_->SetFailed(s.CloneAndPrepend("failed to refresh physical address"));
return;
}
auto* addr = GetSingleSockaddr(addrs);
DCHECK(addr->is_initialized());
addr->set_port(hp_.port());
{
std::lock_guard<simple_spinlock> l(lock_);
conn_id_.set_remote(*addr);
}
// NOTE: we don't expect the user-provided callback to free sidecars, so
// make sure the outbound call frees it for us.
EnqueueRequest(method, std::move(req_payload), response, controller, callback,
OutboundCall::CallbackBehavior::kFreeSidecars);
});
}
void Proxy::AsyncRequest(const string& method,
const google::protobuf::Message& req,
google::protobuf::Message* response,
RpcController* controller,
const ResponseCallback& callback) {
CHECK(!controller->call_) << "Controller should be reset";
base::subtle::NoBarrier_Store(&is_started_, true);
// TODO(awong): it would be great if we didn't have to heap allocate the
// payload.
auto req_payload = RequestPayload::CreateRequestPayload(
RemoteMethod{service_name_, method},
req, controller->ReleaseOutboundSidecars());
if (!dns_resolver_) {
// NOTE: we don't expect the user-provided callback to free sidecars, so
// make sure the outbound call frees it for us.
EnqueueRequest(method, std::move(req_payload), response, controller, callback,
OutboundCall::CallbackBehavior::kFreeSidecars);
return;
}
// If we haven't successfully initialized the remote, e.g. because the DNS
// lookup failed, refresh the DNS entry and enqueue the request.
bool remote_initialized;
{
std::lock_guard<simple_spinlock> l(lock_);
remote_initialized = conn_id_.remote().is_initialized();
}
if (!remote_initialized) {
RefreshDnsAndEnqueueRequest(method, std::move(req_payload), response, controller, callback);
return;
}
// Otherwise, just enqueue the request, but retry if there's an error, since
// it's possible the physical address of the host was changed. We only retry
// once more before calling the callback.
auto refresh_dns_and_cb = [this, &method, callback, response, controller] () {
// TODO(awong): we should be more specific here -- consider having the RPC
// layer set a flag in the controller that warrants a retry.
if (PREDICT_FALSE(!controller->status().ok())) {
KLOG_EVERY_N_SECS(WARNING, 5)
<< Substitute("Call had error, refreshing address and retrying: $0",
controller->status().ToString());
auto req_payload = controller->ReleaseRequestPayload();
controller->Reset();
RefreshDnsAndEnqueueRequest(method, std::move(req_payload), response, controller, callback);
return;
}
// For any other status, OK or otherwise, just run the callback.
controller->FreeOutboundSidecars();
SCOPED_WATCH_STACK(100);
callback();
};
// Since we may end up using the request payload in the event of a retry,
// ensure the outbound call doesn't free the sidecars, and instead free
// manually from within our callback.
EnqueueRequest(method, std::move(req_payload), response, controller, refresh_dns_and_cb,
OutboundCall::CallbackBehavior::kDontFreeSidecars);
}
Status Proxy::SyncRequest(const string& method,
const google::protobuf::Message& req,
google::protobuf::Message* resp,
RpcController* controller) {
Notification note;
AsyncRequest(method, req, DCHECK_NOTNULL(resp), controller,
[&note]() { note.Notify(); });
note.WaitForNotification();
return controller->status();
}
void Proxy::set_user_credentials(UserCredentials user_credentials) {
CHECK(base::subtle::NoBarrier_Load(&is_started_) == false)
<< "It is illegal to call set_user_credentials() after request processing has started";
conn_id_.set_user_credentials(std::move(user_credentials));
}
void Proxy::set_network_plane(string network_plane) {
CHECK(base::subtle::NoBarrier_Load(&is_started_) == false)
<< "It is illegal to call set_network_plane() after request processing has started";
conn_id_.set_network_plane(std::move(network_plane));
}
std::string Proxy::ToString() const {
return Substitute("$0@$1", service_name_, conn_id_.ToString());
}
} // namespace rpc
} // namespace kudu