// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <cstdint>
#include <list>
#include <map>
#include <memory>
#include <mutex>
#include <ostream>
#include <string>
#include <typeinfo>

#include <boost/bind.hpp>
#include <boost/function.hpp>
#include <boost/unordered_map.hpp>
#include <thrift/Thrift.h>
#include <thrift/transport/TTransportException.h>

#include "common/logging.h"
#include "common/status.h"
#include "gen-cpp/ErrorCodes_types.h"
#include "gen-cpp/Types_types.h"
#include "gutil/strings/substitute.h"
#include "rpc/thrift-client.h"
#include "rpc/thrift-util.h"
#include "util/container-util.h"
#include "util/metrics-fwd.h"
#include "util/network-util.h"
#include "util/time.h"

namespace impala {

class MetricGroup;
template <class T> class ClientCache;

/// Untyped handle that users of ClientCache hold to identify one particular client
/// instance. Keeping the handle opaque means ClientCacheHelper never has to be
/// parameterised by the concrete client type.
using ClientKey = void*;

/// Helper class which implements the majority of the caching functionality without using
/// templates (i.e. pointers to the superclass of all ThriftClients and a void* for the
/// key). This class is for internal use only; the public interface is in ClientCache
/// below.
///
/// A client is either 'in-use' (the user of the cache is between GetClient() and
/// ReleaseClient() pairs) or 'cached', in which case it is available for the next
/// GetClient() call. Internally, this class maintains a map of all clients, in use or not,
/// which is indexed by their ClientKey (see below), and a map from server address to a
/// list of the keys of all clients that are not currently in use.
///
/// The user of this class only sees RPC proxy classes, but we have to track the
/// ThriftClient to manipulate the underlying transport. To do this, we use an opaque
/// ClientKey pointer type to act as the key for a particular client. We actually know the
/// type of the value at the end of pointer (it's the type parameter to ClientCache), but
/// we deliberately avoid using it so that we don't have to parameterise this class by
/// type, and thus this entire class doesn't get inlined every time it gets used.
///
/// This class is thread-safe.
///
/// TODO: shut down clients in the background if they don't get used for a period of time
/// TODO: More graceful handling of clients that have failed (maybe better
/// handled by a smart-wrapper of the interface object).
/// TODO: limits on total number of clients, and clients per-backend
/// TODO: move this to a separate header file, so that the public interface is more
/// prominent in this file
class ClientCacheHelper {
 public:
  /// Callback method which produces a client object when one cannot be found in the
  /// cache. Supplied by the ClientCache wrapper.
  typedef boost::function<ThriftClientImpl* (const TNetworkAddress& address,
                                             ClientKey* client_key)> ClientFactory;

  /// Returns a client for the given address in 'client_key'. If a previously created
  /// client is not available (i.e. there are no entries in the per-host cache), a new
  /// client is created by calling the supplied 'factory_method'. As a postcondition, the
  /// returned client will not be present in the per-host cache.
  ///
  /// If there is an error creating the new client, *client_key will be NULL.
  Status GetClient(const TNetworkAddress& address, ClientFactory factory_method,
      ClientKey* client_key) WARN_UNUSED_RESULT;

  /// Returns a newly-opened client in client_key. May reopen the existing client, or may
  /// replace it with a new one (created using 'factory_method').
  ///
  /// Returns an error status and sets 'client_key' to NULL if a new client cannot be
  /// created.
  /// NOTE(review): ClientCache::ReopenClient() documents that *client is left unaffected
  /// on failure, which contradicts the NULL postcondition above; confirm against the
  /// implementation which one holds.
  Status ReopenClient(
      ClientFactory factory_method, ClientKey* client_key) WARN_UNUSED_RESULT;

  /// Returns a client to the cache. Upon return, *client_key will be NULL, and the
  /// associated client will be available in the per-host cache.
  void ReleaseClient(ClientKey* client_key);

  /// Close all connections to a host (e.g., in case of failure) so that on their
  /// next use they will have to be reopened via ReopenClient().
  void CloseConnections(const TNetworkAddress& address);

  /// Close the client connection and don't put client back to per-host cache.
  /// Also remove client from client_map_.
  void DestroyClient(ClientKey* client_key);

  /// Return a debug representation of the contents of this cache.
  std::string DebugString();

  /// Closes every connection in the cache. Used only for testing.
  void TestShutdown();

  /// Creates two metrics for this cache measuring the number of clients currently used,
  /// and the total number in the cache.
  void InitMetrics(MetricGroup* metrics, const std::string& key_prefix);

 private:
  template <class T> friend class ClientCache;
  /// Private constructor so that only ClientCache can instantiate this class.
  ClientCacheHelper(uint32_t num_tries, uint64_t wait_ms, int32_t send_timeout_ms,
      int32_t recv_timeout_ms)
      : num_tries_(num_tries),
        wait_ms_(wait_ms),
        send_timeout_ms_(send_timeout_ms),
        recv_timeout_ms_(recv_timeout_ms),
        metrics_enabled_(false) { }

  /// There are three lock categories - the cache-wide lock (cache_lock_), the locks for a
  /// specific host's cache (PerHostCache::lock) and the lock for the set of all clients
  /// (client_map_lock_). They do not have to be taken concurrently (and should not be, in
  /// general), but if they are they must be taken in this order:
  /// cache_lock_->PerHostCache::lock->client_map_lock_.

  /// A PerHostCache is a list of available client keys for a single host, plus a lock that
  /// protects that list. Only one PerHostCache will ever be created for a given host, so
  /// when a PerHostCache is retrieved from the PerHostCacheMap containing it there is no
  /// need to hold on to the container's lock.
  ///
  /// Only clients that are not currently in use are tracked in their host's
  /// PerHostCache. When a client is returned to the cache via ReleaseClient(), it is
  /// reinserted into its corresponding PerHostCache list. Clients returned by GetClient()
  /// are considered to be immediately in use, and so don't exist in their PerHostCache
  /// until they are released for the first time.
  struct PerHostCache {
    /// Protects clients.
    std::mutex lock;

    /// List of client keys for this entry's host.
    std::list<ClientKey> clients;
  };

  /// Protects per_host_caches_
  std::mutex cache_lock_;

  /// Map from an address to a PerHostCache containing a list of keys that have entries in
  /// client_map_ for that host. The value type is wrapped in a shared_ptr so that the
  /// copy c'tor for PerHostCache is not required.
  typedef boost::unordered_map<
      TNetworkAddress, std::shared_ptr<PerHostCache>> PerHostCacheMap;
  PerHostCacheMap per_host_caches_;

  /// Protects client_map_.
  std::mutex client_map_lock_;

  /// Map from client key back to its associated ThriftClientImpl transport. This is where
  /// all the clients are actually stored, and client instances are owned by this class
  /// and persist for exactly as long as they are present in this map.
  /// We use a map (vs. unordered_map) so we get iterator consistency across operations.
  typedef std::map<ClientKey, std::shared_ptr<ThriftClientImpl>> ClientMap;
  ClientMap client_map_;

  /// Number of attempts to make to open a connection. 0 means retry indefinitely.
  const uint32_t num_tries_;

  /// Time to wait between failed connection attempts.
  const uint64_t wait_ms_;

  /// Time to wait for the underlying socket to send data, e.g., for an RPC.
  const int32_t send_timeout_ms_;

  /// Time to wait for the underlying socket to receive data, e.g., for an RPC response.
  const int32_t recv_timeout_ms_;

  /// True if metrics have been registered (i.e. InitMetrics() was called), and *_metric_
  /// are valid pointers.
  bool metrics_enabled_;

  /// Number of clients 'checked-out' from the cache. Null until InitMetrics() runs; the
  /// explicit initializer keeps the pointer from holding an indeterminate value.
  IntGauge* clients_in_use_metric_ = nullptr;

  /// Total clients in the cache, including those in use. Null until InitMetrics() runs.
  IntGauge* total_clients_metric_ = nullptr;

  /// Create a new client for specific address in 'client' and put it in client_map_
  Status CreateClient(const TNetworkAddress& address, ClientFactory factory_method,
      ClientKey* client_key) WARN_UNUSED_RESULT;
};

/// A scoped client connection to help manage clients from a client cache. Clients of this
/// class should use DoRpc() to actually make RPC calls.
///
/// The constructor checks a client out of the cache. The destructor hands it back via
/// ReleaseClient(), or discards it via DestroyClient() if the most recent RPC did not
/// complete successfully. Copying is disabled because a copy would hand the same client
/// back to the cache twice.
template<class T>
class ClientConnection {
 public:
  /// Obtains a client for 'address' from 'client_cache' and stores the outcome in
  /// '*status'. When '*status' is OK the connection holds a non-NULL client.
  ClientConnection(ClientCache<T>* client_cache, TNetworkAddress address, Status* status)
    : client_cache_(client_cache), client_(nullptr), address_(address),
      client_is_unrecoverable_(false) {
    // TODO: Inject fault here to exercise IMPALA-5576.
    *status = client_cache_->GetClient(address, &client_);
    if (status->ok()) DCHECK(client_ != nullptr);
  }

  /// Not copyable: the destructor gives 'client_' back to the cache, so a copy would
  /// release (or destroy) the same client a second time.
  ClientConnection(const ClientConnection&) = delete;
  ClientConnection& operator=(const ClientConnection&) = delete;

  /// Returns the client to the cache, or destroys it if the last RPC left it in an
  /// unrecoverable state. Does nothing if no client is held.
  ~ClientConnection() {
    if (client_ == nullptr) return;
    if (client_is_unrecoverable_) {
      client_cache_->DestroyClient(&client_);
    } else {
      client_cache_->ReleaseClient(&client_);
    }
  }

  Status Reopen() WARN_UNUSED_RESULT { return client_cache_->ReopenClient(&client_); }

  T* operator->() const { return client_; }

  /// Perform an RPC call f(request, response), with some failure handling in case the
  /// TCP connection underpinning this client has been closed unexpectedly. Note that
  /// this can lead to f() being called twice, as this method may retry f() once,
  /// depending on the error received from the first attempt.
  ///
  /// Returns RPC_RECV_TIMEOUT if a timeout occurred while waiting for a response,
  /// RPC_CLIENT_CONNECT_FAILURE if the client failed to connect, and RPC_GENERAL_ERROR
  /// if the RPC could not be completed for any other reason (except for an unexpectedly
  /// closed cnxn).
  /// Application-level failures should be signalled through the response type.
  template <class F, class Request, class Response>
  Status DoRpc(const F& f, const Request& request, Response* response) {
    DCHECK(response != nullptr);
    // Pessimistically mark the client unrecoverable. The flag is cleared only once the
    // RPC (or its retry in RetryRpc()) succeeds, so every failure path leaves the client
    // to be destroyed rather than recycled by the destructor.
    client_is_unrecoverable_ = true;
    bool send_done = false;
    try {
      (client_->*f)(*response, request, &send_done);
    } catch (const apache::thrift::transport::TTransportException& e) {
      if (send_done && IsReadTimeoutTException(e)) {
        return RecvTimeoutStatus(typeid(*response).name());
      }

      // Client may have unexpectedly been closed, so re-open and retry once if we didn't
      // successfully send the payload yet or if the exception indicates the connection
      // was closed on the other end. Note that TCP can have a half-open connection so
      // send() may still succeed even after the other end already closed the socket.
      // The payload can just be buffered in the kernel.
      if (!send_done || IsConnResetTException(e)) {
        return RetryRpc(f, request, response);
      }

      // Payload was sent and failure wasn't a timeout waiting for response. Fail the RPC.
      return GeneralErrorStatus(e, typeid(*response).name(), send_done);
    } catch (const apache::thrift::TException& e) {
      return GeneralErrorStatus(e, typeid(*response).name(), send_done);
    }
    client_is_unrecoverable_ = false;
    return Status::OK();
  }

  /// Return struct for DoRpcWithRetry() that allows callers to distinguish between
  /// failures in getting a client and failures sending the RPC.
  struct RpcStatus {
    Status status;

    // Set to true if 'status' is not OK and the error occurred while getting the client.
    bool client_error;

    static RpcStatus OK() { return {Status::OK(), false}; }
  };

  /// Helper that retries constructing a client and calling DoRpc() up to 'retries' times
  /// with 'delay_ms' delay between retries. This handles both RPC failures and failures
  /// to get a client from 'client_cache'.  'debug_fn' is a Status-returning function that
  /// can be used to inject errors into the RPC.
  ///
  /// If the final attempt failed to obtain a client, that error is returned with
  /// 'client_error' set; otherwise the last RPC (or injected) error is returned.
  /// NOTE(review): with retries <= 0 no attempt is made and the result is built from
  /// default-constructed Status values (presumably OK) — callers should pass retries >= 1.
  template <class F, class DebugF, class Request, class Response>
  static RpcStatus DoRpcWithRetry(ClientCache<T>* client_cache, TNetworkAddress address,
      const F& f, const Request& request, int retries, int64_t delay_ms,
      const DebugF& debug_fn, Response* response) {
    Status rpc_status;
    Status client_status;

    // Try to send the RPC as many times as requested before failing.
    for (int i = 0; i < retries; ++i) {
      if (i > 0) SleepForMs(delay_ms); // Delay before retrying.
      ClientConnection<T> client(client_cache, address, &client_status);
      if (!client_status.ok()) continue;

      rpc_status = debug_fn();
      if (!rpc_status.ok()) {
        LOG(INFO) << "Injected RPC error to " << TNetworkAddressToString(address) << ": "
                  << rpc_status.GetDetail();
        continue;
      }

      rpc_status = client.DoRpc(f, request, response);
      if (rpc_status.ok()) break;
      LOG(INFO) << "RPC to " << TNetworkAddressToString(address) << " failed "
                << rpc_status.GetDetail();
    }
    if (!client_status.ok()) return {client_status, true};
    if (!rpc_status.ok()) return {rpc_status, false};
    return RpcStatus::OK();
  }

  /// In certain cases, the server may take longer to provide an RPC response than
  /// the configured socket timeout. Callers may wish to retry receiving the response.
  /// This is safe if and only if DoRpc() returned RPC_RECV_TIMEOUT.
  template <class F, class Response>
  Status RetryRpcRecv(const F& recv_func, Response* response) {
    DCHECK(response != nullptr);
    DCHECK(client_is_unrecoverable_);
    try {
      (client_->*recv_func)(*response);
    } catch (const apache::thrift::transport::TTransportException& e) {
      if (IsReadTimeoutTException(e)) {
        return RecvTimeoutStatus(typeid(*response).name());
      }
      // If it's not timeout exception, then the connection is broken, stop retrying.
      return GeneralErrorStatus(e, typeid(*response).name(), true);
    } catch (const apache::thrift::TException& e) {
      return GeneralErrorStatus(e, typeid(*response).name(), true);
    }
    client_is_unrecoverable_ = false;
    return Status::OK();
  }

 private:
  ClientCache<T>* client_cache_;
  T* client_;
  TNetworkAddress address_;

  /// Indicate the last rpc call sent by this connection succeeds or not. If the rpc call
  /// fails for any reason, the connection could be left in a bad state and cannot be
  /// recovered.
  bool client_is_unrecoverable_;

  /// Returns an error Status for general RPC errors not due to recv timeout or
  /// stale connections. Will also log some details about the error.
  Status GeneralErrorStatus(
      const apache::thrift::TException& e, const std::string& rpc_name, bool send_done) {
    ErrorMsg msg(TErrorCode::RPC_GENERAL_ERROR, strings::Substitute("Client for $0 hit "
        "an unexpected exception: $1, type: $2, rpc: $3, send: $4",
        TNetworkAddressToString(address_), e.what(), typeid(e).name(), rpc_name,
        send_done ? "done" : "not done"));
    VLOG_QUERY << msg.msg();
    return Status::Expected(msg);
  }

  /// Returns an error Status for RPC errors due to recv timeout.
  /// Will also log some details about the error.
  Status RecvTimeoutStatus(const std::string& rpc_name) {
    ErrorMsg msg(TErrorCode::RPC_RECV_TIMEOUT, TNetworkAddressToString(address_),
        rpc_name);
    VLOG_QUERY << msg.msg();
    return Status::Expected(msg);
  }

  /// Reopens the client and retries the RPC once. Called by DoRpc() when the first
  /// attempt threw a transport exception either before the payload was fully sent, or
  /// with an error indicating the connection was reset by the other end. Returns
  /// RPC_CLIENT_CONNECT_FAILURE if the client cannot be reopened, or RPC_GENERAL_ERROR
  /// if the retried call throws.
  template <class F, class Request, class Response>
  Status RetryRpc(const F& f, const Request& request, Response* response) {
    DCHECK(client_is_unrecoverable_);
    // Client may have unexpectedly been closed, so re-open and retry.
    // TODO: ThriftClient should return proper error codes.
    Status status = Reopen();
    if (!status.ok()) {
      ErrorMsg msg(TErrorCode::RPC_CLIENT_CONNECT_FAILURE, status.GetDetail());
      VLOG_QUERY << msg.msg() << " rpc: " << typeid(*response).name();
      return Status::Expected(msg);
    }
    bool send_done = false;
    try {
      (client_->*f)(*response, request, &send_done);
    } catch (const apache::thrift::TException& e) {
      // By this point the RPC really has failed.
      // TODO: Revisit this logic later. It's possible that the new connection
      // works but we hit timeout here.
      return GeneralErrorStatus(e, typeid(*response).name(), send_done);
    }
    client_is_unrecoverable_ = false;
    return Status::OK();
  }
};

/// Generic cache of Thrift clients for a given service type.
/// This class is thread-safe.
template<class T>
class ClientCache {
 public:
  typedef ThriftClient<T> Client;

  /// Create a ClientCache that makes a single attempt to open each connection, does not
  /// wait between attempts and sets no send/receive timeouts on the underlying socket.
  /// Equivalent to ClientCache(1, 0, 0, 0, service_name, enable_ssl).
  ClientCache(const std::string& service_name = "", bool enable_ssl = false)
      : ClientCache(1, 0, 0, 0, service_name, enable_ssl) {}

  /// Create a ClientCache where connections are tried num_tries times, with a pause of
  /// wait_ms between attempts. The underlying TSocket's send and receive timeouts of
  /// each connection can also be set. If num_tries == 0, retry connections indefinitely.
  /// A send/receive timeout of 0 means there is no timeout.
  ClientCache(uint32_t num_tries, uint64_t wait_ms, int32_t send_timeout_ms = 0,
      int32_t recv_timeout_ms = 0, const std::string& service_name = "",
      bool enable_ssl = false)
      : client_cache_helper_(num_tries, wait_ms, send_timeout_ms, recv_timeout_ms) {
    // Bind 'this' together with the per-cache service name and SSL flag; the address
    // (_1) and the output client key (_2) are supplied by ClientCacheHelper on each call.
    client_factory_ = boost::bind<ThriftClientImpl*>(
        boost::mem_fn(&ClientCache::MakeClient), this, _1, _2, service_name, enable_ssl);
  }

  /// Close all clients connected to the supplied address, (e.g., in
  /// case of failure) so that on their next use they will have to be
  /// Reopen'ed.
  void CloseConnections(const TNetworkAddress& address) {
    client_cache_helper_.CloseConnections(address);
  }

  /// Helper method which returns a debug string
  std::string DebugString() {
    return client_cache_helper_.DebugString();
  }

  /// For testing only: shutdown all clients
  void TestShutdown() {
    client_cache_helper_.TestShutdown();
  }

  /// Adds metrics for this cache to the supplied Metrics instance. The
  /// metrics have keys that are prefixed by the key_prefix argument
  /// (which should not end in a period).
  /// Must be called before the cache is used, otherwise the metrics might be wrong
  void InitMetrics(MetricGroup* metrics, const std::string& key_prefix) {
    client_cache_helper_.InitMetrics(metrics, key_prefix);
  }

 private:
  friend class ClientConnection<T>;

  /// Most operations in this class are thin wrappers around the
  /// equivalent in ClientCacheHelper, which is a non-templated cache
  /// to avoid inlining lots of code wherever this cache is used.
  ClientCacheHelper client_cache_helper_;

  /// Function pointer, bound to MakeClient, which produces clients when the cache is empty
  ClientCacheHelper::ClientFactory client_factory_;

  /// Obtains a pointer to a Thrift interface object (of type T),
  /// backed by a live transport which is already open. Returns
  /// Status::OK unless there was an error opening the transport.
  Status GetClient(const TNetworkAddress& address, T** iface) WARN_UNUSED_RESULT {
    return client_cache_helper_.GetClient(
        address, client_factory_, reinterpret_cast<ClientKey*>(iface));
  }

  /// Close and delete the underlying transport. Return a new client connecting to the
  /// same host/port.
  /// Returns an error status if a new connection cannot be established and *client will
  /// be unaffected in that case.
  Status ReopenClient(T** client) WARN_UNUSED_RESULT {
    return client_cache_helper_.ReopenClient(
        client_factory_, reinterpret_cast<ClientKey*>(client));
  }

  /// Return the client to the cache and set *client to NULL.
  void ReleaseClient(T** client) {
    client_cache_helper_.ReleaseClient(reinterpret_cast<ClientKey*>(client));
  }

  /// Destroy the client because it's left in an unrecoverable state after errors
  /// in DoRpc() to avoid more rpc failure.
  void DestroyClient(T** client) {
    client_cache_helper_.DestroyClient(reinterpret_cast<ClientKey*>(client));
  }

  /// Factory method to produce a new ThriftClient<T> for the wrapped cache. Writes the
  /// client's interface pointer into '*client_key' (the key handed out to callers) and
  /// returns the newly allocated client. Presumably ClientCacheHelper takes ownership of
  /// it (it documents owning all clients in client_map_) — confirm in the .cc file.
  ThriftClientImpl* MakeClient(const TNetworkAddress& address, ClientKey* client_key,
      const std::string& service_name, bool enable_ssl) {
    Client* client = new Client(address.hostname, address.port, service_name, nullptr,
        enable_ssl);
    *client_key = reinterpret_cast<ClientKey>(client->iface());
    return client;
  }
};

}
