| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| #ifndef KUDU_RPC_CLIENT_CALL_H |
| #define KUDU_RPC_CLIENT_CALL_H |
| |
| #include <set> |
| #include <string> |
| #include <vector> |
| |
| #include <glog/logging.h> |
| |
| #include "kudu/gutil/gscoped_ptr.h" |
| #include "kudu/gutil/macros.h" |
| #include "kudu/rpc/constants.h" |
| #include "kudu/rpc/rpc_header.pb.h" |
| #include "kudu/rpc/remote_method.h" |
| #include "kudu/rpc/response_callback.h" |
| #include "kudu/rpc/transfer.h" |
| #include "kudu/util/locks.h" |
| #include "kudu/util/monotime.h" |
| #include "kudu/util/net/sockaddr.h" |
| #include "kudu/util/slice.h" |
| #include "kudu/util/status.h" |
| |
| namespace google { |
| namespace protobuf { |
| class Message; |
| } // namespace protobuf |
| } // namespace google |
| |
| namespace kudu { |
| namespace rpc { |
| |
| class CallResponse; |
| class Connection; |
| class DumpRunningRpcsRequestPB; |
| class InboundTransfer; |
| class RpcCallInProgressPB; |
| class RpcController; |
| |
| // Client-side user credentials, such as a user's username & password. |
| // In the future, we will add Kerberos credentials. |
| // |
| // TODO(mpercy): this is actually used server side too -- should |
| // we instead introduce a RemoteUser class or something? |
| class UserCredentials { |
| public: |
| UserCredentials(); |
| |
| // Effective user, in cases where impersonation is supported. |
| // If impersonation is not supported, this should be left empty. |
| bool has_effective_user() const; |
| void set_effective_user(const std::string& eff_user); |
| const std::string& effective_user() const { return eff_user_; } |
| |
| // Real user. |
| bool has_real_user() const; |
| void set_real_user(const std::string& real_user); |
| const std::string& real_user() const { return real_user_; } |
| |
| // The real user's password. |
| bool has_password() const; |
| void set_password(const std::string& password); |
| const std::string& password() const { return password_; } |
| |
| // Copy state from another object to this one. |
| void CopyFrom(const UserCredentials& other); |
| |
| // Returns a string representation of the object, not including the password field. |
| std::string ToString() const; |
| |
| std::size_t HashCode() const; |
| bool Equals(const UserCredentials& other) const; |
| |
| private: |
| // Remember to update HashCode() and Equals() when new fields are added. |
| std::string eff_user_; |
| std::string real_user_; |
| std::string password_; |
| |
| DISALLOW_COPY_AND_ASSIGN(UserCredentials); |
| }; |
| |
| // Used to key on Connection information. |
| // For use as a key in an unordered STL collection, use ConnectionIdHash and ConnectionIdEqual. |
| // This class is copyable for STL compatibility, but not assignable (use CopyFrom() for that). |
| class ConnectionId { |
| public: |
| ConnectionId(); |
| |
| // Copy constructor required for use with STL unordered_map. |
| ConnectionId(const ConnectionId& other); |
| |
| // Convenience constructor. |
| ConnectionId(const Sockaddr& remote, const UserCredentials& user_credentials); |
| |
| // The remote address. |
| void set_remote(const Sockaddr& remote); |
| const Sockaddr& remote() const { return remote_; } |
| |
| // The credentials of the user associated with this connection, if any. |
| void set_user_credentials(const UserCredentials& user_credentials); |
| const UserCredentials& user_credentials() const { return user_credentials_; } |
| UserCredentials* mutable_user_credentials() { return &user_credentials_; } |
| |
| // Copy state from another object to this one. |
| void CopyFrom(const ConnectionId& other); |
| |
| // Returns a string representation of the object, not including the password field. |
| std::string ToString() const; |
| |
| size_t HashCode() const; |
| bool Equals(const ConnectionId& other) const; |
| |
| private: |
| // Remember to update HashCode() and Equals() when new fields are added. |
| Sockaddr remote_; |
| UserCredentials user_credentials_; |
| |
| // Implementation of CopyFrom that can be shared with copy constructor. |
| void DoCopyFrom(const ConnectionId& other); |
| |
| // Disable assignment operator. |
| void operator=(const ConnectionId&); |
| }; |
| |
| class ConnectionIdHash { |
| public: |
| std::size_t operator() (const ConnectionId& conn_id) const; |
| }; |
| |
| class ConnectionIdEqual { |
| public: |
| bool operator() (const ConnectionId& cid1, const ConnectionId& cid2) const; |
| }; |
| |
| // Tracks the status of a call on the client side. |
| // |
| // This is an internal-facing class -- clients interact with the |
| // RpcController class. |
| // |
| // This is allocated by the Proxy when a call is first created, |
| // then passed to the reactor thread to send on the wire. It's typically |
| // kept using a shared_ptr because a call may terminate in any number |
| // of different threads, making it tricky to enforce single ownership. |
| class OutboundCall { |
| public: |
| OutboundCall(const ConnectionId& conn_id, const RemoteMethod& remote_method, |
| google::protobuf::Message* response_storage, |
| RpcController* controller, ResponseCallback callback); |
| |
| ~OutboundCall(); |
| |
| // Serialize the given request PB into this call's internal storage. |
| // |
| // Because the data is fully serialized by this call, 'req' may be |
| // subsequently mutated with no ill effects. |
| Status SetRequestParam(const google::protobuf::Message& req); |
| |
| // Assign the call ID for this call. This is called from the reactor |
| // thread once a connection has been assigned. Must only be called once. |
| void set_call_id(int32_t call_id) { |
| DCHECK_EQ(header_.call_id(), kInvalidCallId) << "Already has a call ID"; |
| header_.set_call_id(call_id); |
| } |
| |
| // Serialize the call for the wire. Requires that SetRequestParam() |
| // is called first. This is called from the Reactor thread. |
| Status SerializeTo(std::vector<Slice>* slices); |
| |
| // Callback after the call has been put on the outbound connection queue. |
| void SetQueued(); |
| |
| // Update the call state to show that the request has been sent. |
| void SetSent(); |
| |
| // Mark the call as failed. This also triggers the callback to notify |
| // the caller. If the call failed due to a remote error, then err_pb |
| // should be set to the error returned by the remote server. Takes |
| // ownership of 'err_pb'. |
| void SetFailed(const Status& status, |
| ErrorStatusPB* err_pb = NULL); |
| |
| // Mark the call as timed out. This also triggers the callback to notify |
| // the caller. |
| void SetTimedOut(); |
| bool IsTimedOut() const; |
| |
| // Is the call finished? |
| bool IsFinished() const; |
| |
| // Fill in the call response. |
| void SetResponse(gscoped_ptr<CallResponse> resp); |
| |
| std::set<RpcFeatureFlag> RequiredRpcFeatures() const; |
| |
| std::string ToString() const; |
| |
| void DumpPB(const DumpRunningRpcsRequestPB& req, RpcCallInProgressPB* resp); |
| |
| //////////////////////////////////////////////////////////// |
| // Getters |
| //////////////////////////////////////////////////////////// |
| |
| const ConnectionId& conn_id() const { return conn_id_; } |
| const RemoteMethod& remote_method() const { return remote_method_; } |
| const ResponseCallback &callback() const { return callback_; } |
| RpcController* controller() { return controller_; } |
| const RpcController* controller() const { return controller_; } |
| |
| // Return true if a call ID has been assigned to this call. |
| bool call_id_assigned() const { |
| return header_.call_id() != kInvalidCallId; |
| } |
| |
| int32_t call_id() const { |
| DCHECK(call_id_assigned()); |
| return header_.call_id(); |
| } |
| |
| private: |
| friend class RpcController; |
| |
| // Various states the call propagates through. |
| // NB: if adding another state, be sure to update OutboundCall::IsFinished() |
| // and OutboundCall::StateName(State state) as well. |
| enum State { |
| READY = 0, |
| ON_OUTBOUND_QUEUE = 1, |
| SENT = 2, |
| TIMED_OUT = 3, |
| FINISHED_ERROR = 4, |
| FINISHED_SUCCESS = 5 |
| }; |
| |
| static std::string StateName(State state); |
| |
| void set_state(State new_state); |
| State state() const; |
| |
| // Same as set_state, but requires that the caller already holds |
| // lock_ |
| void set_state_unlocked(State new_state); |
| |
| // return current status |
| Status status() const; |
| |
| // Time when the call was first initiatied. |
| MonoTime start_time_; |
| |
| // Return the error protobuf, if a remote error occurred. |
| // This will only be non-NULL if status().IsRemoteError(). |
| const ErrorStatusPB* error_pb() const; |
| |
| // Lock for state_ status_, error_pb_ fields, since they |
| // may be mutated by the reactor thread while the client thread |
| // reads them. |
| mutable simple_spinlock lock_; |
| State state_; |
| Status status_; |
| gscoped_ptr<ErrorStatusPB> error_pb_; |
| |
| // Call the user-provided callback. |
| void CallCallback(); |
| |
| // The RPC header. |
| // Parts of this (eg the call ID) are only assigned once this call has been |
| // passed to the reactor thread and assigned a connection. |
| RequestHeader header_; |
| |
| // The remote method being called. |
| RemoteMethod remote_method_; |
| |
| ConnectionId conn_id_; |
| ResponseCallback callback_; |
| RpcController* controller_; |
| |
| // Pointer for the protobuf where the response should be written. |
| google::protobuf::Message* response_; |
| |
| // Buffers for storing segments of the wire-format request. |
| faststring header_buf_; |
| faststring request_buf_; |
| |
| // Once a response has been received for this call, contains that response. |
| // Otherwise NULL. |
| gscoped_ptr<CallResponse> call_response_; |
| |
| DISALLOW_COPY_AND_ASSIGN(OutboundCall); |
| }; |
| |
| // A response to a call, on the client side. |
| // Upon receiving a response, this is allocated in the reactor thread and filled |
| // into the OutboundCall instance via OutboundCall::SetResponse. |
| // |
| // This may either be a success or error response. |
| // |
| // This class takes care of separating out the distinct payload slices sent |
| // over. |
| class CallResponse { |
| public: |
| CallResponse(); |
| |
| // Parse the response received from a call. This must be called before any |
| // other methods on this object. |
| Status ParseFrom(gscoped_ptr<InboundTransfer> transfer); |
| |
| // Return true if the call succeeded. |
| bool is_success() const { |
| DCHECK(parsed_); |
| return !header_.is_error(); |
| } |
| |
| // Return the call ID that this response is related to. |
| int32_t call_id() const { |
| DCHECK(parsed_); |
| return header_.call_id(); |
| } |
| |
| // Return the serialized response data. This is just the response "body" -- |
| // either a serialized ErrorStatusPB, or the serialized user response protobuf. |
| const Slice &serialized_response() const { |
| DCHECK(parsed_); |
| return serialized_response_; |
| } |
| |
| // See RpcController::GetSidecar() |
| Status GetSidecar(int idx, Slice* sidecar) const; |
| |
| private: |
| // True once ParseFrom() is called. |
| bool parsed_; |
| |
| // The parsed header. |
| ResponseHeader header_; |
| |
| // The slice of data for the encoded protobuf response. |
| // This slice refers to memory allocated by transfer_ |
| Slice serialized_response_; |
| |
| // Slices of data for rpc sidecars. They point into memory owned by transfer_. |
| Slice sidecar_slices_[OutboundTransfer::kMaxPayloadSlices]; |
| |
| // The incoming transfer data - retained because serialized_response_ |
| // and sidecar_slices_ refer into its data. |
| gscoped_ptr<InboundTransfer> transfer_; |
| |
| DISALLOW_COPY_AND_ASSIGN(CallResponse); |
| }; |
| |
| } // namespace rpc |
| } // namespace kudu |
| |
| #endif |