/**
 * Site2SiteProtocol class implementation
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include <stdio.h>
#include <time.h>
#include <chrono>
#include <utility>
#include <map>
#include <string>
#include <memory>
#include <thread>
#include <random>
#include <iostream>
#include <vector>

#include "sitetosite/RawSocketProtocol.h"
#include "io/CRCStream.h"
#include "sitetosite/Peer.h"
#include "utils/gsl.h"

namespace org {
namespace apache {
namespace nifi {
namespace minifi {
namespace sitetosite {

std::shared_ptr<utils::IdGenerator> RawSiteToSiteClient::id_generator_ = utils::IdGenerator::getIdGenerator();
std::shared_ptr<utils::IdGenerator> Transaction::id_generator_ = utils::IdGenerator::getIdGenerator();

const char *RawSiteToSiteClient::HandShakePropertyStr[MAX_HANDSHAKE_PROPERTY] = {
/**
 * Boolean value indicating whether or not the contents of a FlowFile should
 * be GZipped when transferred.
 */
"GZIP",
/**
 * The unique identifier of the port to communicate with
 */
"PORT_IDENTIFIER",
/**
 * Indicates the number of milliseconds after the request was made that the
 * client will wait for a response. If no response has been received by the
 * time this value expires, the server can move on without attempting to
 * service the request because the client will have already disconnected.
 */
"REQUEST_EXPIRATION_MILLIS",
/**
 * The preferred number of FlowFiles that the server should send to the
 * client when pulling data. This property was introduced in version 5 of
 * the protocol.
 */
"BATCH_COUNT",
/**
 * The preferred number of bytes that the server should send to the client
 * when pulling data. This property was introduced in version 5 of the
 * protocol.
 */
"BATCH_SIZE",
/**
 * The preferred amount of time that the server should send data to the
 * client when pulling data. This property was introduced in version 5 of
 * the protocol. Value is in milliseconds.
 */
"BATCH_DURATION" };

bool RawSiteToSiteClient::establish() {
  if (peer_state_ != IDLE) {
    logger_->log_error("Site2Site peer state is not idle while try to establish");
    return false;
  }

  bool ret = peer_->Open();

  if (!ret) {
    logger_->log_error("Site2Site peer socket open failed");
    return false;
  }

  // Negotiate the version
  ret = initiateResourceNegotiation();

  if (!ret) {
    logger_->log_error("Site2Site Protocol Version Negotiation failed");
    return false;
  }

  logger_->log_debug("Site2Site socket established");
  peer_state_ = ESTABLISHED;

  return true;
}

bool RawSiteToSiteClient::initiateResourceNegotiation() {
  // Negotiate the version
  if (peer_state_ != IDLE) {
    logger_->log_error("Site2Site peer state is not idle while initiateResourceNegotiation");
    return false;
  }

  logger_->log_debug("Negotiate protocol version with destination port %s current version %d", port_id_.to_string(), _currentVersion);

  int ret = peer_->write(getResourceName());

  logger_->log_trace("result of writing resource name is %i", ret);
  if (ret <= 0) {
    logger_->log_debug("result of writing resource name is %i", ret);
    // tearDown();
    return false;
  }

  ret = peer_->write(_currentVersion);

  if (ret <= 0) {
    logger_->log_debug("result of writing version is %i", ret);
    return false;
  }

  uint8_t statusCode;
  ret = peer_->read(statusCode);

  if (ret <= 0) {
    logger_->log_debug("result of writing version status code  %i", ret);
    return false;
  }
  logger_->log_debug("status code is %i", statusCode);
  switch (statusCode) {
    case RESOURCE_OK:
      logger_->log_debug("Site2Site Protocol Negotiate protocol version OK");
      return true;
    case DIFFERENT_RESOURCE_VERSION:
      uint32_t serverVersion;
      ret = peer_->read(serverVersion);
      if (ret <= 0) {
        return false;
      }

      logging::LOG_INFO(logger_) << "Site2Site Server Response asked for a different protocol version " << serverVersion;

      for (unsigned int i = (_currentVersionIndex + 1); i < sizeof(_supportedVersion) / sizeof(uint32_t); i++) {
        if (serverVersion >= _supportedVersion[i]) {
          _currentVersion = _supportedVersion[i];
          _currentVersionIndex = i;
          return initiateResourceNegotiation();
        }
      }
      logger_->log_error("Site2Site Negotiate protocol failed to find a common version with server");
      return false;
    case NEGOTIATED_ABORT:
      logger_->log_error("Site2Site Negotiate protocol response ABORT");
      return false;
    default:
      logger_->log_error("Negotiate protocol response unknown code %d", statusCode);
      return false;
  }
}

bool RawSiteToSiteClient::initiateCodecResourceNegotiation() {
  // Negotiate the version
  if (peer_state_ != HANDSHAKED) {
    logger_->log_error("Site2Site peer state is not handshaked while initiateCodecResourceNegotiation");
    return false;
  }

  logger_->log_trace("Negotiate Codec version with destination port %s current version %d", port_id_.to_string(), _currentCodecVersion);

  int ret = peer_->write(getCodecResourceName());

  if (ret <= 0) {
    logger_->log_debug("result of getCodecResourceName is %i", ret);
    return false;
  }

  ret = peer_->write(_currentCodecVersion);

  if (ret <= 0) {
    logger_->log_debug("result of _currentCodecVersion is %i", ret);
    return false;
  }

  uint8_t statusCode;
  ret = peer_->read(statusCode);

  if (ret <= 0) {
    return false;
  }

  switch (statusCode) {
    case RESOURCE_OK:
      logger_->log_trace("Site2Site Codec Negotiate version OK");
      return true;
    case DIFFERENT_RESOURCE_VERSION:
      uint32_t serverVersion;
      ret = peer_->read(serverVersion);
      if (ret <= 0) {
        return false;
      }
      logging::LOG_INFO(logger_) << "Site2Site Server Response asked for a different protocol version " << serverVersion;

      for (unsigned int i = (_currentCodecVersionIndex + 1); i < sizeof(_supportedCodecVersion) / sizeof(uint32_t); i++) {
        if (serverVersion >= _supportedCodecVersion[i]) {
          _currentCodecVersion = _supportedCodecVersion[i];
          _currentCodecVersionIndex = i;
          return initiateCodecResourceNegotiation();
        }
      }
      logger_->log_error("Site2Site Negotiate codec failed to find a common version with server");
      return false;
    case NEGOTIATED_ABORT:
      logger_->log_error("Site2Site Codec Negotiate response ABORT");
      return false;
    default:
      logger_->log_error("Negotiate Codec response unknown code %d", statusCode);
      return false;
  }
}

bool RawSiteToSiteClient::handShake() {
  if (peer_state_ != ESTABLISHED) {
    logger_->log_error("Site2Site peer state is not established while handshake");
    return false;
  }
  logger_->log_debug("Site2Site Protocol Perform hand shake with destination port %s", port_id_.to_string());
  _commsIdentifier = id_generator_->generate();

  int ret = peer_->write(_commsIdentifier);

  if (ret <= 0) {
    return false;
  }

  std::map<std::string, std::string> properties;
  properties[HandShakePropertyStr[GZIP]] = "false";
  properties[HandShakePropertyStr[PORT_IDENTIFIER]] = port_id_.to_string();
  properties[HandShakePropertyStr[REQUEST_EXPIRATION_MILLIS]] = std::to_string(_timeOut);
  if (_currentVersion >= 5) {
    if (_batchCount > 0)
      properties[HandShakePropertyStr[BATCH_COUNT]] = std::to_string(_batchCount);
    if (_batchSize > 0)
      properties[HandShakePropertyStr[BATCH_SIZE]] = std::to_string(_batchSize);
    if (_batchDuration > 0)
      properties[HandShakePropertyStr[BATCH_DURATION]] = std::to_string(_batchDuration);
  }

  if (_currentVersion >= 3) {
    ret = peer_->write(peer_->getURL());
    if (ret <= 0) {
      return false;
    }
  }

  uint32_t size = gsl::narrow<uint32_t>(properties.size());
  ret = peer_->write(size);
  if (ret <= 0) {
    return false;
  }

  std::map<std::string, std::string>::iterator it;
  for (it = properties.begin(); it != properties.end(); it++) {
    ret = peer_->write(it->first);
    if (ret <= 0) {
      return false;
    }
    ret = peer_->write(it->second);
    if (ret <= 0) {
      return false;
    }
    logger_->log_debug("Site2Site Protocol Send handshake properties %s %s", it->first, it->second);
  }

  RespondCode code;
  std::string message;

  ret = readRespond(nullptr, code, message);

  if (ret <= 0) {
    return false;
  }

  std::string error;

  switch (code) {
    case PROPERTIES_OK:
      logger_->log_debug("Site2Site HandShake Completed");
      peer_state_ = HANDSHAKED;
      return true;
    case PORT_NOT_IN_VALID_STATE:
      error = "in invalid state";
      break;
    case UNKNOWN_PORT:
      error = "an unknown port";
      break;
    case PORTS_DESTINATION_FULL:
      error = "full";
      break;
    // Unknown error
    default:
      logger_->log_error("HandShake Failed because of unknown respond code %d", code);
      ret = -1;
      return false;
  }

  // All known error cases handled here
  logger_->log_error("Site2Site HandShake Failed because destination port, %s, is %s", port_id_.to_string(), error);
  ret = -1;
  return false;
}

void RawSiteToSiteClient::tearDown() {
  if (peer_state_ >= ESTABLISHED) {
    logger_->log_trace("Site2Site Protocol tearDown");
    // need to write shutdown request
    writeRequestType(SHUTDOWN);
  }

  known_transactions_.clear();
  peer_->Close();
  peer_state_ = IDLE;
}

bool RawSiteToSiteClient::getPeerList(std::vector<PeerStatus> &peers) {
  if (establish() && handShake()) {
    int status = writeRequestType(REQUEST_PEER_LIST);

    if (status <= 0) {
      tearDown();
      return false;
    }

    uint32_t number;
    status = peer_->read(number);

    if (status <= 0) {
      tearDown();
      return false;
    }

    for (uint32_t i = 0; i < number; i++) {
      std::string host;
      status = peer_->read(host);
      if (status <= 0) {
        tearDown();
        return false;
      }
      uint32_t port;
      status = peer_->read(port);
      if (status <= 0) {
        tearDown();
        return false;
      }
      uint8_t secure;
      status = peer_->read(secure);
      if (status <= 0) {
        tearDown();
        return false;
      }
      uint32_t count;
      status = peer_->read(count);
      if (status <= 0) {
        tearDown();
        return false;
      }
      PeerStatus status(std::make_shared<Peer>(port_id_, host, gsl::narrow<uint16_t>(port), secure != 0), count, true);
      peers.push_back(std::move(status));
      logging::LOG_TRACE(logger_) << "Site2Site Peer host " << host << " port " << port << " Secure " << std::to_string(secure);
    }

    tearDown();
    return true;
  } else {
    tearDown();
    return false;
  }
}

  int RawSiteToSiteClient::writeRequestType(RequestType type) {
    if (type >= MAX_REQUEST_TYPE)
      return -1;

    return peer_->write(SiteToSiteRequest::RequestTypeStr[type]);
  }

  int RawSiteToSiteClient::readRequestType(RequestType &type) {
    std::string requestTypeStr;

    int ret = peer_->read(requestTypeStr);

    if (ret <= 0)
      return ret;

    for (int i = NEGOTIATE_FLOWFILE_CODEC; i <= SHUTDOWN; i++) {
      if (SiteToSiteRequest::RequestTypeStr[i] == requestTypeStr) {
        type = (RequestType) i;
        return ret;
      }
    }

    return -1;
  }

int RawSiteToSiteClient::readRespond(const std::shared_ptr<Transaction> &transaction, RespondCode &code, std::string &message) {
  return readResponse(transaction, code, message);
}

int RawSiteToSiteClient::writeRespond(const std::shared_ptr<Transaction> &transaction, RespondCode code, std::string message) {
  return writeResponse(transaction, code, message);
}

bool RawSiteToSiteClient::negotiateCodec() {
  if (peer_state_ != HANDSHAKED) {
    logger_->log_error("Site2Site peer state is not handshaked while negotiate codec");
    return false;
  }

  logger_->log_trace("Site2Site Protocol Negotiate Codec with destination port %s", port_id_.to_string());

  int status = writeRequestType(NEGOTIATE_FLOWFILE_CODEC);

  if (status <= 0) {
    return false;
  }

  // Negotiate the codec version
  bool ret = initiateCodecResourceNegotiation();

  if (!ret) {
    logger_->log_error("Site2Site Codec Version Negotiation failed");
    return false;
  }

  logger_->log_trace("Site2Site Codec Completed and move to READY state for data transfer");
  peer_state_ = READY;

  return true;
}

bool RawSiteToSiteClient::bootstrap() {
  if (peer_state_ == READY)
    return true;

  tearDown();

  if (establish() && handShake() && negotiateCodec()) {
    logger_->log_debug("Site to Site ready for data transaction");
    return true;
  } else {
    peer_->yield();
    tearDown();
    return false;
  }
}

std::shared_ptr<Transaction> RawSiteToSiteClient::createTransaction(TransferDirection direction) {
  int ret;
  bool dataAvailable;
  std::shared_ptr<Transaction> transaction = nullptr;

  if (peer_state_ != READY) {
    bootstrap();
  }

  if (peer_state_ != READY) {
    return transaction;
  }

  if (direction == RECEIVE) {
    ret = writeRequestType(RECEIVE_FLOWFILES);

    if (ret <= 0) {
      return transaction;
    }

    RespondCode code;
    std::string message;

    ret = readRespond(nullptr, code, message);

    if (ret <= 0) {
      return transaction;
    }

    org::apache::nifi::minifi::io::CRCStream<SiteToSitePeer> crcstream(gsl::make_not_null(peer_.get()));
    switch (code) {
      case MORE_DATA:
        dataAvailable = true;
        logger_->log_trace("Site2Site peer indicates that data is available");
        transaction = std::make_shared<Transaction>(direction, std::move(crcstream));
        known_transactions_[transaction->getUUID()] = transaction;
        transaction->setDataAvailable(dataAvailable);
        logger_->log_trace("Site2Site create transaction %s", transaction->getUUIDStr());
        return transaction;
      case NO_MORE_DATA:
        dataAvailable = false;
        logger_->log_trace("Site2Site peer indicates that no data is available");
        transaction = std::make_shared<Transaction>(direction, std::move(crcstream));
        known_transactions_[transaction->getUUID()] = transaction;
        transaction->setDataAvailable(dataAvailable);
        logger_->log_trace("Site2Site create transaction %s", transaction->getUUIDStr());
        return transaction;
      default:
        logger_->log_warn("Site2Site got unexpected response %d when asking for data", code);
        return NULL;
    }
  } else {
    ret = writeRequestType(SEND_FLOWFILES);

    if (ret <= 0) {
      return NULL;
    } else {
      org::apache::nifi::minifi::io::CRCStream<SiteToSitePeer> crcstream(gsl::make_not_null(peer_.get()));
      transaction = std::make_shared<Transaction>(direction, std::move(crcstream));
      known_transactions_[transaction->getUUID()] = transaction;
      logger_->log_trace("Site2Site create transaction %s", transaction->getUUIDStr());
      return transaction;
    }
  }
}

bool RawSiteToSiteClient::transmitPayload(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session, const std::string &payload,
                                          std::map<std::string, std::string> attributes) {
  std::shared_ptr<Transaction> transaction = NULL;

  if (payload.length() <= 0)
    return false;

  if (peer_state_ != READY) {
    if (!bootstrap()) {
      return false;
    }
  }

  if (peer_state_ != READY) {
    context->yield();
    tearDown();
    throw Exception(SITE2SITE_EXCEPTION, "Can not establish handshake with peer");
  }

  // Create the transaction
  transaction = createTransaction(SEND);

  if (transaction == NULL) {
    context->yield();
    tearDown();
    throw Exception(SITE2SITE_EXCEPTION, "Can not create transaction");
  }

  utils::Identifier transactionID = transaction->getUUID();

  try {
    DataPacket packet(getLogger(), transaction, attributes, payload);

    int16_t resp = send(transactionID, &packet, nullptr, session);
    if (resp == -1) {
      throw Exception(SITE2SITE_EXCEPTION, "Send Failed in transaction " + transactionID.to_string());
    }
    logging::LOG_INFO(logger_) << "Site2Site transaction " << transactionID.to_string() << " sent bytes length" << payload.length();

    if (!confirm(transactionID)) {
      throw Exception(SITE2SITE_EXCEPTION, "Confirm Failed in transaction " + transactionID.to_string());
    }
    if (!complete(transactionID)) {
      throw Exception(SITE2SITE_EXCEPTION, "Complete Failed in transaction " + transactionID.to_string());
    }
    logging::LOG_INFO(logger_) << "Site2Site transaction " << transactionID.to_string()
        << " successfully send flow record " << transaction->current_transfers_ << " content bytes " << transaction->_bytes;
  } catch (std::exception &exception) {
    if (transaction)
      deleteTransaction(transactionID);
    context->yield();
    tearDown();
    logger_->log_debug("Caught Exception %s", exception.what());
    throw;
  } catch (...) {
    if (transaction)
      deleteTransaction(transactionID);
    context->yield();
    tearDown();
    logger_->log_debug("Caught Exception during RawSiteToSiteClient::transferBytes");
    throw;
  }

  deleteTransaction(transactionID);

  return true;
}

} /* namespace sitetosite */
} /* namespace minifi */
} /* namespace nifi */
} /* namespace apache */
} /* namespace org */
