/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "ClientProxyMembershipID.hpp"

#include <ctime>
#include <iostream>
#include <memory>
#include <string>

#include <ace/OS.h>

#include <geode/CacheableBuiltins.hpp>

#include "DataOutputInternal.hpp"
#include "DistributedSystem.hpp"
#include "Version.hpp"

#define ADDRSIZE 4
#define DCPORT 12334
#define VMKIND 13
#define ROLEARRLENGTH 0
static int synch_counter = 2;
namespace apache {
namespace geode {
namespace client {

namespace {
static class RandomInitializer {
 public:
  RandomInitializer() {
    // using current time and
    // processor time would be good enough for our purpose
    auto seed = ACE_OS::getpid() + ACE_OS::gettimeofday().msec() + clock();
    seed += ACE_OS::gettimeofday().usec();
    // LOGINFO("PID %ld seed %ld ACE_OS::gettimeofday().usec() = %ld clock =
    // %ld ACE_OS::gettimeofday().msec() = %ld", pid, seed ,
    // ACE_OS::gettimeofday().usec() , clock(),
    // ACE_OS::gettimeofday().msec());
    ACE_OS::srand(seed);
  }
} oneTimeRandomInitializer;
}  // namespace

const int ClientProxyMembershipID::VERSION_MASK = 0x8;
const int8_t ClientProxyMembershipID::TOKEN_ORDINAL = -1;

ClientProxyMembershipID::ClientProxyMembershipID()
    : m_hostPort(0),
      m_hostAddr(nullptr)
      /* adongre  - Coverity II
       * CID 29278: Uninitialized scalar field (UNINIT_CTOR)
       */
      ,
      m_hostAddrLen(0),
      m_hostAddrLocalMem(false),
      m_vmViewId(0) {}

ClientProxyMembershipID::~ClientProxyMembershipID() noexcept {
  if (m_hostAddrLocalMem) delete[] m_hostAddr;
}

ClientProxyMembershipID::ClientProxyMembershipID(
    std::string dsName, std::string randString, const char* hostname,
    uint32_t hostAddr, uint32_t hostPort, const char* durableClientId,
    const std::chrono::seconds durableClntTimeOut)
    : m_hostAddrAsUInt32(hostAddr) {
  int32_t vmPID = ACE_OS::getpid();
  initObjectVars(hostname, reinterpret_cast<uint8_t*>(&m_hostAddrAsUInt32), 4,
                 false, hostPort, durableClientId, durableClntTimeOut, DCPORT,
                 vmPID, VMKIND, 0, dsName.c_str(), randString.c_str(), 0);
}

// This is only for unit tests and should not be used for any other purpose. See
// testEntriesMapForVersioning.cpp for more details
ClientProxyMembershipID::ClientProxyMembershipID(
    uint8_t* hostAddr, uint32_t hostAddrLen, uint32_t hostPort,
    const char* dsname, const char* uniqueTag, uint32_t vmViewId) {
  int32_t vmPID = ACE_OS::getpid();
  initObjectVars("localhost", hostAddr, hostAddrLen, false, hostPort, "",
                 std::chrono::seconds::zero(), DCPORT, vmPID, VMKIND, 0, dsname,
                 uniqueTag, vmViewId);
}
void ClientProxyMembershipID::initObjectVars(
    const char* hostname, uint8_t* hostAddr, uint32_t hostAddrLen,
    bool hostAddrLocalMem, uint32_t hostPort, const char* durableClientId,
    const std::chrono::seconds durableClntTimeOut, int32_t dcPort, int32_t vPID,
    int8_t vmkind, int8_t splitBrainFlag, const char* dsname,
    const char* uniqueTag, uint32_t vmViewId) {
  DataOutputInternal m_memID;
  if (dsname == nullptr) {
    m_dsname = std::string("");
  } else {
    m_dsname = std::string(dsname);
  }
  m_hostPort = hostPort;
  m_hostAddr = hostAddr;
  m_hostAddrLen = hostAddrLen;
  m_hostAddrLocalMem = hostAddrLocalMem;
  if (uniqueTag == nullptr) {
    m_uniqueTag = std::string("");
  } else {
    m_uniqueTag = std::string(uniqueTag);
  }

  m_vmViewId = vmViewId;
  m_memID.write(static_cast<int8_t>(DSCode::FixedIDByte));
  m_memID.write(static_cast<int8_t>(DSCode::InternalDistributedMember));
  m_memID.writeArrayLen(ADDRSIZE);
  // writing first 4 bytes of the address. This will be same until
  // IPV6 support is added in the client
  uint32_t temp;
  memcpy(&temp, hostAddr, 4);
  m_memID.writeInt(static_cast<int32_t>(temp));
  // m_memID.writeInt((int32_t)hostPort);
  m_memID.writeInt(static_cast<int32_t>(synch_counter));
  m_memID.writeString(hostname);
  m_memID.write(splitBrainFlag);  // splitbrain flags

  m_memID.writeInt(dcPort);

  m_memID.writeInt(vPID);
  m_memID.write(vmkind);
  m_memID.writeArrayLen(ROLEARRLENGTH);
  m_memID.writeString(dsname);
  m_memID.writeString(uniqueTag);

  if (durableClientId != nullptr &&
      durableClntTimeOut != std::chrono::seconds::zero()) {
    m_memID.writeString(durableClientId);
    const auto int32ptr = CacheableInt32::create(
        static_cast<int32_t>(durableClntTimeOut.count()));
    int32ptr->toData(m_memID);
  }
  writeVersion(Version::getOrdinal(), m_memID);
  size_t len;
  char* buf =
      reinterpret_cast<char*>(const_cast<uint8_t*>(m_memID.getBuffer(&len)));
  m_memIDStr.append(buf, len);

  char PID[15] = {0};
  char Synch_Counter[15] = {0};
  ACE_OS::itoa(vPID, PID, 10);
  ACE_OS::itoa(synch_counter, Synch_Counter, 10);
  clientID.append(hostname);
  clientID.append("(");
  clientID.append(PID);
  clientID.append(":loner):");
  clientID.append(Synch_Counter);
  clientID.append(":");
  clientID.append(getUniqueTag());
  clientID.append(":");
  clientID.append(getDSName());
  // Hash key.

  // int offset = 0;
  for (uint32_t i = 0; i < getHostAddrLen(); i++) {
    char hostInfo[16] = {0};
    // offset += ACE_OS::snprintf(hostInfo + offset , 255 - offset, ":%x",
    // m_hostAddr[i]);
    ACE_OS::itoa(m_hostAddr[i], hostInfo, 16);
    m_hashKey.append(":");
    m_hashKey.append(hostInfo);
  }
  m_hashKey.append(":");
  char hostInfoPort[16] = {0};
  ACE_OS::itoa(getHostPort(), hostInfoPort, 10);
  //  offset += ACE_OS::snprintf(hostInfo + offset, 255 - offset , ":%d",
  //  getHostPort());
  m_hashKey.append(hostInfoPort);
  m_hashKey.append(":");
  m_hashKey.append(getDSName());
  m_hashKey.append(":");
  if (m_uniqueTag.size() != 0) {
    m_hashKey.append(getUniqueTag());
  } else {
    m_hashKey.append(":");
    char viewid[16] = {0};
    ACE_OS::itoa(m_vmViewId, viewid, 10);
    // offset += ACE_OS::snprintf(hostInfo + offset , 255 - offset , ":%d",
    // m_vmViewId);
    m_hashKey.append(viewid);
  }
  LOGDEBUG("GethashKey %s client id: %s ", m_hashKey.c_str(), clientID.c_str());
}
const char* ClientProxyMembershipID::getDSMemberId(uint32_t& mesgLength) const {
  mesgLength = static_cast<int32_t>(m_memIDStr.size());
  return m_memIDStr.c_str();
}
const char* ClientProxyMembershipID::getDSMemberIdForCS43(
    uint32_t& mesgLength) const {
  mesgLength = static_cast<int32_t>(m_dsmemIDStr.size());
  return m_dsmemIDStr.c_str();
}

const std::string& ClientProxyMembershipID::getDSMemberIdForThinClientUse() {
  return clientID;
}

std::string ClientProxyMembershipID::getHashKey() { return m_hashKey; }

void ClientProxyMembershipID::toData(DataOutput&) const {
  throw IllegalStateException("Member ID toData() not implemented.");
}

void ClientProxyMembershipID::fromData(DataInput& input) {
  // deserialization for PR FX HA

  auto len = input.readArrayLength();  // inetaddress len
  m_hostAddrLocalMem = true;
  auto hostAddr = new uint8_t[len];
  input.readBytesOnly(hostAddr, len);  // inetaddress
  auto hostPort = input.readInt32();   // port
  auto hostname =
      std::dynamic_pointer_cast<CacheableString>(input.readObject());
  auto splitbrain = input.read();   // splitbrain
  auto dcport = input.readInt32();  // port
  auto vPID = input.readInt32();    // pid
  auto vmKind = input.read();       // vmkind
  auto aStringArray = CacheableStringArray::create();
  aStringArray->fromData(input);
  auto dsName = std::dynamic_pointer_cast<CacheableString>(input.readObject());
  auto uniqueTag =
      std::dynamic_pointer_cast<CacheableString>(input.readObject());
  auto durableClientId =
      std::dynamic_pointer_cast<CacheableString>(input.readObject());
  auto durableClntTimeOut =
      std::chrono::seconds(input.readInt32());  // durable client timeout
  int32_t vmViewId = 0;
  readVersion(splitbrain, input);

  if (vmKind != ClientProxyMembershipID::LONER_DM_TYPE) {
    vmViewId = std::stoi(uniqueTag->value());
    initObjectVars(hostname->value().c_str(), hostAddr, len, true, hostPort,
                   durableClientId->value().c_str(), durableClntTimeOut, dcport,
                   vPID, vmKind, splitbrain, dsName->value().c_str(), nullptr,
                   vmViewId);
  } else {
    // initialize the object
    initObjectVars(hostname->value().c_str(), hostAddr, len, true, hostPort,
                   durableClientId->value().c_str(), durableClntTimeOut, dcport,
                   vPID, vmKind, splitbrain, dsName->value().c_str(),
                   uniqueTag->value().c_str(), 0);
  }

  readAdditionalData(input);
}

Serializable* ClientProxyMembershipID::readEssentialData(DataInput& input) {
  uint8_t* hostAddr;
  int32_t len, hostPort, vmViewId = 0;
  std::shared_ptr<CacheableString> hostname, dsName, uniqueTag, vmViewIdstr;

  len = input.readArrayLength();  // inetaddress len
  m_hostAddrLocalMem = true;
  /* adongre - Coverity II
   * CID 29183: Out-of-bounds access (OVERRUN_DYNAMIC)
   */
  // hostAddr = new uint8_t(len);
  hostAddr = new uint8_t[len];

  input.readBytesOnly(hostAddr, len);  // inetaddress

  hostPort = input.readInt32();  // port
  // TODO: RVV get the host name from

  // read and ignore flag
  input.read();

  const auto vmKind = input.read();  // vmkind

  if (vmKind == ClientProxyMembershipID::LONER_DM_TYPE) {
    uniqueTag = std::dynamic_pointer_cast<CacheableString>(input.readObject());
  } else {
    vmViewIdstr =
        std::dynamic_pointer_cast<CacheableString>(input.readObject());
    vmViewId = std::stoi(vmViewIdstr->value());
  }

  dsName = std::dynamic_pointer_cast<CacheableString>(input.readObject());

  if (vmKind != ClientProxyMembershipID::LONER_DM_TYPE) {
    // initialize the object with the values read and some dummy values
    initObjectVars("", hostAddr, len, true, hostPort, "",
                   std::chrono::seconds::zero(), DCPORT, 0, vmKind, 0,
                   dsName->value().c_str(), nullptr, vmViewId);
  } else {
    // initialize the object with the values read and some dummy values
    initObjectVars("", hostAddr, len, true, hostPort, "",
                   std::chrono::seconds::zero(), DCPORT, 0, vmKind, 0,
                   dsName->value().c_str(), uniqueTag->value().c_str(),
                   vmViewId);
  }

  readAdditionalData(input);

  return this;
}

void ClientProxyMembershipID::readAdditionalData(DataInput& input) {
  // Skip unused UUID (16) and weight (0);
  input.advanceCursor(17);
}

void ClientProxyMembershipID::increaseSynchCounter() { ++synch_counter; }

// Compares two membershipIds. This is based on the compareTo function
// of InternalDistributedMember class of Java.
// Any change to the java function should be reflected here as well.
int16_t ClientProxyMembershipID::compareTo(
    const DSMemberForVersionStamp& other) const {
  if (this == &other) {
    return 0;
  }

  const ClientProxyMembershipID& otherMember =
      static_cast<const ClientProxyMembershipID&>(other);
  uint32_t myPort = getHostPort();
  uint32_t otherPort = otherMember.getHostPort();

  if (myPort < otherPort) return -1;
  if (myPort > otherPort) return 1;

  uint8_t* myAddr = getHostAddr();
  uint8_t* otherAddr = otherMember.getHostAddr();
  // Discard null cases
  if (myAddr == nullptr && otherAddr == nullptr) {
    if (myPort < otherPort) {
      return -1;
    } else if (myPort > otherPort) {
      return 1;
    } else {
      return 0;
    }
  } else if (myAddr == nullptr) {
    return -1;
  } else if (otherAddr == nullptr) {
    return 1;
  }
  for (uint32_t i = 0; i < getHostAddrLen(); i++) {
    if (myAddr[i] < otherAddr[i]) return -1;
    if (myAddr[i] > otherAddr[i]) return 1;
  }

  std::string myUniqueTag = getUniqueTag();
  std::string otherUniqueTag = otherMember.getUniqueTag();
  if (myUniqueTag.empty() && otherUniqueTag.empty()) {
    if (m_vmViewId < otherMember.m_vmViewId) {
      return -1;
    } else if (m_vmViewId > otherMember.m_vmViewId) {
      return 1;
    }  // else they're the same, so continue
  } else if (myUniqueTag.empty()) {
    return -1;
  } else if (otherUniqueTag.empty()) {
    return 1;
  } else {
    int i = myUniqueTag.compare(otherUniqueTag);
    if (i != 0) {
      return i;
    }
  }
  return 0;
}

void ClientProxyMembershipID::readVersion(int flags, DataInput& input) {
  if ((flags & ClientProxyMembershipID::VERSION_MASK) != 0) {
    int8_t ordinal = input.read();
    LOGDEBUG("ClientProxyMembershipID::readVersion ordinal = %d ", ordinal);
    if (ordinal != ClientProxyMembershipID::TOKEN_ORDINAL) {
    } else {
      LOGDEBUG("ClientProxyMembershipID::readVersion ordinal = %d ",
               input.readInt16());
    }
  }
}

void ClientProxyMembershipID::writeVersion(int16_t ordinal,
                                           DataOutput& output) {
  if (ordinal <= SCHAR_MAX) {
    output.write(static_cast<int8_t>(ordinal));
    LOGDEBUG("ClientProxyMembershipID::writeVersion ordinal = %d ", ordinal);
  } else {
    output.write(ClientProxyMembershipID::TOKEN_ORDINAL);
    output.writeInt(ordinal);
    LOGDEBUG("ClientProxyMembershipID::writeVersion ordinal = %d ", ordinal);
  }
}

}  // namespace client
}  // namespace geode
}  // namespace apache
