/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#pragma once

#ifndef GEODE_TCRCONNECTION_H_
#define GEODE_TCRCONNECTION_H_

#include <atomic>
#include <chrono>

#include <ace/Semaphore.h>

#include <geode/CacheableBuiltins.hpp>
#include <geode/ExceptionTypes.hpp>
#include <geode/internal/geode_globals.hpp>

#include "Connector.hpp"
#include "DiffieHellman.hpp"
#include "TcrMessage.hpp"
#include "util/synchronized_set.hpp"

#define DEFAULT_TIMEOUT_RETRIES 12
#define PRIMARY_SERVER_TO_CLIENT 101
#define SECONDARY_SERVER_TO_CLIENT 102
#define SUCCESSFUL_SERVER_TO_CLIENT 105
#define UNSUCCESSFUL_SERVER_TO_CLIENT 106
#define CLIENT_TO_SERVER 100
#define REPLY_OK 59
#define REPLY_OK_CS43 58
#define REPLY_REFUSED 60
#define REPLY_INVALID 61
#define REPLY_SSL_ENABLED 21
#define REPLY_AUTHENTICATION_REQUIRED 62
#define REPLY_AUTHENTICATION_FAILED 63
#define REPLY_DUPLICATE_DURABLE_CLIENT 64

#define SECURITY_CREDENTIALS_NONE 0
#define SECURITY_CREDENTIALS_NORMAL 1
#define SECURITY_CREDENTIALS_DHENCRYPT 2
#define SECURITY_MULTIUSER_NOTIFICATIONCHANNEL 3

/** Closes and Deletes connection only if it exists */
#define GF_SAFE_DELETE_CON(x) \
  {                           \
    x->close();               \
    delete x;                 \
    x = nullptr;              \
  }

namespace apache {
namespace geode {
namespace client {

struct chunkHeader {
  int32_t chunkLength;
  int8_t flags;
};

struct chunkedResponseHeader {
  int32_t messageType;
  int32_t numberOfParts;
  int32_t transactionId;
  chunkHeader header;
};

enum ConnErrType {
  CONN_NOERR = 0x0,
  CONN_NODATA = 0x1,
  CONN_TIMEOUT = 0x3,
  CONN_IOERR = 0x4,
  CONN_OTHERERR = 0x8
};

enum ServerQueueStatus {
  NON_REDUNDANT_SERVER = 0,
  REDUNDANT_SERVER = 1,
  PRIMARY_SERVER = 2
};

class TcrEndpoint;
class SystemProperties;
class ThinClientPoolDM;
class TcrConnectionManager;
class APACHE_GEODE_EXPORT TcrConnection {
 public:
  using clock = std::chrono::steady_clock;
  using time_point = clock::time_point;

  /** Create one connection, endpoint is in format of hostname:portno
   * It will do handshake with j-server. There're 2 types of handshakes:
   * 1) handshake for request
   *    send following bytes:
   *    CLIENT_TO_SERVER
   *    REPLY_OK
   *    2 bytes for the length of idenfifier
   *    a string with "hostname:processId" as identifier
   *
   *    if send succeeds, handshake succeeds. Otherwise, construction
   *    fails.
   *
   * 2) handshake for client notification
   *    send following bytes:
   *    SERVER_TO_CLIENT
   *    1 (4 bytes, we can hard-code)
   *    12345 (4 bytes, we can hard-code)
   *
   *    So the total bytes to send are 9
   *    read one byte from server, it should be CLIENT_TO_SERVER
   *    Otherwise, construction fails.
   * @param     ports     List of local ports for connections to endpoint
   * @param     numPorts  Size of ports list
   */
  bool InitTcrConnection(
      TcrEndpoint* endpointObj, const char* endpoint,
      synchronized_set<std::unordered_set<uint16_t>>& ports,
      bool isClientNotification = false, bool isSecondary = false,
      std::chrono::microseconds connectTimeout = DEFAULT_CONNECT_TIMEOUT);

  TcrConnection(const TcrConnectionManager& connectionManager,
                volatile const bool& isConnected)
      : connectionId(0),
        m_connectionManager(&connectionManager),
        m_dh(nullptr),
        m_endpoint(nullptr),
        m_endpointObj(nullptr),
        m_connected(isConnected),
        m_conn(nullptr),
        m_hasServerQueue(NON_REDUNDANT_SERVER),
        m_queueSize(0),
        m_port(0),
        m_chunksProcessSema(0),
        m_isBeingUsed(false),
        m_isUsed(0),
        m_poolDM(nullptr) {}

  /* destroy the connection */
  ~TcrConnection();

  /**
   * send a synchronized request to server.
   *
   * It will send the buffer, then wait to receive 17 bytes and save in
   * msg_header.
   * msg_header[0] is message type.
   * msg_header[1],msg_header[2],msg_header[3],msg_header[4] will be a 4 bytes
   * integer,
   * let's say, msgLen, which specifies the length of next read. byteReads some
   * number of
   * call read again for msgLen bytes, and save the bytes into msg_body.
   * concatenate the msg_header and msg_body into buffer, msg. The msg should be
   * a '0' ended
   * string. i.e. If the msg_header plus msg_body has 100 chars, msg should be a
   * 101 char array
   * to contain the '0' in the end. We need it to get length of the msg.
   * Return the msg.
   *
   * @param      buffer the buffer to send
   * @param      len length of the data to send
   * @param      sendTimeoutSec write timeout in sec
   * @param      recvLen output parameter for length of the received message
   * @param      receiveTimeoutSec read timeout in sec
   * @return     byte arrary of response. '0' ended.
   * @exception  GeodeIOException  if an I/O error occurs (socket failure).
   * @exception  TimeoutException  if timeout happens at any of the 3 socket
   * operation: 1 write, 2 read
   */
  char* sendRequest(
      const char* buffer, size_t len, size_t* recvLen,
      std::chrono::microseconds sendTimeoutSec = DEFAULT_WRITE_TIMEOUT,
      std::chrono::microseconds receiveTimeoutSec = DEFAULT_READ_TIMEOUT_SECS,
      int32_t request = -1);

  /**
   * send a synchronized request to server for REGISTER_INTEREST_LIST.
   *
   * @param      buffer the buffer to send
   *             len length of the data to send
   *             message vector, which will return chunked TcrMessage.
   *             sendTimeoutSec write timeout in sec
   *             receiveTimeoutSec read timeout in sec
   * @exception  GeodeIOException  if an I/O error occurs (socket failure).
   * @exception  TimeoutException  if timeout happens at any of the 3 socket
   * operation: 1 write, 2 read
   */
  void sendRequestForChunkedResponse(
      const TcrMessage& request, size_t len, TcrMessageReply& message,
      std::chrono::microseconds sendTimeoutSec = DEFAULT_WRITE_TIMEOUT,
      std::chrono::microseconds receiveTimeoutSec = DEFAULT_READ_TIMEOUT_SECS);

  /**
   * send an asynchronized request to server. No response is expected.
   * we need to use it to send CLOSE_CONNECTION msg
   *
   * @param      buffer the buffer to send
   *             len length of the data to send
   *             sendTimeoutSec write timeout in sec
   * @return     no return. Because it either succeeds, or throw exception.
   * @exception  GeodeIOException  if an I/O error occurs (socket failure).
   * @exception  TimeoutException  if timeout happens at any of the 3 socket
   * operation: 1 write, 2 read
   */
  void send(const char* buffer, size_t len,
            std::chrono::microseconds sendTimeoutSec = DEFAULT_WRITE_TIMEOUT,
            bool checkConnected = true);

  void send(std::chrono::microseconds& timeSpent, const char* buffer,
            size_t len,
            std::chrono::microseconds sendTimeoutSec = DEFAULT_WRITE_TIMEOUT,
            bool checkConnected = true);

  /**
   * This method is for receiving client notification. It will read 2 times as
   * reading reply in sendRequest()
   *
   * @param      recvLen output parameter for length of the received message
   * @param      receiveTimeoutSec read timeout in sec
   * @return     byte arrary of response. '0' ended.
   * @exception  GeodeIOException  if an I/O error occurs (socket failure).
   * @exception  TimeoutException  if timeout happens at any of the 3 socket
   * operation: 1 write, 2 read
   */
  char* receive(
      size_t* recvLen, ConnErrType* opErr,
      std::chrono::microseconds receiveTimeoutSec = DEFAULT_READ_TIMEOUT_SECS);

  //  readMessage is now public
  /**
   * This method reads a message from the socket connection and returns the byte
   * array of response.
   * @param      recvLen output parameter for length of the received message
   * @param      receiveTimeoutSec read timeout in seconds
   * @param      doHeaderTimeoutRetries retry when header receive times out
   * @return     byte array of response. '0' ended.
   * @exception  GeodeIOException  if an I/O error occurs (socket failure).
   * @exception  TimeoutException  if timeout happens during read
   */
  char* readMessage(size_t* recvLen,
                    std::chrono::microseconds receiveTimeoutSec,
                    bool doHeaderTimeoutRetries, ConnErrType* opErr,
                    bool isNotificationMessage = false, int32_t request = -1);

  /**
   * This method reads an interest list response  message from the socket
   * connection and sets the reply message
   * parameter.
   * @param      reply response message
   * @param      receiveTimeout read timeout
   * @param      doHeaderTimeoutRetries retry when header receive times out
   * @exception  GeodeIOException  if an I/O error occurs (socket failure).
   * @exception  TimeoutException  if timeout happens during read
   */
  void readMessageChunked(TcrMessageReply& reply,
                          std::chrono::microseconds receiveTimeout,
                          bool doHeaderTimeoutRetries);

  /**
   * Send close connection message to the server.
   */
  void close();

  //  Durable clients: return true if server has HA queue.
  ServerQueueStatus inline getServerQueueStatus(int32_t& queueSize) {
    queueSize = m_queueSize;
    return m_hasServerQueue;
  }

  uint16_t inline getPort() { return m_port; }

  TcrEndpoint* getEndpointObject() const { return m_endpointObj; }
  bool isBeingUsed() { return m_isBeingUsed; }
  bool setAndGetBeingUsed(
      volatile bool isBeingUsed,
      bool forTransaction);  // { m_isBeingUsed = isBeingUsed ;}

  // helpers for pool connection manager
  void touch();
  bool hasExpired(const std::chrono::milliseconds& expiryTime);
  bool isIdle(const std::chrono::milliseconds& idleTime);
  time_point getLastAccessed();
  void updateCreationTime();

  int64_t getConnectionId() {
    LOGDEBUG("TcrConnection::getConnectionId() = %d ", connectionId);
    return connectionId;
  }

  void setConnectionId(int64_t id) {
    LOGDEBUG("Tcrconnection:setConnectionId() = %d ", id);
    connectionId = id;
  }

  const TcrConnectionManager& getConnectionManager() {
    return *m_connectionManager;
  }

  std::shared_ptr<CacheableBytes> encryptBytes(
      std::shared_ptr<CacheableBytes> data) {
    if (m_dh != nullptr) {
      return m_dh->encrypt(data);
    } else {
      return data;
    }
  }

  std::shared_ptr<CacheableBytes> decryptBytes(
      std::shared_ptr<CacheableBytes> data) {
    if (m_dh != nullptr) {
      return m_dh->decrypt(data);
    } else {
      return data;
    }
  }

 private:
  int64_t connectionId;
  const TcrConnectionManager* m_connectionManager;
  DiffieHellman* m_dh;

  std::chrono::microseconds calculateHeaderTimeout(
      std::chrono::microseconds receiveTimeout, bool retry);

  chunkedResponseHeader readResponseHeader(std::chrono::microseconds timeout);

  chunkHeader readChunkHeader(std::chrono::microseconds timeout);

  std::vector<uint8_t> readChunkBody(std::chrono::microseconds timeout,
                                     int32_t chunkLength);

  bool processChunk(TcrMessageReply& reply, std::chrono::microseconds timeout,
                    int32_t chunkLength, int8_t lastChunkAndSecurityFlags);

  /**
   * To read Intantiator message(which meant for java client), here we are
   * ignoring it
   */
  void readHandshakeInstantiatorMsg(std::chrono::microseconds connectTimeout);

  /**
   * Packs the override settings bits into bytes - currently a single byte for
   * conflation, remove-unresponsive-client and notify-by-subscription.
   */
  uint8_t getOverrides(const SystemProperties* props);

  /**
   * To read the from stream
   */
  int32_t readHandShakeInt(std::chrono::microseconds connectTimeout);

  /*
   * To read the arraysize
   */
  int32_t readHandshakeArraySize(std::chrono::microseconds connectTimeout);

  /*
   * This function reads "numberOfBytes" and ignores it.
   */
  void readHandShakeBytes(int numberOfBytes,
                          std::chrono::microseconds connectTimeout);

  /** Create a normal or SSL connection */
  Connector* createConnection(
      const char* ipaddr,
      std::chrono::microseconds waitSeconds = DEFAULT_CONNECT_TIMEOUT,
      int32_t maxBuffSizePool = 0);

  /**
   * Reads bytes from socket and handles error conditions in case of Handshake.
   */
  std::vector<int8_t> readHandshakeData(
      int32_t msgLength, std::chrono::microseconds connectTimeout);

  /**
   * Reads raw bytes (without appending nullptr terminator) from socket and
   * handles error conditions in case of Handshake.
   */
  std::shared_ptr<CacheableBytes> readHandshakeRawData(
      int32_t msgLength, std::chrono::microseconds connectTimeout);
  /**
   * Reads a string from socket and handles error conditions in case of
   * Handshake.
   */
  std::shared_ptr<CacheableString> readHandshakeString(
      std::chrono::microseconds connectTimeout);

  /**
   * Reads a byte array (using initial length) from socket and handles error
   * conditions in case of Handshake.
   */
  std::shared_ptr<CacheableBytes> readHandshakeByteArray(
      std::chrono::microseconds connectTimeout);

  /**
   * Send data to the connection till sendTimeout
   */
  ConnErrType sendData(const char* buffer, size_t length,
                       std::chrono::microseconds sendTimeout,
                       bool checkConnected = true);

  ConnErrType sendData(std::chrono::microseconds& timeSpent, const char* buffer,
                       size_t length, std::chrono::microseconds sendTimeout,
                       bool checkConnected = true);

  /**
   * Read data from the connection till receiveTimeoutSec
   */
  ConnErrType receiveData(char* buffer, size_t length,
                          std::chrono::microseconds receiveTimeoutSec,
                          bool checkConnected = true,
                          bool isNotificationMessage = false);

  const char* m_endpoint;
  TcrEndpoint* m_endpointObj;
  volatile const bool& m_connected;
  Connector* m_conn;
  ServerQueueStatus m_hasServerQueue;
  int32_t m_queueSize;
  uint16_t m_port;

  // semaphore to synchronize with the chunked response processing thread
  ACE_Semaphore m_chunksProcessSema;

  time_point m_creationTime;
  time_point m_lastAccessed;

  // Disallow copy constructor and assignment operator.
  TcrConnection(const TcrConnection&);
  TcrConnection& operator=(const TcrConnection&);
  volatile bool m_isBeingUsed;
  std::atomic<uint32_t> m_isUsed;
  ThinClientPoolDM* m_poolDM;
  bool useReplyTimeout(const TcrMessage& request) const;
  std::chrono::microseconds sendWithTimeouts(
      const char* data, size_t len, std::chrono::microseconds sendTimeout,
      std::chrono::microseconds receiveTimeout);
  bool replyHasResult(const TcrMessage& request, TcrMessageReply& reply);
};
}  // namespace client
}  // namespace geode
}  // namespace apache

#endif  // GEODE_TCRCONNECTION_H_
