| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "kudu/rpc/proxy.h" |
| |
| #include <functional> |
| #include <iostream> |
| #include <memory> |
| #include <utility> |
| |
| #include <glog/logging.h> |
| |
| #include "kudu/gutil/strings/substitute.h" |
| #include "kudu/rpc/messenger.h" |
| #include "kudu/rpc/outbound_call.h" |
| #include "kudu/rpc/remote_method.h" |
| #include "kudu/rpc/response_callback.h" |
| #include "kudu/rpc/rpc_controller.h" |
| #include "kudu/rpc/user_credentials.h" |
| #include "kudu/util/net/sockaddr.h" |
| #include "kudu/util/notification.h" |
| #include "kudu/util/status.h" |
| #include "kudu/util/user.h" |
| |
| using google::protobuf::Message; |
| using std::string; |
| using std::shared_ptr; |
| |
| namespace kudu { |
| namespace rpc { |
| |
| Proxy::Proxy(std::shared_ptr<Messenger> messenger, |
| const Sockaddr& remote, |
| string hostname, |
| string service_name) |
| : service_name_(std::move(service_name)), |
| messenger_(std::move(messenger)), |
| is_started_(false) { |
| CHECK(messenger_ != nullptr); |
| DCHECK(!service_name_.empty()) << "Proxy service name must not be blank"; |
| DCHECK(remote.is_initialized()); |
| // By default, we set the real user to the currently logged-in user. |
| // Effective user and password remain blank. |
| string real_user; |
| Status s = GetLoggedInUser(&real_user); |
| if (!s.ok()) { |
| LOG(WARNING) << "Proxy for " << service_name_ << ": Unable to get logged-in user name: " |
| << s.ToString() << " before connecting to remote: " << remote.ToString(); |
| } |
| |
| UserCredentials creds; |
| creds.set_real_user(std::move(real_user)); |
| conn_id_ = ConnectionId(remote, std::move(hostname), std::move(creds)); |
| } |
| |
| Proxy::~Proxy() { |
| } |
| |
| void Proxy::AsyncRequest(const string& method, |
| const google::protobuf::Message& req, |
| google::protobuf::Message* response, |
| RpcController* controller, |
| const ResponseCallback& callback) const { |
| CHECK(!controller->call_) << "Controller should be reset"; |
| base::subtle::NoBarrier_Store(&is_started_, true); |
| RemoteMethod remote_method(service_name_, method); |
| controller->call_.reset( |
| new OutboundCall(conn_id_, remote_method, response, controller, callback)); |
| controller->SetRequestParam(req); |
| controller->SetMessenger(messenger_.get()); |
| |
| // If this fails to queue, the callback will get called immediately |
| // and the controller will be in an ERROR state. |
| messenger_->QueueOutboundCall(controller->call_); |
| } |
| |
| |
| Status Proxy::SyncRequest(const string& method, |
| const google::protobuf::Message& req, |
| google::protobuf::Message* resp, |
| RpcController* controller) const { |
| Notification note; |
| AsyncRequest(method, req, DCHECK_NOTNULL(resp), controller, |
| [¬e]() { note.Notify(); }); |
| note.WaitForNotification(); |
| return controller->status(); |
| } |
| |
| void Proxy::set_user_credentials(UserCredentials user_credentials) { |
| CHECK(base::subtle::NoBarrier_Load(&is_started_) == false) |
| << "It is illegal to call set_user_credentials() after request processing has started"; |
| conn_id_.set_user_credentials(std::move(user_credentials)); |
| } |
| |
| void Proxy::set_network_plane(string network_plane) { |
| CHECK(base::subtle::NoBarrier_Load(&is_started_) == false) |
| << "It is illegal to call set_network_plane() after request processing has started"; |
| conn_id_.set_network_plane(std::move(network_plane)); |
| } |
| |
| std::string Proxy::ToString() const { |
| return strings::Substitute("$0@$1", service_name_, conn_id_.ToString()); |
| } |
| |
| } // namespace rpc |
| } // namespace kudu |