/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "VersionedCacheableObjectPartList.hpp"

#include <geode/CacheableString.hpp>
#include <geode/ExceptionTypes.hpp>

#include "CacheableToken.hpp"
#include "DiskStoreId.hpp"
#include "DiskVersionTag.hpp"
#include "ThinClientRegion.hpp"

namespace apache {
namespace geode {
namespace client {

const uint8_t VersionedCacheableObjectPartList::FLAG_NULL_TAG = 0;
const uint8_t VersionedCacheableObjectPartList::FLAG_FULL_TAG = 1;
const uint8_t VersionedCacheableObjectPartList::FLAG_TAG_WITH_NEW_ID = 2;
const uint8_t VersionedCacheableObjectPartList::FLAG_TAG_WITH_NUMBER_ID = 3;

void VersionedCacheableObjectPartList::toData(DataOutput&) const {
  throw UnsupportedOperationException(
      "VersionedCacheableObjectPartList::toData not implemented");
}

void VersionedCacheableObjectPartList::readObjectPart(
    int32_t index, DataInput& input, std::shared_ptr<CacheableKey> keyPtr) {
  auto objType = input.read();
  std::shared_ptr<Cacheable> value;
  m_byteArray[index] = objType;
  bool isException = (objType == 2 ? 1 : 0);

  if (isException) {  // Exception case
    // Skip the exception that is in java serialized format, we cant read it.
    input.advanceCursor(input.readArrayLength());
    const auto exMsg = input.readString();  ////4.1

    std::shared_ptr<Exception> ex;
    if (exMsg == "org.apache.geode.security.NotAuthorizedException") {
      auto message = "Authorization exception at server: " + exMsg;
      ex = std::make_shared<NotAuthorizedException>(message);
    } else {
      auto message = "Exception at remote server: " + exMsg;
      ex = std::make_shared<CacheServerException>(message);
    }
    m_exceptions->emplace(keyPtr, ex);
  } else if (m_serializeValues) {
    // read length
    int32_t skipLen = input.readArrayLength();
    int8_t* bytes = nullptr;
    if (skipLen > 0) {
      // readObject
      bytes = new int8_t[skipLen];
      input.readBytesOnly(bytes, skipLen);
    }
    m_values->emplace(keyPtr, CacheableBytes::create(
                                  std::vector<int8_t>(bytes, bytes + skipLen)));

    /* adongre
     * CID 29377: Resource leak (RESOURCE_LEAK)Calling allocation function
     * "apache::geode::client::DataInput::readBytes(unsigned char **, int *)" on
     * "bytes".
     */
    _GEODE_SAFE_DELETE_ARRAY(bytes);

  } else {
    // set nullptr to indicate that there is no exception for the key on this
    // index
    // readObject
    input.readObject(value);
    if (m_values) m_values->emplace(keyPtr, value);
  }
}

void VersionedCacheableObjectPartList::fromData(DataInput& input) {
  std::lock_guard<decltype(m_responseLock)> guard(m_responseLock);
  LOGDEBUG("VersionedCacheableObjectPartList::fromData");
  uint8_t flags = input.read();
  m_hasKeys = (flags & 0x01) == 0x01;
  bool hasObjects = (flags & 0x02) == 0x02;
  m_hasTags = (flags & 0x04) == 0x04;
  m_regionIsVersioned = (flags & 0x08) == 0x08;
  m_serializeValues = (flags & 0x10) == 0x10;
  bool persistent = (flags & 0x20) == 0x20;
  std::shared_ptr<CacheableString> exMsgPtr;
  int32_t len = 0;
  bool valuesNULL = false;
  int32_t keysOffset = (m_keysOffset != nullptr ? *m_keysOffset : 0);
  // bool readObjLen = false;
  // int32_t lenOfObjects = 0;
  if (m_values == nullptr) {
    m_values = std::make_shared<HashMapOfCacheable>();
    valuesNULL = true;
  }

  if (!m_hasKeys && !hasObjects && !m_hasTags) {
    LOGDEBUG(
        "VersionedCacheableObjectPartList::fromData: Looks like message has no "
        "data. Returning,");
  }

  auto localKeys =
      std::make_shared<std::vector<std::shared_ptr<CacheableKey>>>();
  if (m_hasKeys) {
    len = static_cast<int32_t>(input.readUnsignedVL());

    for (int32_t index = 0; index < len; ++index) {
      auto key = std::dynamic_pointer_cast<CacheableKey>(input.readObject());
      if (m_resultKeys != nullptr) {
        m_resultKeys->push_back(key);
      }
      m_tempKeys->push_back(key);
      localKeys->push_back(key);
    }
  } else if (m_keys != nullptr) {
    LOGDEBUG("VersionedCacheableObjectPartList::fromData: m_keys NOT nullptr");
    /*
       if (m_hasKeys) {
       int64_t tempLen;
       input.readUnsignedVL(&tempLen);
       len = (int32_t)tempLen;
       }else{
       len = m_keys->size();
       }
       lenOfObjects = len;
       readObjLen = true;
       for (int32_t index = keysOffset; index < keysOffset + len; ++index) {
       key = m_keys->at(index);
       if (m_resultKeys != nullptr) {
       m_resultKeys->push_back(key);
       }
       }*/
  } else if (hasObjects) {
    if (m_keys == nullptr && m_resultKeys == nullptr) {
      LOGERROR(
          "VersionedCacheableObjectPartList::fromData: Exception: hasObjects "
          "is true and m_keys and m_resultKeys are also nullptr");
      throw FatalInternalException(
          "VersionedCacheableObjectPartList: "
          "hasObjects is true and m_keys is also nullptr");
    } else {
      LOGDEBUG(
          "VersionedCacheableObjectPartList::fromData m_keys or m_resultKeys "
          "not null");
    }
  } else {
    LOGDEBUG(
        "VersionedCacheableObjectPartList::fromData m_hasKeys, m_keys, "
        "hasObjects all are nullptr");
  }  // m_hasKeys else ends here

  if (hasObjects) {
    len = static_cast<int32_t>(input.readUnsignedVL());
    m_byteArray.resize(len);
    for (int32_t index = 0; index < len; ++index) {
      if (m_keys != nullptr && !m_hasKeys) {
        readObjectPart(index, input, m_keys->at(index + keysOffset));
      } else /*if (m_resultKeys != nullptr && m_resultKeys->size() > 0)*/ {
        readObjectPart(index, input, localKeys->at(index));
      } /*else{
         LOGERROR("VersionedCacheableObjectPartList::fromData: hasObjects = true
       but m_keys is nullptr and m_resultKeys== nullptr or m_resultKeys->size=0"
       );
       }*/
    }
  }  // hasObjects ends here

  if (m_hasTags) {
    len = static_cast<int32_t>(input.readUnsignedVL());
    m_versionTags.resize(len);
    std::vector<uint16_t> ids;
    MemberListForVersionStamp& memberListForVersionStamp =
        *(m_region->getCacheImpl()->getMemberListForVersionStamp());
    for (int32_t index = 0; index < len; index++) {
      uint8_t entryType = input.read();
      std::shared_ptr<VersionTag> versionTag;
      switch (entryType) {
        case FLAG_NULL_TAG: {
          break;
        }
        case FLAG_FULL_TAG: {
          if (persistent) {
            versionTag = std::shared_ptr<VersionTag>(
                new DiskVersionTag(memberListForVersionStamp));
          } else {
            versionTag = std::shared_ptr<VersionTag>(
                new VersionTag(memberListForVersionStamp));
          }
          versionTag->fromData(input);
          versionTag->replaceNullMemberId(getEndpointMemId());
          break;
        }

        case FLAG_TAG_WITH_NEW_ID: {
          if (persistent) {
            versionTag = std::shared_ptr<VersionTag>(
                new DiskVersionTag(memberListForVersionStamp));
          } else {
            versionTag = std::shared_ptr<VersionTag>(
                new VersionTag(memberListForVersionStamp));
          }
          versionTag->fromData(input);
          ids.push_back(versionTag->getInternalMemID());
          break;
        }

        case FLAG_TAG_WITH_NUMBER_ID: {
          if (persistent) {
            versionTag = std::shared_ptr<VersionTag>(
                new DiskVersionTag(memberListForVersionStamp));
          } else {
            versionTag = std::shared_ptr<VersionTag>(
                new VersionTag(memberListForVersionStamp));
          }
          versionTag->fromData(input);
          auto idNumber = input.readUnsignedVL();
          versionTag->setInternalMemID(ids.at(idNumber));
          break;
        }
        default: {
          break;
        }
      }
      m_versionTags[index] = versionTag;
    }
  } else {  // if consistancyEnabled=false, we need to pass empty or
            // std::shared_ptr<NULL> m_versionTags
    for (int32_t index = 0; index < len; ++index) {
      std::shared_ptr<VersionTag> versionTag;
      m_versionTags[index] = versionTag;
    }
  }

  if (hasObjects) {
    std::shared_ptr<CacheableKey> key;
    std::shared_ptr<VersionTag> versionTag;
    std::shared_ptr<Cacheable> value;

    for (int32_t index = 0; index < len; ++index) {
      if (m_keys != nullptr && !m_hasKeys) {
        key = m_keys->at(index + keysOffset);
      } else /*if (m_resultKeys != nullptr && m_resultKeys->size() > 0)*/ {
        key = localKeys->at(index);
      } /*else{
         LOGERROR("VersionedCacheableObjectPartList::fromData: hasObjects = true
       but m_keys is nullptr AND m_resultKeys=nullptr or m_resultKeys->size=0"
       );
       }*/

      const auto& iter = m_values->find(key);
      value = iter == m_values->end() ? nullptr : iter->second;
      if (m_byteArray[index] != 3) {  // 3 - key not found on server
        std::shared_ptr<Cacheable> oldValue;
        if (m_addToLocalCache) {
          int updateCount = -1;
          versionTag = m_versionTags[index];

          GfErrType err =
              m_region->putLocal("getAll", false, key, value, oldValue, true,
                                 updateCount, m_destroyTracker, versionTag);
          if (err == GF_CACHE_CONCURRENT_MODIFICATION_EXCEPTION) {
            LOGDEBUG(
                "VersionedCacheableObjectPartList::fromData putLocal for key [%s] failed because the cache \
                  already contains an entry with higher version.",
                Utils::nullSafeToString(key).c_str());
            // replace the value with higher version tag
            (*m_values)[key] = oldValue;
          }
        }       // END::m_addToLocalCache
        else {  // m_addToLocalCache = false
          m_region->getEntry(key, oldValue);
          // if value has already been received via notification or put by
          // another thread, then return that
          if (oldValue != nullptr && !CacheableToken::isInvalid(oldValue)) {
            // replace the value with new value
            (*m_values)[key] = oldValue;
          }
        }
      }
    }
  }
  if (m_keysOffset != nullptr) *m_keysOffset += len;
  if (valuesNULL) m_values = nullptr;
}

}  // namespace client
}  // namespace geode
}  // namespace apache
