// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once

#include <cstddef>
#include <cstdint>
#include <limits>
#include <memory>
#include <optional>
#include <set>
#include <string>
#include <type_traits>
#include <unordered_map>
#include <utility>

#include <boost/intrusive/list.hpp>
#include <ev++.h>
#include <glog/logging.h>

#include "kudu/gutil/ref_counted.h"
#include "kudu/rpc/connection_id.h"
#include "kudu/rpc/remote_user.h"
#include "kudu/rpc/rpc_controller.h"
#include "kudu/rpc/rpc_header.pb.h"
#include "kudu/rpc/transfer.h"
#include "kudu/util/monotime.h"
#include "kudu/util/net/sockaddr.h"
#include "kudu/util/net/socket.h"
#include "kudu/util/object_pool.h"
#include "kudu/util/status.h"

namespace kudu {

namespace rpc {

class DumpConnectionsRequestPB;
class InboundCall;
class OutboundCall;
class ReactorThread;
class RpcConnectionPB;
class RpczStore;
class SocketStatsPB;
class TransportDetailsPB;

enum class CredentialsPolicy;

//
// A connection between an endpoint and us.
//
// Inbound connections are created by AcceptorPools, which eventually schedule
// RegisterConnection() to be called from the reactor thread.
//
// Outbound connections are created by the Reactor thread in order to service
// outbound calls.
//
// Once a Connection is created, it can be used both for sending messages and
// receiving them, but any given connection is explicitly a client or server.
// If a pair of servers are making bidirectional RPCs, they will use two separate
// TCP connections (and Connection objects).
//
// This class is not fully thread-safe.  It is accessed only from the context of a
// single ReactorThread except where otherwise specified.
//
class Connection : public RefCountedThreadSafe<Connection> {
 public:
  enum Direction {
    // This host is sending calls via this connection.
    CLIENT,
    // This host is receiving calls via this connection.
    SERVER
  };

  // Create a new Connection.
  // reactor_thread: the reactor that owns us.
  // remote: the address of the remote end
  // socket: the socket to take ownership of.
  // direction: whether we are the client or server side
  Connection(ReactorThread *reactor_thread,
             Sockaddr remote,
             std::unique_ptr<Socket> socket,
             Direction direction,
             CredentialsPolicy policy = CredentialsPolicy::ANY_CREDENTIALS);

  // Set underlying socket to non-blocking (or blocking) mode.
  Status SetNonBlocking(bool enabled);

  // Enable TCP keepalive for the underlying socket. A TCP keepalive probe will be sent
  // to the remote end after the connection has been idle for 'idle_time_s' seconds.
  // It will retry sending probes up to 'num_retries' number of times until an ACK is
  // heard from peer. 'retry_time_s' is the sleep time in seconds between successive
  // keepalive probes.
  Status SetTcpKeepAlive(int idle_time_s, int retry_time_s, int num_retries);

  // Register our socket with an epoll loop.  We will only ever be registered in
  // one epoll loop at a time.
  void EpollRegister(ev::loop_ref& loop);

  ~Connection();

  MonoTime last_activity_time() const {
    return last_activity_time_;
  }

  // Returns true if we are not in the process of receiving or sending a
  // message, and we have no outstanding calls.
  bool Idle() const;

  // Fail any calls which are currently queued or awaiting response.
  // Prohibits any future calls (they will be failed immediately with this
  // same Status).
  void Shutdown(const Status& status,
                std::unique_ptr<ErrorStatusPB> rpc_error = {});

  // Queue a new call to be made. If the queueing fails, the call will be
  // marked failed. The caller is expected to check if 'call' has been cancelled
  // before making the call.
  // Takes ownership of the 'call' object regardless of whether it succeeds or fails.
  void QueueOutboundCall(std::shared_ptr<OutboundCall> call);

  // Queue a call response back to the client on the server side.
  //
  // This may be called from a non-reactor thread.
  void QueueResponseForCall(std::unique_ptr<InboundCall> call);

  // Cancel an outbound call by removing any reference to it by CallAwaitingResponse
  // in 'awaiting_responses_'.
  void CancelOutboundCall(const std::shared_ptr<OutboundCall> &call);

  // The address of the remote end of the connection.
  const Sockaddr& remote() const { return remote_; }

  // The address of the local end of the connection.
  Status GetLocalAddress(Sockaddr* addr) const;

  // Set the user credentials for an outbound connection.
  void set_outbound_connection_id(ConnectionId conn_id) {
    DCHECK_EQ(direction_, CLIENT);
    DCHECK(!outbound_connection_id_);
    outbound_connection_id_ = std::move(conn_id);
  }

  // Get the user credentials which will be used to log in.
  const ConnectionId& outbound_connection_id() const {
    DCHECK_EQ(direction_, CLIENT);
    DCHECK(outbound_connection_id_);
    return *outbound_connection_id_;
  }

  bool is_confidential() const {
    return is_confidential_;
  }

  // Set/unset the 'confidentiality' property for this connection.
  void set_confidential(bool is_confidential);

  // Credentials policy to start connection negotiation.
  CredentialsPolicy credentials_policy() const { return credentials_policy_; }

  // Whether the connection satisfies the specified credentials policy.
  //
  // NOTE: The policy is set prior to connection negotiation, and the actual
  //       authentication credentials used for connection negotiation might
  //       effectively make the connection to satisfy a stronger policy.
  //       An example: the credentials policy for the connection was set to
  //       ANY_CREDENTIALS, but since the authn token was not available
  //       at the time of negotiation, the primary credentials were used, making
  //       the connection de facto satisfying the PRIMARY_CREDENTIALS policy.
  bool SatisfiesCredentialsPolicy(CredentialsPolicy policy) const;

  RpczStore* rpcz_store();

  // libev callback when data is available to read.
  void ReadHandler(ev::io &watcher, int revents);

  // libev callback when we may write to the socket.
  void WriteHandler(ev::io &watcher, int revents);

  enum ProcessOutboundTransfersResult {
    // All of the transfers in the queue have been sent successfully.
    // The queue is now empty.
    kNoMoreToSend,
    // Not all transfers were able to be sent. The caller should
    // ensure that write_io_ is enabled in order to continue attempting
    // to send transfers.
    kMoreToSend,
    // An error occurred trying to write to the connection, and the
    // connection was destroyed. NOTE: 'this' may be deleted if this
    // value is returned.
    kConnectionDestroyed
  };

  // Process any pending outbound transfers in outbound_transfers_.
  // Result indicates the state of the connection following the attempt.
  //
  // NOTE: This may invoke DestroyConnection() on 'this'.
  ProcessOutboundTransfersResult ProcessOutboundTransfers();

  // Safe to be called from other threads.
  std::string ToString() const;

  Direction direction() const { return direction_; }

  Socket* socket() { return socket_.get(); }

  // Go through the process of transferring control of the underlying socket back to the Reactor.
  void CompleteNegotiation(Status negotiation_status,
                           std::unique_ptr<ErrorStatusPB> rpc_error);

  // Indicate that negotiation is complete and that the Reactor is now in control of the socket.
  void MarkNegotiationComplete();

  Status DumpPB(const DumpConnectionsRequestPB& req,
                RpcConnectionPB* resp);

  ReactorThread* reactor_thread() const { return reactor_thread_; }

  std::unique_ptr<Socket> release_socket() {
    return std::move(socket_);
  }

  void adopt_socket(std::unique_ptr<Socket> socket) {
    socket_ = std::move(socket);
  }

  void set_remote_features(std::set<RpcFeatureFlag> remote_features) {
    remote_features_ = std::move(remote_features);
  }

  void set_remote_user(RemoteUser user) {
    DCHECK_EQ(direction_, SERVER);
    remote_user_ = std::move(user);
  }

  const RemoteUser& remote_user() const {
    DCHECK_EQ(direction_, SERVER);
    return remote_user_;
  }

  // Whether the connection is scheduled for shutdown.
  bool scheduled_for_shutdown() const {
    DCHECK_EQ(direction_, CLIENT);
    return scheduled_for_shutdown_;
  }

  // Mark the connection as scheduled to be shut down. Reactor does not dispatch
  // new calls on such a connection.
  void set_scheduled_for_shutdown() {
    DCHECK_EQ(direction_, CLIENT);
    scheduled_for_shutdown_ = true;
  }

  size_t num_queued_outbound_transfers() const {
    return outbound_transfers_.size();
  }

 private:
  friend struct CallAwaitingResponse;
  friend class QueueTransferTask;
  friend struct CallTransferCallbacks;
  friend struct ResponseTransferCallbacks;

  // A call which has been fully sent to the server, which we're waiting for
  // the server to process. This is used on the client side only.
  struct CallAwaitingResponse {
    ~CallAwaitingResponse();

    // Notification from libev that the call has timed out.
    void HandleTimeout(ev::timer &watcher, int revents);

    Connection *conn;
    std::shared_ptr<OutboundCall> call;
    ev::timer timeout_timer;

    // We time out RPC calls in two stages. This is set to the amount of timeout
    // remaining after the next timeout fires. See Connection::QueueOutboundCall().
    double remaining_timeout;
  };

  typedef std::unordered_map<uint64_t, CallAwaitingResponse*> car_map_t;
  typedef std::unordered_map<uint64_t, InboundCall*> inbound_call_map_t;

  // Returns the next valid (positive) sequential call ID by incrementing a counter
  // and ensuring we roll over from INT32_MAX to 0.
  // Negative numbers are reserved for special purposes.
  int32_t GetNextCallId() {
    static constexpr uint32_t kCallIdMask = std::numeric_limits<int32_t>::max(); // 0x7fffffff
    return static_cast<int32_t>(++call_id_ & kCallIdMask);
  }

  // An incoming packet has completed transferring on the server side.
  // This parses the call and delivers it into the call queue.
  void HandleIncomingCall(std::unique_ptr<InboundTransfer> transfer);

  // An incoming packet has completed on the client side. This parses the
  // call response, looks up the CallAwaitingResponse, and calls the
  // client callback.
  void HandleCallResponse(std::unique_ptr<InboundTransfer> transfer);

  // The given CallAwaitingResponse has elapsed its user-defined timeout.
  // Set it to Failed.
  void HandleOutboundCallTimeout(CallAwaitingResponse *car);

  // Queue a transfer for sending on this connection.
  // We will take ownership of the transfer.
  // This must be called from the reactor thread.
  void QueueOutbound(std::unique_ptr<OutboundTransfer> transfer);

  // Internal test function for injecting cancellation request when 'call'
  // reaches state specified in 'FLAGS_rpc_inject_cancellation_state'.
  void MaybeInjectCancellation(const std::shared_ptr<OutboundCall> &call);

  Status GetSocketStatsPB(SocketStatsPB* pb) const;

  Status GetTransportDetailsPB(TransportDetailsPB* pb) const;

  // The reactor thread that created this connection.
  ReactorThread* const reactor_thread_;

  // The remote address we're talking to.
  const Sockaddr remote_;

  // The socket we're communicating on.
  std::unique_ptr<Socket> socket_;

  // The ConnectionId that serves as a key into the client connection map
  // within this reactor. Only set in the case of outbound connections.
  std::optional<ConnectionId> outbound_connection_id_;

  // The authenticated remote user (if this is an inbound connection on the server).
  RemoteUser remote_user_;

  // whether we are client or server
  Direction direction_;

  // The last time we read or wrote from the socket.
  MonoTime last_activity_time_;

  // the inbound transfer, if any
  std::unique_ptr<InboundTransfer> inbound_;

  // notifies us when our socket is writable.
  ev::io write_io_;

  // notifies us when our socket is readable.
  ev::io read_io_;

  // Set to true when the connection is registered on a loop.
  // This is used for a sanity check in the destructor that we are properly
  // un-registered before shutting down.
  bool is_epoll_registered_;

  // waiting to be sent
  boost::intrusive::list<OutboundTransfer> outbound_transfers_; // NOLINT(*)

  // Calls which have been sent and are now waiting for a response.
  car_map_t awaiting_response_;

  // Calls which have been received on the server and are currently
  // being handled.
  inbound_call_map_t calls_being_handled_;

  // The internal counter to generate next call ID to use. The call ID is
  // a signed integer: 31 LSBs of the internal counter are used to represent it.
  uint32_t call_id_;

  // Starts as Status::OK, gets set to a shutdown status upon Shutdown().
  Status shutdown_status_;

  // RPC features supported by the remote end of the connection.
  std::set<RpcFeatureFlag> remote_features_;

  // Pool from which CallAwaitingResponse objects are allocated.
  // Also a funny name.
  ObjectPool<CallAwaitingResponse> car_pool_;
  typedef ObjectPool<CallAwaitingResponse>::scoped_ptr scoped_car;

  // The credentials policy to use for connection negotiation. It defines which
  // type of user credentials used to negotiate a connection. The actual type of
  // credentials used for authentication during the negotiation process depends
  // on the credentials availability, but the result credentials guaranteed to
  // always satisfy the specified credentials policy. In other words, the actual
  // type of credentials used for connection negotiation might effectively make
  // the connection to satisfy a stronger/narrower policy.
  //
  // An example:
  //   The credentials policy for the connection was set to ANY_CREDENTIALS,
  //   but since no secondary credentials (such authn token) were available
  //   at the time of negotiation, the primary credentials were used,making the
  //   connection satisfying the PRIMARY_CREDENTIALS policy de facto.
  const CredentialsPolicy credentials_policy_;

  // Whether we completed connection negotiation.
  bool negotiation_complete_;

  // Whether it's OK to pass confidential information over the connection.
  // For example, an encrypted (but not necessarily authenticated) connection
  // is considered confidential.
  bool is_confidential_;

  // Whether the connection is scheduled for shutdown.
  bool scheduled_for_shutdown_;
};

} // namespace rpc
} // namespace kudu
