/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "ThinClientRegion.hpp"

#include <algorithm>
#include <limits>
#include <regex>

#include <geode/PoolManager.hpp>
#include <geode/Struct.hpp>
#include <geode/SystemProperties.hpp>
#include <geode/UserFunctionExecutionException.hpp>

#include "AutoDelete.hpp"
#include "CacheImpl.hpp"
#include "CacheRegionHelper.hpp"
#include "DataInputInternal.hpp"
#include "PutAllPartialResultServerException.hpp"
#include "ReadWriteLock.hpp"
#include "RegionGlobalLocks.hpp"
#include "RemoteQuery.hpp"
#include "TcrConnectionManager.hpp"
#include "TcrDistributionManager.hpp"
#include "TcrEndpoint.hpp"
#include "ThinClientBaseDM.hpp"
#include "ThinClientPoolDM.hpp"
#include "UserAttributes.hpp"
#include "Utils.hpp"
#include "VersionedCacheableObjectPartList.hpp"
#include "util/bounds.hpp"
#include "util/exception.hpp"

namespace apache {
namespace geode {
namespace client {

static const std::regex PREDICATE_IS_FULL_QUERY_REGEX(
    "^\\s*(?:select|import)\\b", std::regex::icase);

void setThreadLocalExceptionMessage(std::string exMsg);

class PutAllWork : public PooledWork<GfErrType> {
  ThinClientPoolDM* m_poolDM;
  std::shared_ptr<BucketServerLocation> m_serverLocation;
  TcrMessage* m_request;
  TcrMessageReply* m_reply;
  MapOfUpdateCounters m_mapOfUpdateCounters;
  bool m_attemptFailover;
  bool m_isBGThread;
  std::shared_ptr<UserAttributes> m_userAttribute;
  const std::shared_ptr<Region> m_region;
  std::shared_ptr<std::vector<std::shared_ptr<CacheableKey>>> m_keys;
  std::shared_ptr<HashMapOfCacheable> m_map;
  std::shared_ptr<VersionedCacheableObjectPartList> m_verObjPartListPtr;
  std::chrono::milliseconds m_timeout;
  std::shared_ptr<PutAllPartialResultServerException> m_papException;
  bool m_isPapeReceived;
  ChunkedPutAllResponse* m_resultCollector;
  // UNUSED const std::shared_ptr<Serializable>& m_aCallbackArgument;

 public:
  PutAllWork(const PutAllWork&) = delete;
  PutAllWork& operator=(const PutAllWork&) = delete;
  PutAllWork(
      ThinClientPoolDM* poolDM,
      const std::shared_ptr<BucketServerLocation>& serverLocation,
      const std::shared_ptr<Region>& region, bool attemptFailover,
      bool isBGThread, const std::shared_ptr<HashMapOfCacheable> map,
      const std::shared_ptr<std::vector<std::shared_ptr<CacheableKey>>> keys,
      std::chrono::milliseconds timeout,
      const std::shared_ptr<Serializable>& aCallbackArgument)
      : m_poolDM(poolDM),
        m_serverLocation(serverLocation),
        m_attemptFailover(attemptFailover),
        m_isBGThread(isBGThread),
        m_userAttribute(nullptr),
        m_region(region),
        m_keys(keys),
        m_map(map),
        m_timeout(timeout),
        m_papException(nullptr),
        m_isPapeReceived(false)
  // UNUSED , m_aCallbackArgument(aCallbackArgument)
  {
    m_request = new TcrMessagePutAll(
        new DataOutput(m_region->getCache().createDataOutput()), m_region.get(),
        *m_map, m_timeout, m_poolDM, aCallbackArgument);
    m_reply = new TcrMessageReply(true, m_poolDM);

    // create new instanceof VCOPL
    std::recursive_mutex responseLock;
    m_verObjPartListPtr =
        std::make_shared<VersionedCacheableObjectPartList>(keys, responseLock);

    if (m_poolDM->isMultiUserMode()) {
      m_userAttribute = UserAttributes::threadLocalUserAttributes;
    }

    m_request->setTimeout(m_timeout);
    m_reply->setTimeout(m_timeout);
    m_resultCollector = new ChunkedPutAllResponse(
        m_region, *m_reply, responseLock, m_verObjPartListPtr);
    m_reply->setChunkedResultHandler(m_resultCollector);
  }

  ~PutAllWork() {
    delete m_request;
    delete m_reply;
    delete m_resultCollector;
  }

  TcrMessage* getReply() { return m_reply; }

  std::shared_ptr<HashMapOfCacheable> getPutAllMap() { return m_map; }

  std::shared_ptr<VersionedCacheableObjectPartList> getVerObjPartList() {
    return m_verObjPartListPtr;
  }

  ChunkedPutAllResponse* getResultCollector() { return m_resultCollector; }

  std::shared_ptr<BucketServerLocation> getServerLocation() {
    return m_serverLocation;
  }

  std::shared_ptr<PutAllPartialResultServerException> getPaPResultException() {
    return m_papException;
  }

  void init() {}
  GfErrType execute(void) {
    GuardUserAttributes gua;

    if (m_userAttribute != nullptr) {
      gua.setAuthenticatedView(m_userAttribute->getAuthenticatedView());
    }

    GfErrType err = GF_NOERR;
    err = m_poolDM->sendSyncRequest(*m_request, *m_reply, m_attemptFailover,
                                    m_isBGThread, m_serverLocation);

    // Set Version Tags
    LOGDEBUG(" m_verObjPartListPtr size = %d err = %d ",
             m_resultCollector->getList()->size(), err);
    m_verObjPartListPtr->setVersionedTagptr(
        m_resultCollector->getList()->getVersionedTagptr());

    if (err != GF_NOERR) {
      return err;
    } /*This can be GF_NOTCON, counterpart to java
                          ServerConnectivityException*/

    switch (m_reply->getMessageType()) {
      case TcrMessage::REPLY:
        break;
      case TcrMessage::RESPONSE:
        LOGDEBUG("PutAllwork execute err = %d ", err);
        break;
      case TcrMessage::EXCEPTION:
        // TODO::Check for the PAPException and READ
        // PutAllPartialResultServerException and set its member for later use.
        // set m_papException and m_isPapeReceived
        m_isPapeReceived = true;
        if (m_poolDM->isNotAuthorizedException(m_reply->getException())) {
          LOGDEBUG("received NotAuthorizedException");
          err = GF_AUTHENTICATION_FAILED_EXCEPTION;
        } else if (m_poolDM->isPutAllPartialResultException(
                       m_reply->getException())) {
          LOGDEBUG("received PutAllPartialResultException");
          err = GF_PUTALL_PARTIAL_RESULT_EXCEPTION;
        } else {
          LOGDEBUG("received unknown exception:" + m_reply->getException());
          err = GF_PUTALL_PARTIAL_RESULT_EXCEPTION;
          // TODO should assign a new err code
        }

        break;
      case TcrMessage::PUT_DATA_ERROR:
        err = GF_CACHESERVER_EXCEPTION;
        break;
      default:
        LOGERROR("Unknown message type %d during region put-all",
                 m_reply->getMessageType());
        err = GF_NOTOBJ;
        break;
    }
    return err;
  }
};

class RemoveAllWork : public PooledWork<GfErrType> {
  ThinClientPoolDM* m_poolDM;
  std::shared_ptr<BucketServerLocation> m_serverLocation;
  TcrMessage* m_request;
  TcrMessageReply* m_reply;
  MapOfUpdateCounters m_mapOfUpdateCounters;
  bool m_attemptFailover;
  bool m_isBGThread;
  std::shared_ptr<UserAttributes> m_userAttribute;
  const std::shared_ptr<Region> m_region;
  const std::shared_ptr<Serializable>& m_aCallbackArgument;
  std::shared_ptr<std::vector<std::shared_ptr<CacheableKey>>> m_keys;
  std::shared_ptr<VersionedCacheableObjectPartList> m_verObjPartListPtr;
  std::shared_ptr<PutAllPartialResultServerException> m_papException;
  bool m_isPapeReceived;
  ChunkedRemoveAllResponse* m_resultCollector;

 public:
  RemoveAllWork(const RemoveAllWork&) = delete;
  RemoveAllWork& operator=(const RemoveAllWork&) = delete;
  RemoveAllWork(
      ThinClientPoolDM* poolDM,
      const std::shared_ptr<BucketServerLocation>& serverLocation,
      const std::shared_ptr<Region>& region, bool attemptFailover,
      bool isBGThread,
      const std::shared_ptr<std::vector<std::shared_ptr<CacheableKey>>> keys,
      const std::shared_ptr<Serializable>& aCallbackArgument)
      : m_poolDM(poolDM),
        m_serverLocation(serverLocation),
        m_attemptFailover(attemptFailover),
        m_isBGThread(isBGThread),
        m_userAttribute(nullptr),
        m_region(region),
        m_aCallbackArgument(aCallbackArgument),
        m_keys(keys),
        m_papException(nullptr),
        m_isPapeReceived(false) {
    m_request = new TcrMessageRemoveAll(
        new DataOutput(m_region->getCache().createDataOutput()), m_region.get(),
        *keys, m_aCallbackArgument, m_poolDM);
    m_reply = new TcrMessageReply(true, m_poolDM);
    // create new instanceof VCOPL
    std::recursive_mutex responseLock;
    m_verObjPartListPtr =
        std::make_shared<VersionedCacheableObjectPartList>(keys, responseLock);

    if (m_poolDM->isMultiUserMode()) {
      m_userAttribute = UserAttributes::threadLocalUserAttributes;
    }

    m_resultCollector = new ChunkedRemoveAllResponse(
        m_region, *m_reply, responseLock, m_verObjPartListPtr);
    m_reply->setChunkedResultHandler(m_resultCollector);
  }

  ~RemoveAllWork() {
    delete m_request;
    delete m_reply;
    delete m_resultCollector;
  }

  TcrMessage* getReply() { return m_reply; }

  std::shared_ptr<VersionedCacheableObjectPartList> getVerObjPartList() {
    return m_verObjPartListPtr;
  }

  ChunkedRemoveAllResponse* getResultCollector() { return m_resultCollector; }

  std::shared_ptr<BucketServerLocation> getServerLocation() {
    return m_serverLocation;
  }

  std::shared_ptr<PutAllPartialResultServerException> getPaPResultException() {
    return m_papException;
  }

  void init() {}
  GfErrType execute(void) {
    GuardUserAttributes gua;

    if (m_userAttribute != nullptr) {
      gua.setAuthenticatedView(m_userAttribute->getAuthenticatedView());
    }

    GfErrType err = GF_NOERR;
    err = m_poolDM->sendSyncRequest(*m_request, *m_reply, m_attemptFailover,
                                    m_isBGThread, m_serverLocation);

    // Set Version Tags
    LOGDEBUG(" m_verObjPartListPtr size = %d err = %d ",
             m_resultCollector->getList()->size(), err);
    m_verObjPartListPtr->setVersionedTagptr(
        m_resultCollector->getList()->getVersionedTagptr());

    if (err != GF_NOERR) {
      return err;
    } /*This can be GF_NOTCON, counterpart to java
                          ServerConnectivityException*/

    switch (m_reply->getMessageType()) {
      case TcrMessage::REPLY:
        break;
      case TcrMessage::RESPONSE:
        LOGDEBUG("RemoveAllWork execute err = %d ", err);
        break;
      case TcrMessage::EXCEPTION:
        // TODO::Check for the PAPException and READ
        // PutAllPartialResultServerException and set its member for later use.
        // set m_papException and m_isPapeReceived
        m_isPapeReceived = true;
        if (m_poolDM->isNotAuthorizedException(m_reply->getException())) {
          LOGDEBUG("received NotAuthorizedException");
          err = GF_AUTHENTICATION_FAILED_EXCEPTION;
        } else if (m_poolDM->isPutAllPartialResultException(
                       m_reply->getException())) {
          LOGDEBUG("received PutAllPartialResultException");
          err = GF_PUTALL_PARTIAL_RESULT_EXCEPTION;
        } else {
          LOGDEBUG("received unknown exception:" + m_reply->getException());
          err = GF_PUTALL_PARTIAL_RESULT_EXCEPTION;
          // TODO should assign a new err code
        }

        break;
      case TcrMessage::PUT_DATA_ERROR:
        err = GF_CACHESERVER_EXCEPTION;
        break;
      default:
        LOGERROR("Unknown message type %d during region remove-all",
                 m_reply->getMessageType());
        err = GF_NOTOBJ;
        break;
    }
    return err;
  }
};

ThinClientRegion::ThinClientRegion(
    const std::string& name, CacheImpl* cacheImpl,
    const std::shared_ptr<RegionInternal>& rPtr, RegionAttributes attributes,
    const std::shared_ptr<CacheStatistics>& stats, bool shared)
    : LocalRegion(name, cacheImpl, rPtr, attributes, stats, shared),
      m_tcrdm(nullptr),
      m_notifyRelease(false),
      m_isMetaDataRefreshed(false) {
  m_transactionEnabled = true;
  m_isDurableClnt = !cacheImpl->getDistributedSystem()
                         .getSystemProperties()
                         .durableClientId()
                         .empty();
}

void ThinClientRegion::initTCR() {
  try {
    m_tcrdm = std::make_shared<TcrDistributionManager>(
        this, m_cacheImpl->tcrConnectionManager());
    m_tcrdm->init();
  } catch (const Exception& ex) {
    LOGERROR("Exception while initializing region: %s: %s",
             ex.getName().c_str(), ex.what());
    throw;
  }
}

void ThinClientRegion::registerKeys(
    const std::vector<std::shared_ptr<CacheableKey>>& keys, bool isDurable,
    bool getInitialValues, bool receiveValues) {
  auto pool = m_cacheImpl->getPoolManager().find(getAttributes().getPoolName());
  if (pool != nullptr) {
    if (!pool->getSubscriptionEnabled()) {
      LOGERROR(
          "Registering keys is supported "
          "only if subscription-enabled attribute is true for pool " +
          pool->getName());
      throw UnsupportedOperationException(
          "Registering keys is supported "
          "only if pool subscription-enabled attribute is true.");
    }
  }
  if (keys.empty()) {
    LOGERROR("Register keys list is empty");
    throw IllegalArgumentException(
        "Register keys "
        "keys vector is empty");
  }
  if (isDurable && !isDurableClient()) {
    LOGERROR(
        "Register keys durable flag is only applicable for durable clients");
    throw IllegalStateException(
        "Durable flag only applicable for "
        "durable clients");
  }
  if (getInitialValues && !m_regionAttributes.getCachingEnabled()) {
    LOGERROR(
        "Register keys getInitialValues flag is only applicable for caching"
        "clients");
    throw IllegalStateException(
        "getInitialValues flag only applicable for caching clients");
  }

  InterestResultPolicy interestPolicy = InterestResultPolicy::NONE;
  if (getInitialValues) {
    interestPolicy = InterestResultPolicy::KEYS_VALUES;
  }

  LOGDEBUG("ThinClientRegion::registerKeys : interestpolicy is %d",
           interestPolicy.ordinal);

  GfErrType err = registerKeysNoThrow(keys, true, nullptr, isDurable,
                                      interestPolicy, receiveValues);

  if (m_tcrdm->isFatalError(err)) {
    throwExceptionIfError("Region::registerKeys", err);
  }

  throwExceptionIfError("Region::registerKeys", err);
}

void ThinClientRegion::unregisterKeys(
    const std::vector<std::shared_ptr<CacheableKey>>& keys) {
  auto pool = m_cacheImpl->getPoolManager().find(getAttributes().getPoolName());
  if (pool != nullptr) {
    if (!pool->getSubscriptionEnabled()) {
      LOGERROR(
          "Unregister keys is supported "
          "only if subscription-enabled attribute is true for pool " +
          pool->getName());
      throw UnsupportedOperationException(
          "Unregister keys is supported "
          "only if pool subscription-enabled attribute is true.");
    }
  } else {
    if (!getAttributes().getClientNotificationEnabled()) {
      LOGERROR(
          "Unregister keys is supported "
          "only if region client-notification attribute is true.");
      throw UnsupportedOperationException(
          "Unregister keys is supported "
          "only if region client-notification attribute is true.");
    }
  }
  if (keys.empty()) {
    LOGERROR("Unregister keys list is empty");
    throw IllegalArgumentException(
        "Unregister keys "
        "keys vector is empty");
  }
  GfErrType err = unregisterKeysNoThrow(keys);
  throwExceptionIfError("Region::unregisterKeys", err);
}

void ThinClientRegion::registerAllKeys(bool isDurable, bool getInitialValues,
                                       bool receiveValues) {
  auto pool = m_cacheImpl->getPoolManager().find(getAttributes().getPoolName());
  if (pool != nullptr) {
    if (!pool->getSubscriptionEnabled()) {
      LOGERROR(
          "Register all keys is supported only "
          "if subscription-enabled attribute is true for pool " +
          pool->getName());
      throw UnsupportedOperationException(
          "Register all keys is supported only "
          "if pool subscription-enabled attribute is true.");
    }
  }
  if (isDurable && !isDurableClient()) {
    LOGERROR(
        "Register all keys durable flag is only applicable for durable "
        "clients");
    throw IllegalStateException(
        "Durable flag only applicable for durable clients");
  }

  if (getInitialValues && !m_regionAttributes.getCachingEnabled()) {
    LOGERROR(
        "Register all keys getInitialValues flag is only applicable for caching"
        "clients");
    throw IllegalStateException(
        "getInitialValues flag only applicable for caching clients");
  }

  InterestResultPolicy interestPolicy = InterestResultPolicy::NONE;
  if (getInitialValues) {
    interestPolicy = InterestResultPolicy::KEYS_VALUES;
  } else {
    interestPolicy = InterestResultPolicy::KEYS;
  }

  LOGDEBUG("ThinClientRegion::registerAllKeys : interestpolicy is %d",
           interestPolicy.ordinal);

  std::shared_ptr<std::vector<std::shared_ptr<CacheableKey>>> resultKeys;
  //  if we need to fetch initial data, then we get the keys in
  // that call itself using the special GET_ALL message and do not need
  // to get the keys in the initial  register interest  call
  GfErrType err =
      registerRegexNoThrow(".*", true, nullptr, isDurable, resultKeys,
                           interestPolicy, receiveValues);

  if (m_tcrdm->isFatalError(err)) {
    throwExceptionIfError("Region::registerAllKeys", err);
  }

  // Get the entries from the server using a special GET_ALL message
  throwExceptionIfError("Region::registerAllKeys", err);
}

void ThinClientRegion::registerRegex(const std::string& regex, bool isDurable,
                                     bool getInitialValues,
                                     bool receiveValues) {
  auto pool = m_cacheImpl->getPoolManager().find(getAttributes().getPoolName());
  if (pool != nullptr) {
    if (!pool->getSubscriptionEnabled()) {
      LOGERROR(
          "Register regex is supported only if "
          "subscription-enabled attribute is true for pool " +
          pool->getName());
      throw UnsupportedOperationException(
          "Register regex is supported only if "
          "pool subscription-enabled attribute is true.");
    }
  }
  if (isDurable && !isDurableClient()) {
    LOGERROR("Register regex durable flag only applicable for durable clients");
    throw IllegalStateException(
        "Durable flag only applicable for durable clients");
  }

  if (regex.empty()) {
    throw IllegalArgumentException(
        "Region::registerRegex: Regex string is empty");
  }

  auto interestPolicy = InterestResultPolicy::NONE;
  if (getInitialValues) {
    interestPolicy = InterestResultPolicy::KEYS_VALUES;
  } else {
    interestPolicy = InterestResultPolicy::KEYS;
  }

  LOGDEBUG("ThinClientRegion::registerRegex : interestpolicy is %d",
           interestPolicy.ordinal);

  auto resultKeys2 =
      std::make_shared<std::vector<std::shared_ptr<CacheableKey>>>();

  //  if we need to fetch initial data for "allKeys" case, then we
  // get the keys in that call itself using the special GET_ALL message and
  // do not need to get the keys in the initial  register interest  call
  GfErrType err =
      registerRegexNoThrow(regex, true, nullptr, isDurable, resultKeys2,
                           interestPolicy, receiveValues);

  if (m_tcrdm->isFatalError(err)) {
    throwExceptionIfError("Region::registerRegex", err);
  }

  throwExceptionIfError("Region::registerRegex", err);
}

void ThinClientRegion::unregisterRegex(const std::string& regex) {
  auto pool = m_cacheImpl->getPoolManager().find(getAttributes().getPoolName());
  if (pool != nullptr) {
    if (!pool->getSubscriptionEnabled()) {
      LOGERROR(
          "Unregister regex is supported only if "
          "subscription-enabled attribute is true for pool " +
          pool->getName());
      throw UnsupportedOperationException(
          "Unregister regex is supported only if "
          "pool subscription-enabled attribute is true.");
    }
  }

  if (regex.empty()) {
    LOGERROR("Unregister regex string is empty");
    throw IllegalArgumentException("Unregister regex string is empty");
  }

  GfErrType err = unregisterRegexNoThrow(regex);
  throwExceptionIfError("Region::unregisterRegex", err);
}

void ThinClientRegion::unregisterAllKeys() {
  auto pool = m_cacheImpl->getPoolManager().find(getAttributes().getPoolName());
  if (pool != nullptr) {
    if (!pool->getSubscriptionEnabled()) {
      LOGERROR(
          "Unregister all keys is supported only if "
          "subscription-enabled attribute is true for pool " +
          pool->getName());
      throw UnsupportedOperationException(
          "Unregister all keys is supported only if "
          "pool subscription-enabled attribute is true.");
    }
  }
  GfErrType err = unregisterRegexNoThrow(".*");
  throwExceptionIfError("Region::unregisterAllKeys", err);
}

std::shared_ptr<SelectResults> ThinClientRegion::query(
    const std::string& predicate, std::chrono::milliseconds timeout) {
  util::PROTOCOL_OPERATION_TIMEOUT_BOUNDS(timeout);

  CHECK_DESTROY_PENDING(TryReadGuard, Region::query);

  if (predicate.empty()) {
    LOGERROR("Region query predicate string is empty");
    throw IllegalArgumentException("Region query predicate string is empty");
  }

  std::string squery;
  if (std::regex_search(predicate, PREDICATE_IS_FULL_QUERY_REGEX)) {
    squery = predicate;
  } else {
    squery = "select distinct * from ";
    squery += getFullPath();
    squery += " this where ";
    squery += predicate;
  }

  std::shared_ptr<RemoteQuery> queryPtr;

  if (auto poolDM = std::dynamic_pointer_cast<ThinClientPoolDM>(m_tcrdm)) {
    queryPtr = std::dynamic_pointer_cast<RemoteQuery>(
        poolDM->getQueryServiceWithoutCheck()->newQuery(squery.c_str()));
  } else {
    queryPtr = std::dynamic_pointer_cast<RemoteQuery>(
        m_cacheImpl->getQueryService()->newQuery(squery.c_str()));
  }

  return queryPtr->execute(timeout, "Region::query", m_tcrdm.get(), nullptr);
}

bool ThinClientRegion::existsValue(const std::string& predicate,
                                   std::chrono::milliseconds timeout) {
  util::PROTOCOL_OPERATION_TIMEOUT_BOUNDS(timeout);

  auto results = query(predicate, timeout);

  if (results == nullptr) {
    return false;
  }

  return results->size() > 0;
}

GfErrType ThinClientRegion::unregisterKeysBeforeDestroyRegion() {
  auto pool = m_cacheImpl->getPoolManager().find(getAttributes().getPoolName());
  if (pool != nullptr) {
    if (!pool->getSubscriptionEnabled()) {
      LOGDEBUG(
          "pool subscription-enabled attribute is false, No need to Unregister "
          "keys");
      return GF_NOERR;
    }
  }
  GfErrType err = GF_NOERR;
  GfErrType opErr = GF_NOERR;

  opErr = unregisterStoredRegexLocalDestroy(m_interestListRegex);
  err = opErr != GF_NOERR ? opErr : err;
  opErr = unregisterStoredRegexLocalDestroy(m_durableInterestListRegex);
  err = opErr != GF_NOERR ? opErr : err;
  opErr = unregisterStoredRegexLocalDestroy(
      m_interestListRegexForUpdatesAsInvalidates);
  err = opErr != GF_NOERR ? opErr : err;
  opErr = unregisterStoredRegexLocalDestroy(
      m_durableInterestListRegexForUpdatesAsInvalidates);
  err = opErr != GF_NOERR ? opErr : err;

  std::vector<std::shared_ptr<CacheableKey>> keysVec;
  copyInterestList(keysVec, m_interestList);
  opErr = unregisterKeysNoThrowLocalDestroy(keysVec, false);
  err = opErr != GF_NOERR ? opErr : err;

  std::vector<std::shared_ptr<CacheableKey>> keysVecDurable;
  copyInterestList(keysVecDurable, m_durableInterestList);
  opErr = unregisterKeysNoThrowLocalDestroy(keysVecDurable, false);
  err = opErr != GF_NOERR ? opErr : err;

  std::vector<std::shared_ptr<CacheableKey>> keysVecForUpdatesAsInvalidates;
  copyInterestList(keysVecForUpdatesAsInvalidates,
                   m_interestListForUpdatesAsInvalidates);
  opErr =
      unregisterKeysNoThrowLocalDestroy(keysVecForUpdatesAsInvalidates, false);
  err = opErr != GF_NOERR ? opErr : err;

  std::vector<std::shared_ptr<CacheableKey>>
      keysVecDurableForUpdatesAsInvalidates;
  copyInterestList(keysVecDurableForUpdatesAsInvalidates,
                   m_durableInterestListForUpdatesAsInvalidates);
  opErr = unregisterKeysNoThrowLocalDestroy(
      keysVecDurableForUpdatesAsInvalidates, false);
  err = opErr != GF_NOERR ? opErr : err;
  return err;
}
std::shared_ptr<Serializable> ThinClientRegion::selectValue(
    const std::string& predicate, std::chrono::milliseconds timeout) {
  auto results = query(predicate, timeout);

  if (results == nullptr || results->size() == 0) {
    return nullptr;
  }

  if (results->size() > 1) {
    throw QueryException("selectValue has more than one result");
  }

  return results->operator[](0);
}

std::vector<std::shared_ptr<CacheableKey>> ThinClientRegion::serverKeys() {
  CHECK_DESTROY_PENDING(TryReadGuard, Region::serverKeys);

  TcrMessageReply reply(true, m_tcrdm.get());
  TcrMessageKeySet request(new DataOutput(m_cacheImpl->createDataOutput()),
                           m_fullPath, m_tcrdm.get());
  reply.setMessageTypeRequest(TcrMessage::KEY_SET);
  std::vector<std::shared_ptr<CacheableKey>> serverKeys;
  ChunkedKeySetResponse resultCollector(request, serverKeys, reply);
  reply.setChunkedResultHandler(&resultCollector);

  GfErrType err = GF_NOERR;

  err = m_tcrdm->sendSyncRequest(request, reply);

  throwExceptionIfError("Region::serverKeys", err);

  switch (reply.getMessageType()) {
    case TcrMessage::RESPONSE: {
      // keyset result is handled by ChunkedKeySetResponse
      break;
    }
    case TcrMessage::EXCEPTION: {
      err = handleServerException("Region:serverKeys", reply.getException());
      break;
    }
    case TcrMessage::KEY_SET_DATA_ERROR: {
      LOGERROR("Region serverKeys: an error occurred on the server");
      err = GF_CACHESERVER_EXCEPTION;
      break;
    }
    default: {
      LOGERROR("Unknown message type %d during region serverKeys",
               reply.getMessageType());
      err = GF_MSG;
      break;
    }
  }
  throwExceptionIfError("Region::serverKeys", err);

  return serverKeys;
}

bool ThinClientRegion::containsKeyOnServer(
    const std::shared_ptr<CacheableKey>& keyPtr) const {
  GfErrType err = GF_NOERR;
  bool ret = false;

  /** @brief Create message and send to bridge server */

  TcrMessageContainsKey request(
      new DataOutput(m_cacheImpl->createDataOutput()), this, keyPtr,
      static_cast<std::shared_ptr<Serializable>>(nullptr), true, m_tcrdm.get());
  TcrMessageReply reply(true, m_tcrdm.get());
  reply.setMessageTypeRequest(TcrMessage::CONTAINS_KEY);
  err = m_tcrdm->sendSyncRequest(request, reply);

  switch (reply.getMessageType()) {
    case TcrMessage::RESPONSE:
      ret = reply.getBoolValue();
      break;

    case TcrMessage::EXCEPTION:
      err = handleServerException("Region::containsKeyOnServer:",
                                  reply.getException());
      break;

    case TcrMessage::REQUEST_DATA_ERROR:
      LOGERROR(
          "Region::containsKeyOnServer: read error occurred on the endpoint %s",
          m_tcrdm->getActiveEndpoint()->name().c_str());
      err = GF_CACHESERVER_EXCEPTION;
      break;

    default:
      LOGERROR("Unknown message type in Region::containsKeyOnServer %d",
               reply.getMessageType());
      err = GF_MSG;
      break;
  }

  auto rptr = CacheableBoolean::create(ret);

  rptr = std::dynamic_pointer_cast<CacheableBoolean>(handleReplay(err, rptr));
  throwExceptionIfError("Region::containsKeyOnServer ", err);
  return rptr->value();
}

bool ThinClientRegion::containsValueForKey_remote(
    const std::shared_ptr<CacheableKey>& keyPtr) const {
  GfErrType err = GF_NOERR;
  bool ret = false;

  /** @brief Create message and send to bridge server */

  TcrMessageContainsKey request(
      new DataOutput(m_cacheImpl->createDataOutput()), this, keyPtr,
      static_cast<std::shared_ptr<Serializable>>(nullptr), false,
      m_tcrdm.get());
  TcrMessageReply reply(true, m_tcrdm.get());
  reply.setMessageTypeRequest(TcrMessage::CONTAINS_KEY);
  err = m_tcrdm->sendSyncRequest(request, reply);
  // if ( err != GF_NOERR ) return ret;

  switch (reply.getMessageType()) {
    case TcrMessage::RESPONSE:
      ret = reply.getBoolValue();
      break;

    case TcrMessage::EXCEPTION:
      err = handleServerException("Region::containsValueForKey:",
                                  reply.getException());
      break;

    case TcrMessage::REQUEST_DATA_ERROR:
      LOGERROR(
          "Region::containsValueForKey: read error occurred on the endpoint %s",
          m_tcrdm->getActiveEndpoint()->name().c_str());
      err = GF_CACHESERVER_EXCEPTION;
      break;

    default:
      LOGERROR("Unknown message type in Region::containsValueForKey %d",
               reply.getMessageType());
      err = GF_MSG;
      break;
  }

  auto rptr = CacheableBoolean::create(ret);

  rptr = std::dynamic_pointer_cast<CacheableBoolean>(handleReplay(err, rptr));

  throwExceptionIfError("Region::containsValueForKey ", err);
  return rptr->value();
}

void ThinClientRegion::clear(
    const std::shared_ptr<Serializable>& aCallbackArgument) {
  GfErrType err = GF_NOERR;
  err = localClearNoThrow(aCallbackArgument, CacheEventFlags::NORMAL);
  if (err != GF_NOERR) throwExceptionIfError("Region::clear", err);

  /** @brief Create message and send to bridge server */

  TcrMessageClearRegion request(new DataOutput(m_cacheImpl->createDataOutput()),
                                this, aCallbackArgument,
                                std::chrono::milliseconds(-1), m_tcrdm.get());
  TcrMessageReply reply(true, m_tcrdm.get());
  err = m_tcrdm->sendSyncRequest(request, reply);
  if (err != GF_NOERR) throwExceptionIfError("Region::clear", err);

  switch (reply.getMessageType()) {
    case TcrMessage::REPLY:
      LOGFINE("Region %s clear message sent to server successfully",
              m_fullPath.c_str());
      break;
    case TcrMessage::EXCEPTION:
      err = handleServerException("Region::clear:", reply.getException());
      break;

    case TcrMessage::CLEAR_REGION_DATA_ERROR:
      LOGERROR("Region clear read error occurred on the endpoint %s",
               m_tcrdm->getActiveEndpoint()->name().c_str());
      err = GF_CACHESERVER_EXCEPTION;
      break;

    default:
      LOGERROR("Unknown message type %d during region clear",
               reply.getMessageType());
      err = GF_MSG;
      break;
  }
  if (err == GF_NOERR) {
    err = invokeCacheListenerForRegionEvent(
        aCallbackArgument, CacheEventFlags::NORMAL, AFTER_REGION_CLEAR);
  }
  throwExceptionIfError("Region::clear", err);
}

GfErrType ThinClientRegion::getNoThrow_remote(
    const std::shared_ptr<CacheableKey>& keyPtr,
    std::shared_ptr<Cacheable>& valPtr,
    const std::shared_ptr<Serializable>& aCallbackArgument,
    std::shared_ptr<VersionTag>& versionTag) {
  GfErrType err = GF_NOERR;

  /** @brief Create message and send to bridge server */

  TcrMessageRequest request(new DataOutput(m_cacheImpl->createDataOutput()),
                            this, keyPtr, aCallbackArgument, m_tcrdm.get());
  TcrMessageReply reply(true, m_tcrdm.get());
  err = m_tcrdm->sendSyncRequest(request, reply);
  if (err != GF_NOERR) return err;

  // put the object into local region
  switch (reply.getMessageType()) {
    case TcrMessage::RESPONSE: {
      valPtr = reply.getValue();
      versionTag = reply.getVersionTag();
      break;
    }
    case TcrMessage::EXCEPTION: {
      err = handleServerException("Region::get", reply.getException());
      break;
    }
    case TcrMessage::REQUEST_DATA_ERROR: {
      // LOGERROR("A read error occurred on the endpoint %s",
      //    m_tcrdm->getActiveEndpoint()->name().c_str());
      err = GF_CACHESERVER_EXCEPTION;
      break;
    }
    default: {
      LOGERROR("Unknown message type %d while getting entry from region",
               reply.getMessageType());
      err = GF_MSG;
      break;
    }
  }
  return err;
}

GfErrType ThinClientRegion::invalidateNoThrow_remote(
    const std::shared_ptr<CacheableKey>& keyPtr,
    const std::shared_ptr<Serializable>& aCallbackArgument,
    std::shared_ptr<VersionTag>& versionTag) {
  GfErrType err = GF_NOERR;

  TcrMessageInvalidate request(new DataOutput(m_cacheImpl->createDataOutput()),
                               this, keyPtr, aCallbackArgument, m_tcrdm.get());
  TcrMessageReply reply(true, m_tcrdm.get());
  err = m_tcrdm->sendSyncRequest(request, reply);
  if (err != GF_NOERR) return err;

  // put the object into local region
  switch (reply.getMessageType()) {
    case TcrMessage::REPLY: {
      versionTag = reply.getVersionTag();
      break;
    }
    case TcrMessage::EXCEPTION: {
      err = handleServerException("Region::get", reply.getException());
      break;
    }
    case TcrMessage::INVALIDATE_ERROR: {
      err = GF_CACHESERVER_EXCEPTION;
      break;
    }
    default: {
      LOGERROR("Unknown message type %d while getting entry from region",
               reply.getMessageType());
      err = GF_MSG;
      break;
    }
  }
  return err;
}

GfErrType ThinClientRegion::putNoThrow_remote(
    const std::shared_ptr<CacheableKey>& keyPtr,
    const std::shared_ptr<Cacheable>& valuePtr,
    const std::shared_ptr<Serializable>& aCallbackArgument,
    std::shared_ptr<VersionTag>& versionTag, bool checkDelta) {
  GfErrType err = GF_NOERR;
  // do TCR put
  // bool delta = valuePtr->hasDelta();
  bool delta = false;
  auto&& conFlationValue = getCacheImpl()
                               ->getDistributedSystem()
                               .getSystemProperties()
                               .conflateEvents();
  if (checkDelta && valuePtr && conFlationValue != "true" &&
      ThinClientBaseDM::isDeltaEnabledOnServer()) {
    auto&& temp = std::dynamic_pointer_cast<Delta>(valuePtr);
    delta = temp && temp->hasDelta();
  }
  TcrMessagePut request(new DataOutput(m_cacheImpl->createDataOutput()), this,
                        keyPtr, valuePtr, aCallbackArgument, delta,
                        m_tcrdm.get());
  auto reply = std::unique_ptr<TcrMessageReply>(
      new TcrMessageReply(true, m_tcrdm.get()));
  err = m_tcrdm->sendSyncRequest(request, *reply);
  if (delta) {
    // Does not check whether success of failure..
    m_cacheImpl->getCachePerfStats().incDeltaPut();
    if (reply->getMessageType() == TcrMessage::PUT_DELTA_ERROR) {
      // Try without delta
      TcrMessagePut putRequest(new DataOutput(m_cacheImpl->createDataOutput()),
                               this, keyPtr, valuePtr, aCallbackArgument, false,
                               m_tcrdm.get(), false, true);
      reply = std::unique_ptr<TcrMessageReply>(
          new TcrMessageReply(true, m_tcrdm.get()));
      err = m_tcrdm->sendSyncRequest(putRequest, *reply);
    }
  }
  if (err != GF_NOERR) return err;

  // put the object into local region
  switch (reply->getMessageType()) {
    case TcrMessage::REPLY: {
      versionTag = reply->getVersionTag();
      break;
    }
    case TcrMessage::EXCEPTION: {
      err = handleServerException("Region::put", reply->getException());
      break;
    }
    case TcrMessage::PUT_DATA_ERROR: {
      err = GF_CACHESERVER_EXCEPTION;
      break;
    }
    default: {
      LOGERROR("Unknown message type %d during region put reply",
               reply->getMessageType());
      err = GF_MSG;
    }
  }
  return err;
}

GfErrType ThinClientRegion::createNoThrow_remote(
    const std::shared_ptr<CacheableKey>& keyPtr,
    const std::shared_ptr<Cacheable>& valuePtr,
    const std::shared_ptr<Serializable>& aCallbackArgument,
    std::shared_ptr<VersionTag>& versionTag) {
  return putNoThrow_remote(keyPtr, valuePtr, aCallbackArgument, versionTag,
                           false);
}

GfErrType ThinClientRegion::destroyNoThrow_remote(
    const std::shared_ptr<CacheableKey>& keyPtr,
    const std::shared_ptr<Serializable>& aCallbackArgument,
    std::shared_ptr<VersionTag>& versionTag) {
  GfErrType err = GF_NOERR;

  // do TCR destroy
  TcrMessageDestroy request(new DataOutput(m_cacheImpl->createDataOutput()),
                            this, keyPtr, nullptr, aCallbackArgument,
                            m_tcrdm.get());
  TcrMessageReply reply(true, m_tcrdm.get());
  err = m_tcrdm->sendSyncRequest(request, reply);
  if (err != GF_NOERR) return err;

  switch (reply.getMessageType()) {
    case TcrMessage::REPLY: {
      if (reply.getEntryNotFound() == 1) {
        err = GF_CACHE_ENTRY_NOT_FOUND;
      } else {
        LOGDEBUG("Remote key [%s] is destroyed successfully in region %s",
                 Utils::nullSafeToString(keyPtr).c_str(), m_fullPath.c_str());
      }
      versionTag = reply.getVersionTag();
      break;
    }
    case TcrMessage::EXCEPTION: {
      err = handleServerException("Region::destroy", reply.getException());
      break;
    }
    case TcrMessage::DESTROY_DATA_ERROR: {
      err = GF_CACHE_ENTRY_DESTROYED_EXCEPTION;
      break;
    }
    default: {
      LOGERROR("Unknown message type %d while destroying region entry",
               reply.getMessageType());
      err = GF_MSG;
    }
  }

  return err;
}  // destroyNoThrow_remote()

GfErrType ThinClientRegion::removeNoThrow_remote(
    const std::shared_ptr<CacheableKey>& keyPtr,
    const std::shared_ptr<Cacheable>& cvalue,
    const std::shared_ptr<Serializable>& aCallbackArgument,
    std::shared_ptr<VersionTag>& versionTag) {
  GfErrType err = GF_NOERR;

  // do TCR remove
  TcrMessageDestroy request(new DataOutput(m_cacheImpl->createDataOutput()),
                            this, keyPtr, cvalue, aCallbackArgument,
                            m_tcrdm.get());
  TcrMessageReply reply(true, m_tcrdm.get());
  err = m_tcrdm->sendSyncRequest(request, reply);
  if (err != GF_NOERR) {
    return err;
  }
  switch (reply.getMessageType()) {
    case TcrMessage::REPLY: {
      if (reply.getEntryNotFound() == 1) {
        err = GF_ENOENT;
      } else {
        LOGDEBUG("Remote key [%s] is removed successfully in region %s",
                 Utils::nullSafeToString(keyPtr).c_str(), m_fullPath.c_str());
      }
      versionTag = reply.getVersionTag();
      break;
    }
    case TcrMessage::EXCEPTION: {
      err = handleServerException("Region::remove", reply.getException());
      break;
    }
    case TcrMessage::DESTROY_DATA_ERROR: {
      err = GF_CACHE_ENTRY_DESTROYED_EXCEPTION;
      break;
    }
    default: {
      LOGERROR("Unknown message type %d while removing region entry",
               reply.getMessageType());
      err = GF_MSG;
    }
  }
  return err;
}

GfErrType ThinClientRegion::removeNoThrowEX_remote(
    const std::shared_ptr<CacheableKey>& keyPtr,
    const std::shared_ptr<Serializable>& aCallbackArgument,
    std::shared_ptr<VersionTag>& versionTag) {
  GfErrType err = GF_NOERR;

  // do TCR remove
  TcrMessageDestroy request(new DataOutput(m_cacheImpl->createDataOutput()),
                            this, keyPtr, nullptr, aCallbackArgument,
                            m_tcrdm.get());
  TcrMessageReply reply(true, m_tcrdm.get());
  err = m_tcrdm->sendSyncRequest(request, reply);
  if (err != GF_NOERR) {
    return err;
  }
  switch (reply.getMessageType()) {
    case TcrMessage::REPLY: {
      versionTag = reply.getVersionTag();
      if (reply.getEntryNotFound() == 1) {
        err = GF_ENOENT;
      } else {
        LOGDEBUG("Remote key [%s] is removed successfully in region %s",
                 Utils::nullSafeToString(keyPtr).c_str(), m_fullPath.c_str());
      }
      break;
    }
    case TcrMessage::EXCEPTION: {
      err = handleServerException("Region::removeEx", reply.getException());
      break;
    }
    case TcrMessage::DESTROY_DATA_ERROR: {
      err = GF_CACHE_ENTRY_DESTROYED_EXCEPTION;
      break;
    }
    default: {
      LOGERROR("Unknown message type %d while removing region entry",
               reply.getMessageType());
      err = GF_MSG;
    }
  }
  return err;
}

GfErrType ThinClientRegion::getAllNoThrow_remote(
    const std::vector<std::shared_ptr<CacheableKey>>* keys,
    const std::shared_ptr<HashMapOfCacheable>& values,
    const std::shared_ptr<HashMapOfException>& exceptions,
    const std::shared_ptr<std::vector<std::shared_ptr<CacheableKey>>>&
        resultKeys,
    bool addToLocalCache,
    const std::shared_ptr<Serializable>& aCallbackArgument) {
  GfErrType err = GF_NOERR;
  MapOfUpdateCounters updateCountMap;
  int32_t destroyTracker = 0;
  addToLocalCache = addToLocalCache && m_regionAttributes.getCachingEnabled();
  if (addToLocalCache && !m_regionAttributes.getConcurrencyChecksEnabled()) {
    // start tracking the entries
    if (keys == nullptr) {
      // track all entries with destroy tracking for non-existent entries
      destroyTracker = m_entries->addTrackerForAllEntries(updateCountMap, true);
    } else {
      for (const auto& key : *keys) {
        std::shared_ptr<Cacheable> oldValue;
        int updateCount =
            m_entries->addTrackerForEntry(key, oldValue, true, false, false);
        updateCountMap.emplace(key, updateCount);
      }
    }
  }
  // create the GET_ALL request
  TcrMessageGetAll request(new DataOutput(m_cacheImpl->createDataOutput()),
                           this, keys, m_tcrdm.get(), aCallbackArgument);

  TcrMessageReply reply(true, m_tcrdm.get());
  std::recursive_mutex responseLock;
  // need to check
  TcrChunkedResult* resultCollector(new ChunkedGetAllResponse(
      reply, this, keys, values, exceptions, resultKeys, updateCountMap,
      destroyTracker, addToLocalCache, responseLock));

  reply.setChunkedResultHandler(resultCollector);
  err = m_tcrdm->sendSyncRequest(request, reply);

  if (addToLocalCache && !m_regionAttributes.getConcurrencyChecksEnabled()) {
    // remove the tracking for remaining keys in case some keys do not have
    // values from server in GII
    for (MapOfUpdateCounters::const_iterator iter = updateCountMap.begin();
         iter != updateCountMap.end(); ++iter) {
      if (iter->second >= 0) {
        m_entries->removeTrackerForEntry(iter->first);
      }
    }
    // remove tracking for destroys
    if (destroyTracker > 0) {
      m_entries->removeDestroyTracking();
    }
  }
  delete resultCollector;
  if (err != GF_NOERR) {
    return err;
  }

  switch (reply.getMessageType()) {
    case TcrMessage::RESPONSE: {
      // nothing to be done; put in local region, if required,
      // is handled by ChunkedGetAllResponse
      break;
    }
    case TcrMessage::EXCEPTION: {
      err = handleServerException("Region:getAll", reply.getException());
      break;
    }
    case TcrMessage::GET_ALL_DATA_ERROR: {
      LOGERROR("Region get-all: a read error occurred on the endpoint %s",
               m_tcrdm->getActiveEndpoint()->name().c_str());
      err = GF_CACHESERVER_EXCEPTION;
      break;
    }
    default: {
      LOGERROR("Unknown message type %d during region get-all",
               reply.getMessageType());
      err = GF_MSG;
      break;
    }
  }
  return err;
}

GfErrType ThinClientRegion::singleHopPutAllNoThrow_remote(
    ThinClientPoolDM* tcrdm, const HashMapOfCacheable& map,
    std::shared_ptr<VersionedCacheableObjectPartList>& versionedObjPartList,
    std::chrono::milliseconds timeout,
    const std::shared_ptr<Serializable>& aCallbackArgument) {
  LOGDEBUG(" ThinClientRegion::singleHopPutAllNoThrow_remote map size = %zu",
           map.size());
  auto region = shared_from_this();

  auto error = GF_NOERR;
  /*Step-1::
   * populate the keys vector from the user Map and pass it to the
   * getServerToFilterMap to generate locationMap
   * If locationMap is nullptr try the old, existing putAll impl that may take
   * multiple n/w hops
   */
  auto userKeys = std::vector<std::shared_ptr<CacheableKey>>();
  for (const auto& iter : map) {
    userKeys.push_back(iter.first);
  }
  // last param in getServerToFilterMap() is false for putAll

  // LOGDEBUG("ThinClientRegion::singleHopPutAllNoThrow_remote keys.size() = %d
  // ", userKeys->size());
  auto locationMap = tcrdm->getClientMetaDataService()->getServerToFilterMap(
      userKeys, region, true);
  if (!locationMap) {
    // putAll with multiple hop implementation
    LOGDEBUG("locationMap is Null or Empty");

    return multiHopPutAllNoThrow_remote(map, versionedObjPartList, timeout,
                                        aCallbackArgument);
  }

  // set this flag that indicates putAll on PR is invoked with singlehop
  // enabled.
  m_isPRSingleHopEnabled = true;
  // LOGDEBUG("locationMap.size() = %d ", locationMap->size());

  /*Step-2
   *  a. create vector of PutAllWork
   *  b. locationMap<std::shared_ptr<BucketServerLocation>,
   * std::shared_ptr<std::vector<std::shared_ptr<CacheableKey>> >>. Create
   * server specific filteredMap/subMap by populating all keys
   * (locationIter.second()) and its corr. values from the user Map.
   *  c. create new instance of PutAllWork, i.e worker with required params.
   *     //TODO:: Add details of each parameter later
   *  d. enqueue the worker for thread from threadPool to perform/run execute
   * method.
   *  e. insert the worker into the vector.
   */
  std::vector<std::shared_ptr<PutAllWork>> putAllWorkers;
  auto& threadPool = m_cacheImpl->getThreadPool();
  int locationMapIndex = 0;
  for (const auto& locationIter : *locationMap) {
    const auto& serverLocation = locationIter.first;
    if (serverLocation == nullptr) {
      LOGDEBUG("serverLocation is nullptr");
    }
    const auto& keys = locationIter.second;

    // Create server specific Sub-Map by iterating over keys.
    auto filteredMap = std::make_shared<HashMapOfCacheable>();
    if (keys != nullptr && keys->size() > 0) {
      for (const auto& key : *keys) {
        const auto& iter = map.find(key);
        if (iter != map.end()) {
          filteredMap->emplace(iter->first, iter->second);
        }
      }
    }

    auto worker = std::make_shared<PutAllWork>(
        tcrdm, serverLocation, region, true /*attemptFailover*/,
        false /*isBGThread*/, filteredMap, keys, timeout, aCallbackArgument);
    threadPool.perform(worker);
    putAllWorkers.push_back(worker);
    locationMapIndex++;
  }

  // TODO::CHECK, do we need to set following ..??
  // reply.setMessageType(TcrMessage::RESPONSE);

  int cnt = 1;

  /**
   * Step::3
   * a. Iterate over all vector of putAllWorkers and populate worker specific
   * information into the HashMap
   *    resultMap<std::shared_ptr<BucketServerLocation>,
   * std::shared_ptr<Serializable>>, 2nd part, Value can be a
   * std::shared_ptr<VersionedCacheableObjectPartList> or
   * std::shared_ptr<PutAllPartialResultServerException>.
   *    failedServers<std::shared_ptr<BucketServerLocation>,
   * std::shared_ptr<CacheableInt32>>, 2nd part, Value is a ErrorCode. b. delete
   * the worker
   */
  auto resultMap = ResultMap();
  auto failedServers = FailedServersMap();

  for (const auto& worker : putAllWorkers) {
    auto err =
        worker->getResult();  // wait() or blocking call for worker thread.
    LOGDEBUG("Error code :: %s:%d err = %d ", __FILE__, __LINE__, err);

    if (GF_NOERR == err) {
      // No Exception from server
      resultMap.emplace(worker->getServerLocation(),
                        worker->getResultCollector()->getList());
    } else {
      error = err;

      if (error == GF_PUTALL_PARTIAL_RESULT_EXCEPTION) {
        resultMap.emplace(worker->getServerLocation(),
                          worker->getPaPResultException());
      } else if (error == GF_NOTCON) {
        // Refresh the metadata in case of GF_NOTCON.
        tcrdm->getClientMetaDataService()->enqueueForMetadataRefresh(
            region->getFullPath(), 0);
      }
      failedServers.emplace(worker->getServerLocation(),
                            CacheableInt32::create(error));
    }

    LOGDEBUG("worker->getPutAllMap()->size() = %zu ",
             worker->getPutAllMap()->size());
    LOGDEBUG(
        "worker->getResultCollector()->getList()->getVersionedTagsize() = %d ",
        worker->getResultCollector()->getList()->getVersionedTagsize());

    // TODO::CHECK, why do we need following code... ??
    // TcrMessage* currentReply = worker->getReply();
    /*
    if(currentReply->getMessageType() != TcrMessage::REPLY)
    {
      reply.setMessageType(currentReply->getMessageType());
    }
    */

    cnt++;
  }
  /**
   * Step:4
   * a. create instance of std::shared_ptr<PutAllPartialResult> with total size=
   * map.size() b. Iterate over the resultMap and value for the particular
   * serverlocation is of type VersionedCacheableObjectPartList add keys and
   * versions. C. ToDO:: what if the value in the resultMap is of type
   * PutAllPartialResultServerException
   */
  std::recursive_mutex responseLock;
  auto result = std::make_shared<PutAllPartialResult>(
      static_cast<int>(map.size()), responseLock);
  LOGDEBUG(
      " TCRegion:: %s:%d  "
      "result->getSucceededKeysAndVersions()->getVersionedTagsize() = %d ",
      __FILE__, __LINE__,
      result->getSucceededKeysAndVersions()->getVersionedTagsize());
  LOGDEBUG(" TCRegion:: %s:%d resultMap->size() ", __FILE__, __LINE__,
           resultMap.size());
  for (const auto& resultMapIter : resultMap) {
    const auto& value = resultMapIter.second;

    if (const auto papException =
            std::dynamic_pointer_cast<PutAllPartialResultServerException>(
                value)) {
      // PutAllPartialResultServerException CASE:: value in resultMap is of type
      // PutAllPartialResultServerException.
      // TODO:: Add failedservers.keySet= all fialed servers, i.e list out all
      // keys in map failedServers,
      //       that is set view of the keys contained in failedservers map.
      // TODO:: need to read  papException and populate PutAllPartialResult.
      result->consolidate(papException->getResult());
    } else if (const auto list =
                   std::dynamic_pointer_cast<VersionedCacheableObjectPartList>(
                       value)) {
      // value in resultMap is of type VCOPL.
      result->addKeysAndVersions(list);
    } else {
      // ERROR CASE
      if (value) {
        LOGERROR(
            "ERROR:: ThinClientRegion::singleHopPutAllNoThrow_remote value "
            "could not Cast to either VCOPL or "
            "PutAllPartialResultServerException:%s",
            value->toString().c_str());
      } else {
        LOGERROR(
            "ERROR:: ThinClientRegion::singleHopPutAllNoThrow_remote value is "
            "nullptr");
      }
    }
  }

  /**
   * a. if PutAllPartialResult result does not contains any entry,  Iterate over
   * locationMap.
   * b. Create std::vector<std::shared_ptr<CacheableKey>>  succeedKeySet, and
   * keep adding set of keys (locationIter.second()) in locationMap for which
   * failedServers->contains(locationIter.first()is false.
   */

  LOGDEBUG("ThinClientRegion:: %s:%d failedServers->size() = %zu", __FILE__,
           __LINE__, failedServers.size());

  // if the partial result set doesn't already have keys (for tracking version
  // tags)
  // then we need to gather up the keys that we know have succeeded so far and
  // add them to the partial result set (See bug Id #955)
  if (!failedServers.empty()) {
    auto succeedKeySet =
        std::make_shared<std::vector<std::shared_ptr<CacheableKey>>>();
    if (result->getSucceededKeysAndVersions()->size() == 0) {
      for (const auto& locationIter : *locationMap) {
        if (failedServers.find(locationIter.first) != failedServers.end()) {
          for (const auto& i : *(locationIter.second)) {
            succeedKeySet->push_back(i);
          }
        }
      }
      result->addKeys(succeedKeySet);
    }
  }

  /**
   * a. Iterate over the failedServers map
   * c. if failedServer map contains "GF_PUTALL_PARTIAL_RESULT_EXCEPTION" then
   * continue, Do not retry putAll for corr. keys.
   * b. Retry for all the failed server.
   *    Generate a newSubMap by finding Keys specific to failedServers from
   * locationMap and finding their respective values from the usermap.
   */
  error = GF_NOERR;
  bool oneSubMapRetryFailed = false;
  for (const auto& failedServerIter : failedServers) {
    if (failedServerIter.second->value() ==
        GF_PUTALL_PARTIAL_RESULT_EXCEPTION) {  // serverLocation
      // will not retry for PutAllPartialResultException
      // but it means at least one sub map ever failed
      oneSubMapRetryFailed = true;
      error = GF_PUTALL_PARTIAL_RESULT_EXCEPTION;
      continue;
    }

    std::shared_ptr<std::vector<std::shared_ptr<CacheableKey>>> failedKeys =
        nullptr;
    const auto& failedSerInLocMapIter =
        locationMap->find(failedServerIter.first);
    if (failedSerInLocMapIter != locationMap->end()) {
      failedKeys = failedSerInLocMapIter->second;
    }

    if (failedKeys == nullptr) {
      LOGERROR(
          "TCRegion::singleHopPutAllNoThrow_remote :: failedKeys are nullptr "
          "that is not valid");
    }

    auto newSubMap = std::make_shared<HashMapOfCacheable>();
    if (failedKeys && !failedKeys->empty()) {
      for (const auto& key : *failedKeys) {
        const auto& iter = map.find(key);
        if (iter != map.end()) {
          newSubMap->emplace(iter->first, iter->second);
        } else {
          LOGERROR(
              "DEBUG:: TCRegion.cpp singleHopPutAllNoThrow_remote KEY not "
              "found in user failedSubMap");
        }
      }
    }

    std::shared_ptr<VersionedCacheableObjectPartList> vcopListPtr;
    GfErrType errCode = multiHopPutAllNoThrow_remote(
        *newSubMap, vcopListPtr, timeout, aCallbackArgument);
    if (errCode == GF_NOERR) {
      result->addKeysAndVersions(vcopListPtr);
    } else if (errCode == GF_PUTALL_PARTIAL_RESULT_EXCEPTION) {
      oneSubMapRetryFailed = true;
      // TODO:: Commented it as papResultServerExc is nullptr this time
      //       UnComment it once you read papResultServerExc.
      // result->consolidate(papResultServerExc->getResult());
      error = errCode;
    } else /*if(errCode != GF_NOERR)*/ {
      oneSubMapRetryFailed = true;
      const auto& firstKey = newSubMap->begin()->first;
      std::shared_ptr<Exception> excptPtr = nullptr;
      // TODO:: formulat excptPtr from the errCode
      result->saveFailedKey(firstKey, excptPtr);
      error = errCode;
    }
  }

  if (!oneSubMapRetryFailed) {
    error = GF_NOERR;
  }
  versionedObjPartList = result->getSucceededKeysAndVersions();
  LOGDEBUG("singlehop versionedObjPartList = %d error=%d",
           versionedObjPartList->size(), error);

  return error;
}

GfErrType ThinClientRegion::multiHopPutAllNoThrow_remote(
    const HashMapOfCacheable& map,
    std::shared_ptr<VersionedCacheableObjectPartList>& versionedObjPartList,
    std::chrono::milliseconds timeout,
    const std::shared_ptr<Serializable>& aCallbackArgument) {
  // Multiple hop implementation
  LOGDEBUG("ThinClientRegion::multiHopPutAllNoThrow_remote ");
  auto err = GF_NOERR;

  // Construct request/reply for putAll
  TcrMessagePutAll request(new DataOutput(m_cacheImpl->createDataOutput()),
                           this, map, timeout, m_tcrdm.get(),
                           aCallbackArgument);
  TcrMessageReply reply(true, m_tcrdm.get());
  request.setTimeout(timeout);
  reply.setTimeout(timeout);

  std::recursive_mutex responseLock;
  versionedObjPartList =
      std::make_shared<VersionedCacheableObjectPartList>(this, responseLock);
  // need to check
  ChunkedPutAllResponse* resultCollector(new ChunkedPutAllResponse(
      shared_from_this(), reply, responseLock, versionedObjPartList));
  reply.setChunkedResultHandler(resultCollector);

  err = m_tcrdm->sendSyncRequest(request, reply);

  versionedObjPartList = resultCollector->getList();
  LOGDEBUG("multiple hop versionedObjPartList size = %d , err = %d  ",
           versionedObjPartList->size(), err);
  delete resultCollector;
  if (err != GF_NOERR) return err;
  LOGDEBUG(
      "ThinClientRegion::multiHopPutAllNoThrow_remote reply.getMessageType() = "
      "%d ",
      reply.getMessageType());
  switch (reply.getMessageType()) {
    case TcrMessage::REPLY:
      // LOGDEBUG("Map is written into remote server at region %s",
      // m_fullPath.c_str());
      break;
    case TcrMessage::RESPONSE:
      LOGDEBUG(
          "multiHopPutAllNoThrow_remote TcrMessage::RESPONSE %s, err = %d ",
          m_fullPath.c_str(), err);
      break;
    case TcrMessage::EXCEPTION:
      err = handleServerException("ThinClientRegion::putAllNoThrow",
                                  reply.getException());
      // TODO:: Do we need to read PutAllPartialServerException for multiple
      // hop.
      break;
    case TcrMessage::PUT_DATA_ERROR:
      // LOGERROR( "A write error occurred on the endpoint %s",
      // m_tcrdm->getActiveEndpoint( )->name( ).c_str( ) );
      err = GF_CACHESERVER_EXCEPTION;
      break;
    default:
      LOGERROR("Unknown message type %d during region put-all",
               reply.getMessageType());
      err = GF_NOTOBJ;
      break;
  }
  return err;
}

GfErrType ThinClientRegion::putAllNoThrow_remote(
    const HashMapOfCacheable& map,
    std::shared_ptr<VersionedCacheableObjectPartList>& versionedObjPartList,
    std::chrono::milliseconds timeout,
    const std::shared_ptr<Serializable>& aCallbackArgument) {
  LOGDEBUG("ThinClientRegion::putAllNoThrow_remote");

  if (auto poolDM = std::dynamic_pointer_cast<ThinClientPoolDM>(m_tcrdm)) {
    if (poolDM->getPRSingleHopEnabled() && poolDM->getClientMetaDataService() &&
        !TSSTXStateWrapper::get().getTXState()) {
      return singleHopPutAllNoThrow_remote(
          poolDM.get(), map, versionedObjPartList, timeout, aCallbackArgument);
    } else {
      return multiHopPutAllNoThrow_remote(map, versionedObjPartList, timeout,
                                          aCallbackArgument);
    }
  } else {
    LOGERROR("ThinClientRegion::putAllNoThrow_remote :: Pool Not Specified ");
    return GF_NOTSUP;
  }
}

GfErrType ThinClientRegion::singleHopRemoveAllNoThrow_remote(
    ThinClientPoolDM* tcrdm,
    const std::vector<std::shared_ptr<CacheableKey>>& keys,
    std::shared_ptr<VersionedCacheableObjectPartList>& versionedObjPartList,
    const std::shared_ptr<Serializable>& aCallbackArgument) {
  LOGDEBUG(
      " ThinClientRegion::singleHopRemoveAllNoThrow_remote keys size = %zu",
      keys.size());
  auto region = shared_from_this();
  GfErrType error = GF_NOERR;

  auto locationMap = tcrdm->getClientMetaDataService()->getServerToFilterMap(
      keys, region, true);
  if (!locationMap) {
    // removeAll with multiple hop implementation
    LOGDEBUG("locationMap is Null or Empty");
    return multiHopRemoveAllNoThrow_remote(keys, versionedObjPartList,
                                           aCallbackArgument);
  }

  // set this flag that indicates putAll on PR is invoked with singlehop
  // enabled.
  m_isPRSingleHopEnabled = true;
  LOGDEBUG("locationMap.size() = %zu ", locationMap->size());

  /*Step-2
   *  a. create vector of RemoveAllWork
   *  b. locationMap<std::shared_ptr<BucketServerLocation>,
   * std::shared_ptr<std::vector<std::shared_ptr<CacheableKey>> >>. Create
   * server specific filteredMap/subMap by populating all keys
   * (locationIter.second()) and its corr. values from the user Map.
   *  c. create new instance of RemoveAllWork, i.e worker with required params.
   *     //TODO:: Add details of each parameter later
   *  d. enqueue the worker for thread from threadPool to perform/run execute
   * method.
   *  e. insert the worker into the vector.
   */
  std::vector<std::shared_ptr<RemoveAllWork>> removeAllWorkers;
  auto& threadPool = m_cacheImpl->getThreadPool();
  int locationMapIndex = 0;
  for (const auto& locationIter : *locationMap) {
    const auto& serverLocation = locationIter.first;
    if (serverLocation == nullptr) {
      LOGDEBUG("serverLocation is nullptr");
    }
    const auto& mappedkeys = locationIter.second;
    auto worker = std::make_shared<RemoveAllWork>(
        tcrdm, serverLocation, region, true /*attemptFailover*/,
        false /*isBGThread*/, mappedkeys, aCallbackArgument);
    threadPool.perform(worker);
    removeAllWorkers.push_back(worker);
    locationMapIndex++;
  }
  // TODO::CHECK, do we need to set following ..??
  // reply.setMessageType(TcrMessage::RESPONSE);

  int cnt = 1;

  /**
   * Step::3
   * a. Iterate over all vector of putAllWorkers and populate worker specific
   * information into the HashMap
   *    resultMap<std::shared_ptr<BucketServerLocation>,
   * std::shared_ptr<Serializable>>, 2nd part, Value can be a
   * std::shared_ptr<VersionedCacheableObjectPartList> or
   * std::shared_ptr<PutAllPartialResultServerException>.
   *    failedServers<std::shared_ptr<BucketServerLocation>,
   * std::shared_ptr<CacheableInt32>>, 2nd part, Value is a ErrorCode. b. delete
   * the worker
   */
  auto resultMap = ResultMap();
  auto failedServers = FailedServersMap();
  for (const auto& worker : removeAllWorkers) {
    auto err =
        worker->getResult();  // wait() or blocking call for worker thread.
    LOGDEBUG("Error code :: %s:%d err = %d ", __FILE__, __LINE__, err);

    if (GF_NOERR == err) {
      // No Exception from server
      resultMap.emplace(worker->getServerLocation(),
                        worker->getResultCollector()->getList());
    } else {
      error = err;

      if (error == GF_PUTALL_PARTIAL_RESULT_EXCEPTION) {
        resultMap.emplace(worker->getServerLocation(),
                          worker->getPaPResultException());
      } else if (error == GF_NOTCON) {
        // Refresh the metadata in case of GF_NOTCON.
        tcrdm->getClientMetaDataService()->enqueueForMetadataRefresh(
            region->getFullPath(), 0);
      }
      failedServers.emplace(worker->getServerLocation(),
                            CacheableInt32::create(error));
    }

    LOGDEBUG(
        "worker->getResultCollector()->getList()->getVersionedTagsize() = %d ",
        worker->getResultCollector()->getList()->getVersionedTagsize());

    cnt++;
  }
  /**
   * Step:4
   * a. create instance of std::shared_ptr<PutAllPartialResult> with total size=
   * map.size() b. Iterate over the resultMap and value for the particular
   * serverlocation is of type VersionedCacheableObjectPartList add keys and
   * versions. C. ToDO:: what if the value in the resultMap is of type
   * PutAllPartialResultServerException
   */
  std::recursive_mutex responseLock;
  auto result = std::make_shared<PutAllPartialResult>(
      static_cast<int>(keys.size()), responseLock);
  LOGDEBUG(
      " TCRegion:: %s:%d  "
      "result->getSucceededKeysAndVersions()->getVersionedTagsize() = %d ",
      __FILE__, __LINE__,
      result->getSucceededKeysAndVersions()->getVersionedTagsize());
  LOGDEBUG(" TCRegion:: %s:%d resultMap->size() ", __FILE__, __LINE__,
           resultMap.size());
  for (const auto& resultMapIter : resultMap) {
    const auto& value = resultMapIter.second;

    if (const auto papException =
            std::dynamic_pointer_cast<PutAllPartialResultServerException>(
                value)) {
      // PutAllPartialResultServerException CASE:: value in resultMap is of type
      // PutAllPartialResultServerException.
      // TODO:: Add failedservers.keySet= all fialed servers, i.e list out all
      // keys in map failedServers,
      //       that is set view of the keys contained in failedservers map.
      // TODO:: need to read  papException and populate PutAllPartialResult.
      result->consolidate(papException->getResult());
    } else if (const auto list =
                   std::dynamic_pointer_cast<VersionedCacheableObjectPartList>(
                       value)) {
      // value in resultMap is of type VCOPL.
      result->addKeysAndVersions(list);
    } else {
      // ERROR CASE
      if (value) {
        LOGERROR(
            "ERROR:: ThinClientRegion::singleHopRemoveAllNoThrow_remote value "
            "could not Cast to either VCOPL or "
            "PutAllPartialResultServerException:%s",
            value->toString().c_str());
      } else {
        LOGERROR(
            "ERROR:: ThinClientRegion::singleHopRemoveAllNoThrow_remote value "
            "is nullptr");
      }
    }
  }

  /**
   * a. if PutAllPartialResult result does not contains any entry,  Iterate over
   * locationMap.
   * b. Create std::vector<std::shared_ptr<CacheableKey>>  succeedKeySet, and
   * keep adding set of keys (locationIter.second()) in locationMap for which
   * failedServers->contains(locationIter.first()is false.
   */

  LOGDEBUG("ThinClientRegion:: %s:%d failedServers->size() = %zu", __FILE__,
           __LINE__, failedServers.size());

  // if the partial result set doesn't already have keys (for tracking version
  // tags)
  // then we need to gather up the keys that we know have succeeded so far and
  // add them to the partial result set (See bug Id #955)
  if (!failedServers.empty()) {
    auto succeedKeySet =
        std::make_shared<std::vector<std::shared_ptr<CacheableKey>>>();
    if (result->getSucceededKeysAndVersions()->size() == 0) {
      for (const auto& locationIter : *locationMap) {
        if (failedServers.find(locationIter.first) != failedServers.end()) {
          for (const auto& i : *(locationIter.second)) {
            succeedKeySet->push_back(i);
          }
        }
      }
      result->addKeys(succeedKeySet);
    }
  }

  /**
   * a. Iterate over the failedServers map
   * c. if failedServer map contains "GF_PUTALL_PARTIAL_RESULT_EXCEPTION" then
   * continue, Do not retry putAll for corr. keys.
   * b. Retry for all the failed server.
   *    Generate a newSubMap by finding Keys specific to failedServers from
   * locationMap and finding their respective values from the usermap.
   */
  error = GF_NOERR;
  bool oneSubMapRetryFailed = false;
  for (const auto& failedServerIter : failedServers) {
    if (failedServerIter.second->value() ==
        GF_PUTALL_PARTIAL_RESULT_EXCEPTION) {  // serverLocation
      // will not retry for PutAllPartialResultException
      // but it means at least one sub map ever failed
      oneSubMapRetryFailed = true;
      error = GF_PUTALL_PARTIAL_RESULT_EXCEPTION;
      continue;
    }

    std::shared_ptr<std::vector<std::shared_ptr<CacheableKey>>> failedKeys =
        nullptr;
    const auto& failedSerInLocMapIter =
        locationMap->find(failedServerIter.first);
    if (failedSerInLocMapIter != locationMap->end()) {
      failedKeys = failedSerInLocMapIter->second;
    }

    if (failedKeys == nullptr) {
      LOGERROR(
          "TCRegion::singleHopRemoveAllNoThrow_remote :: failedKeys are "
          "nullptr "
          "that is not valid");
    }

    std::shared_ptr<VersionedCacheableObjectPartList> vcopListPtr;
    std::shared_ptr<PutAllPartialResultServerException> papResultServerExc =
        nullptr;
    GfErrType errCode = multiHopRemoveAllNoThrow_remote(
        *failedKeys, vcopListPtr, aCallbackArgument);
    if (errCode == GF_NOERR) {
      result->addKeysAndVersions(vcopListPtr);
    } else if (errCode == GF_PUTALL_PARTIAL_RESULT_EXCEPTION) {
      oneSubMapRetryFailed = true;
      error = errCode;
    } else /*if(errCode != GF_NOERR)*/ {
      oneSubMapRetryFailed = true;
      std::shared_ptr<Exception> excptPtr = nullptr;
      result->saveFailedKey(failedKeys->at(0), excptPtr);
      error = errCode;
    }
  }

  if (!oneSubMapRetryFailed) {
    error = GF_NOERR;
  }
  versionedObjPartList = result->getSucceededKeysAndVersions();
  LOGDEBUG("singlehop versionedObjPartList = %d error=%d",
           versionedObjPartList->size(), error);
  return error;
}

GfErrType ThinClientRegion::multiHopRemoveAllNoThrow_remote(
    const std::vector<std::shared_ptr<CacheableKey>>& keys,
    std::shared_ptr<VersionedCacheableObjectPartList>& versionedObjPartList,
    const std::shared_ptr<Serializable>& aCallbackArgument) {
  // Multiple hop implementation
  LOGDEBUG("ThinClientRegion::multiHopRemoveAllNoThrow_remote ");
  GfErrType err = GF_NOERR;

  // Construct request/reply for putAll
  TcrMessageRemoveAll request(new DataOutput(m_cacheImpl->createDataOutput()),
                              this, keys, aCallbackArgument, m_tcrdm.get());
  TcrMessageReply reply(true, m_tcrdm.get());

  std::recursive_mutex responseLock;
  versionedObjPartList =
      std::make_shared<VersionedCacheableObjectPartList>(this, responseLock);
  // need to check
  ChunkedRemoveAllResponse* resultCollector(new ChunkedRemoveAllResponse(
      shared_from_this(), reply, responseLock, versionedObjPartList));
  reply.setChunkedResultHandler(resultCollector);

  err = m_tcrdm->sendSyncRequest(request, reply);

  versionedObjPartList = resultCollector->getList();
  LOGDEBUG("multiple hop versionedObjPartList size = %d , err = %d  ",
           versionedObjPartList->size(), err);
  delete resultCollector;
  if (err != GF_NOERR) return err;
  LOGDEBUG(
      "ThinClientRegion::multiHopRemoveAllNoThrow_remote "
      "reply.getMessageType() = %d ",
      reply.getMessageType());
  switch (reply.getMessageType()) {
    case TcrMessage::REPLY:
      // LOGDEBUG("Map is written into remote server at region %s",
      // m_fullPath.c_str());
      break;
    case TcrMessage::RESPONSE:
      LOGDEBUG(
          "multiHopRemoveAllNoThrow_remote TcrMessage::RESPONSE %s, err = %d ",
          m_fullPath.c_str(), err);
      break;
    case TcrMessage::EXCEPTION:
      err = handleServerException("ThinClientRegion::putAllNoThrow",
                                  reply.getException());
      break;
    default:
      LOGERROR("Unknown message type %d during region remove-all",
               reply.getMessageType());
      err = GF_NOTOBJ;
      break;
  }
  return err;
}

GfErrType ThinClientRegion::removeAllNoThrow_remote(
    const std::vector<std::shared_ptr<CacheableKey>>& keys,
    std::shared_ptr<VersionedCacheableObjectPartList>& versionedObjPartList,
    const std::shared_ptr<Serializable>& aCallbackArgument) {
  LOGDEBUG("ThinClientRegion::removeAllNoThrow_remote");

  if (auto poolDM = std::dynamic_pointer_cast<ThinClientPoolDM>(m_tcrdm)) {
    if (poolDM->getPRSingleHopEnabled() && poolDM->getClientMetaDataService() &&
        !TSSTXStateWrapper::get().getTXState()) {
      return singleHopRemoveAllNoThrow_remote(
          poolDM.get(), keys, versionedObjPartList, aCallbackArgument);
    } else {
      return multiHopRemoveAllNoThrow_remote(keys, versionedObjPartList,
                                             aCallbackArgument);
    }
  } else {
    LOGERROR(
        "ThinClientRegion::removeAllNoThrow_remote :: Pool Not Specified ");
    return GF_NOTSUP;
  }
}

uint32_t ThinClientRegion::size_remote() {
  LOGDEBUG("ThinClientRegion::size_remote");
  GfErrType err = GF_NOERR;

  // do TCR size
  TcrMessageSize request(new DataOutput(m_cacheImpl->createDataOutput()),
                         m_fullPath.c_str());
  TcrMessageReply reply(true, m_tcrdm.get());
  err = m_tcrdm->sendSyncRequest(request, reply);

  if (err != GF_NOERR) {
    throwExceptionIfError("Region::size", err);
  }

  switch (reply.getMessageType()) {
    case TcrMessage::RESPONSE: {
      auto size = std::dynamic_pointer_cast<CacheableInt32>(reply.getValue());
      return size->value();
    }
    case TcrMessage::EXCEPTION:
      err =
          handleServerException("ThinClientRegion::size", reply.getException());
      break;
    case TcrMessage::SIZE_ERROR:
      err = GF_CACHESERVER_EXCEPTION;
      break;
    default:
      LOGERROR("Unknown message type %d during region size",
               reply.getMessageType());
      err = GF_NOTOBJ;
  }

  throwExceptionIfError("Region::size", err);
  return 0;
}

GfErrType ThinClientRegion::registerStoredRegex(
    TcrEndpoint* endpoint,
    std::unordered_map<std::string, InterestResultPolicy>& interestListRegex,
    bool isDurable, bool receiveValues) {
  GfErrType opErr = GF_NOERR;
  GfErrType retVal = GF_NOERR;

  for (std::unordered_map<std::string, InterestResultPolicy>::iterator it =
           interestListRegex.begin();
       it != interestListRegex.end(); ++it) {
    opErr = registerRegexNoThrow(it->first, false, endpoint, isDurable, nullptr,
                                 it->second, receiveValues);
    if (opErr != GF_NOERR) {
      retVal = opErr;
    }
  }

  return retVal;
}

GfErrType ThinClientRegion::registerKeys(TcrEndpoint* endpoint,
                                         const TcrMessage* request,
                                         TcrMessageReply* reply) {
  GfErrType err = GF_NOERR;
  GfErrType opErr = GF_NOERR;

  // called when failover to a different server
  opErr = registerStoredRegex(endpoint, m_interestListRegex);
  err = opErr != GF_NOERR ? opErr : err;
  opErr = registerStoredRegex(
      endpoint, m_interestListRegexForUpdatesAsInvalidates, false, false);
  err = opErr != GF_NOERR ? opErr : err;
  opErr = registerStoredRegex(endpoint, m_durableInterestListRegex, true);
  err = opErr != GF_NOERR ? opErr : err;
  opErr = registerStoredRegex(
      endpoint, m_durableInterestListRegexForUpdatesAsInvalidates, true, false);
  err = opErr != GF_NOERR ? opErr : err;

  std::vector<std::shared_ptr<CacheableKey>> keysVec;
  InterestResultPolicy interestPolicy =
      copyInterestList(keysVec, m_interestList);
  opErr = registerKeysNoThrow(keysVec, false, endpoint, false, interestPolicy);
  err = opErr != GF_NOERR ? opErr : err;

  std::vector<std::shared_ptr<CacheableKey>> keysVecForUpdatesAsInvalidates;
  interestPolicy = copyInterestList(keysVecForUpdatesAsInvalidates,
                                    m_interestListForUpdatesAsInvalidates);
  opErr = registerKeysNoThrow(keysVecForUpdatesAsInvalidates, false, endpoint,
                              false, interestPolicy, false);
  err = opErr != GF_NOERR ? opErr : err;

  std::vector<std::shared_ptr<CacheableKey>> keysVecDurable;
  interestPolicy = copyInterestList(keysVecDurable, m_durableInterestList);
  opErr = registerKeysNoThrow(keysVecDurable, false, endpoint, true,
                              interestPolicy);
  err = opErr != GF_NOERR ? opErr : err;

  std::vector<std::shared_ptr<CacheableKey>>
      keysVecDurableForUpdatesAsInvalidates;
  interestPolicy =
      copyInterestList(keysVecDurableForUpdatesAsInvalidates,
                       m_durableInterestListForUpdatesAsInvalidates);
  opErr = registerKeysNoThrow(keysVecDurableForUpdatesAsInvalidates, false,
                              endpoint, true, interestPolicy, false);
  err = opErr != GF_NOERR ? opErr : err;

  if (request != nullptr && request->getRegionName() == m_fullPath &&
      (request->getMessageType() == TcrMessage::REGISTER_INTEREST ||
       request->getMessageType() == TcrMessage::REGISTER_INTEREST_LIST)) {
    const std::vector<std::shared_ptr<CacheableKey>>* newKeysVec =
        request->getKeys();
    bool isDurable = request->isDurable();
    bool receiveValues = request->receiveValues();
    if (newKeysVec == nullptr || newKeysVec->empty()) {
      const std::string& newRegex = request->getRegex();
      if (!newRegex.empty()) {
        if (request->getRegionName() != m_fullPath) {
          reply = nullptr;
        }
        opErr = registerRegexNoThrow(
            newRegex, false, endpoint, isDurable, nullptr,
            request->getInterestResultPolicy(), receiveValues, reply);
        err = opErr != GF_NOERR ? opErr : err;
      }
    } else {
      opErr = registerKeysNoThrow(*newKeysVec, false, endpoint, isDurable,
                                  request->getInterestResultPolicy(),
                                  receiveValues, reply);
      err = opErr != GF_NOERR ? opErr : err;
    }
  }
  return err;
}

GfErrType ThinClientRegion::unregisterStoredRegex(
    std::unordered_map<std::string, InterestResultPolicy>& interestListRegex) {
  GfErrType opErr = GF_NOERR;
  GfErrType retVal = GF_NOERR;

  for (std::unordered_map<std::string, InterestResultPolicy>::iterator it =
           interestListRegex.begin();
       it != interestListRegex.end(); ++it) {
    opErr = unregisterRegexNoThrow(it->first, false);
    if (opErr != GF_NOERR) {
      retVal = opErr;
    }
  }

  return retVal;
}

GfErrType ThinClientRegion::unregisterStoredRegexLocalDestroy(
    std::unordered_map<std::string, InterestResultPolicy>& interestListRegex) {
  GfErrType opErr = GF_NOERR;
  GfErrType retVal = GF_NOERR;

  for (std::unordered_map<std::string, InterestResultPolicy>::iterator it =
           interestListRegex.begin();
       it != interestListRegex.end(); ++it) {
    opErr = unregisterRegexNoThrowLocalDestroy(it->first, false);
    if (opErr != GF_NOERR) {
      retVal = opErr;
    }
  }
  return retVal;
}

GfErrType ThinClientRegion::unregisterKeys() {
  GfErrType err = GF_NOERR;
  GfErrType opErr = GF_NOERR;

  // called when disconnect from a server
  opErr = unregisterStoredRegex(m_interestListRegex);
  err = opErr != GF_NOERR ? opErr : err;
  opErr = unregisterStoredRegex(m_durableInterestListRegex);
  err = opErr != GF_NOERR ? opErr : err;
  opErr = unregisterStoredRegex(m_interestListRegexForUpdatesAsInvalidates);
  err = opErr != GF_NOERR ? opErr : err;
  opErr =
      unregisterStoredRegex(m_durableInterestListRegexForUpdatesAsInvalidates);
  err = opErr != GF_NOERR ? opErr : err;

  std::vector<std::shared_ptr<CacheableKey>> keysVec;
  copyInterestList(keysVec, m_interestList);
  opErr = unregisterKeysNoThrow(keysVec, false);
  err = opErr != GF_NOERR ? opErr : err;

  std::vector<std::shared_ptr<CacheableKey>> keysVecDurable;
  copyInterestList(keysVecDurable, m_durableInterestList);
  opErr = unregisterKeysNoThrow(keysVecDurable, false);
  err = opErr != GF_NOERR ? opErr : err;

  std::vector<std::shared_ptr<CacheableKey>> keysVecForUpdatesAsInvalidates;
  copyInterestList(keysVecForUpdatesAsInvalidates,
                   m_interestListForUpdatesAsInvalidates);
  opErr = unregisterKeysNoThrow(keysVecForUpdatesAsInvalidates, false);
  err = opErr != GF_NOERR ? opErr : err;

  std::vector<std::shared_ptr<CacheableKey>>
      keysVecDurableForUpdatesAsInvalidates;
  copyInterestList(keysVecDurableForUpdatesAsInvalidates,
                   m_durableInterestListForUpdatesAsInvalidates);
  opErr = unregisterKeysNoThrow(keysVecDurableForUpdatesAsInvalidates, false);
  err = opErr != GF_NOERR ? opErr : err;

  return err;
}

GfErrType ThinClientRegion::destroyRegionNoThrow_remote(
    const std::shared_ptr<Serializable>& aCallbackArgument) {
  GfErrType err = GF_NOERR;

  // do TCR destroyRegion
  TcrMessageDestroyRegion request(
      new DataOutput(m_cacheImpl->createDataOutput()), this, aCallbackArgument,
      std::chrono::milliseconds(-1), m_tcrdm.get());
  TcrMessageReply reply(true, m_tcrdm.get());
  err = m_tcrdm->sendSyncRequest(request, reply);
  if (err != GF_NOERR) return err;

  switch (reply.getMessageType()) {
    case TcrMessage::REPLY: {
      // LOGINFO("Region %s at remote is destroyed successfully",
      // m_fullPath.c_str());
      break;
    }
    case TcrMessage::EXCEPTION: {
      err =
          handleServerException("Region::destroyRegion", reply.getException());
      break;
    }
    case TcrMessage::DESTROY_REGION_DATA_ERROR: {
      err = GF_CACHE_REGION_DESTROYED_EXCEPTION;
      break;
    }
    default: {
      LOGERROR("Unknown message type %d during destroy region",
               reply.getMessageType());
      err = GF_MSG;
    }
  }
  return err;
}

GfErrType ThinClientRegion::registerKeysNoThrow(
    const std::vector<std::shared_ptr<CacheableKey>>& keys,
    bool attemptFailover, TcrEndpoint* endpoint, bool isDurable,
    InterestResultPolicy interestPolicy, bool receiveValues,
    TcrMessageReply* reply) {
  RegionGlobalLocks acquireLocksRedundancy(this, false);
  RegionGlobalLocks acquireLocksFailover(this);
  CHECK_DESTROY_PENDING_NOTHROW(TryReadGuard);
  GfErrType err = GF_NOERR;

  std::lock_guard<decltype(m_keysLock)> keysGuard(m_keysLock);
  if (keys.empty()) {
    return err;
  }

  TcrMessageReply replyLocal(true, m_tcrdm.get());
  bool needToCreateRC = true;
  if (reply == nullptr) {
    reply = &replyLocal;
  } else {
    needToCreateRC = false;
  }

  LOGDEBUG("ThinClientRegion::registerKeysNoThrow : interestpolicy is %d",
           interestPolicy.ordinal);

  TcrMessageRegisterInterestList request(
      new DataOutput(m_cacheImpl->createDataOutput()), this, keys, isDurable,
      getAttributes().getCachingEnabled(), receiveValues, interestPolicy,
      m_tcrdm.get());
  std::recursive_mutex responseLock;
  TcrChunkedResult* resultCollector = nullptr;
  if (interestPolicy.ordinal == InterestResultPolicy::KEYS_VALUES.ordinal) {
    auto values = std::make_shared<HashMapOfCacheable>();
    auto exceptions = std::make_shared<HashMapOfException>();
    MapOfUpdateCounters trackers;
    int32_t destroyTracker = 1;
    if (needToCreateRC) {
      resultCollector = (new ChunkedGetAllResponse(
          request, this, &keys, values, exceptions, nullptr, trackers,
          destroyTracker, true, responseLock));
      reply->setChunkedResultHandler(resultCollector);
    }
  } else {
    if (needToCreateRC) {
      resultCollector = (new ChunkedInterestResponse(request, nullptr, *reply));
      reply->setChunkedResultHandler(resultCollector);
    }
  }

  err = m_tcrdm->sendSyncRequestRegisterInterest(
      request, *reply, attemptFailover, this, endpoint);

  if (err == GF_NOERR /*|| err == GF_CACHE_REDUNDANCY_FAILURE*/) {
    if (reply->getMessageType() == TcrMessage::RESPONSE_FROM_SECONDARY &&
        endpoint) {
      LOGFINER(
          "registerKeysNoThrow - got response from secondary for "
          "endpoint %s, ignoring.",
          endpoint->name().c_str());
    } else if (attemptFailover) {
      addKeys(keys, isDurable, receiveValues, interestPolicy);
      if (!(interestPolicy.ordinal ==
            InterestResultPolicy::KEYS_VALUES.ordinal)) {
        localInvalidateForRegisterInterest(keys);
      }
    }
  }
  if (needToCreateRC) {
    delete resultCollector;
  }
  return err;
}

GfErrType ThinClientRegion::unregisterKeysNoThrow(
    const std::vector<std::shared_ptr<CacheableKey>>& keys,
    bool attemptFailover) {
  RegionGlobalLocks acquireLocksRedundancy(this, false);
  RegionGlobalLocks acquireLocksFailover(this);
  CHECK_DESTROY_PENDING_NOTHROW(TryReadGuard);
  GfErrType err = GF_NOERR;
  std::lock_guard<decltype(m_keysLock)> keysGuard(m_keysLock);
  TcrMessageReply reply(true, m_tcrdm.get());
  if (keys.empty()) {
    return err;
  }

  if (m_interestList.empty() && m_durableInterestList.empty() &&
      m_interestListForUpdatesAsInvalidates.empty() &&
      m_durableInterestListForUpdatesAsInvalidates.empty()) {
    // did not register any keys before.
    return GF_CACHE_ILLEGAL_STATE_EXCEPTION;
  }

  TcrMessageUnregisterInterestList request(
      new DataOutput(m_cacheImpl->createDataOutput()), this, keys, false, true,
      InterestResultPolicy::NONE, m_tcrdm.get());
  err = m_tcrdm->sendSyncRequestRegisterInterest(request, reply);
  if (err == GF_NOERR /*|| err == GF_CACHE_REDUNDANCY_FAILURE*/) {
    if (attemptFailover) {
      for (const auto& key : keys) {
        m_interestList.erase(key);
        m_durableInterestList.erase(key);
        m_interestListForUpdatesAsInvalidates.erase(key);
        m_durableInterestListForUpdatesAsInvalidates.erase(key);
      }
    }
  }
  return err;
}

GfErrType ThinClientRegion::unregisterKeysNoThrowLocalDestroy(
    const std::vector<std::shared_ptr<CacheableKey>>& keys,
    bool attemptFailover) {
  RegionGlobalLocks acquireLocksRedundancy(this, false);
  RegionGlobalLocks acquireLocksFailover(this);
  GfErrType err = GF_NOERR;
  std::lock_guard<decltype(m_keysLock)> keysGuard(m_keysLock);
  TcrMessageReply reply(true, m_tcrdm.get());
  if (keys.empty()) {
    return err;
  }

  if (m_interestList.empty() && m_durableInterestList.empty() &&
      m_interestListForUpdatesAsInvalidates.empty() &&
      m_durableInterestListForUpdatesAsInvalidates.empty()) {
    // did not register any keys before.
    return GF_CACHE_ILLEGAL_STATE_EXCEPTION;
  }

  TcrMessageUnregisterInterestList request(
      new DataOutput(m_cacheImpl->createDataOutput()), this, keys, false, true,
      InterestResultPolicy::NONE, m_tcrdm.get());
  err = m_tcrdm->sendSyncRequestRegisterInterest(request, reply);
  if (err == GF_NOERR) {
    if (attemptFailover) {
      for (const auto& key : keys) {
        m_interestList.erase(key);
        m_durableInterestList.erase(key);
        m_interestListForUpdatesAsInvalidates.erase(key);
        m_durableInterestListForUpdatesAsInvalidates.erase(key);
      }
    }
  }
  return err;
}

bool ThinClientRegion::isRegexRegistered(
    std::unordered_map<std::string, InterestResultPolicy>& interestListRegex,
    const std::string& regex, bool allKeys) {
  if (interestListRegex.find(".*") != interestListRegex.end() ||
      (!allKeys && interestListRegex.find(regex) != interestListRegex.end())) {
    return true;
  }
  return false;
}

GfErrType ThinClientRegion::registerRegexNoThrow(
    const std::string& regex, bool attemptFailover, TcrEndpoint* endpoint,
    bool isDurable,
    std::shared_ptr<std::vector<std::shared_ptr<CacheableKey>>> resultKeys,
    InterestResultPolicy interestPolicy, bool receiveValues,
    TcrMessageReply* reply) {
  RegionGlobalLocks acquireLocksRedundancy(this, false);
  RegionGlobalLocks acquireLocksFailover(this);
  CHECK_DESTROY_PENDING_NOTHROW(TryReadGuard);
  GfErrType err = GF_NOERR;

  bool allKeys = (regex == ".*");
  std::lock_guard<decltype(m_keysLock)> keysGuard(m_keysLock);

  if (attemptFailover) {
    if ((isDurable &&
         (isRegexRegistered(m_durableInterestListRegex, regex, allKeys) ||
          isRegexRegistered(m_durableInterestListRegexForUpdatesAsInvalidates,
                            regex, allKeys))) ||
        (!isDurable &&
         (isRegexRegistered(m_interestListRegex, regex, allKeys) ||
          isRegexRegistered(m_interestListRegexForUpdatesAsInvalidates, regex,
                            allKeys)))) {
      return err;
    }
  }

  ChunkedInterestResponse* resultCollector = nullptr;
  ChunkedGetAllResponse* getAllResultCollector = nullptr;
  if (reply != nullptr) {
    // need to check
    resultCollector = dynamic_cast<ChunkedInterestResponse*>(
        reply->getChunkedResultHandler());
    if (resultCollector != nullptr) {
      resultKeys = resultCollector->getResultKeys();
    } else {
      getAllResultCollector = dynamic_cast<ChunkedGetAllResponse*>(
          reply->getChunkedResultHandler());
      resultKeys = getAllResultCollector->getResultKeys();
    }
  }

  bool isRCCreatedLocally = false;
  LOGDEBUG("ThinClientRegion::registerRegexNoThrow : interestpolicy is %d",
           interestPolicy.ordinal);

  // TODO:
  TcrMessageRegisterInterest request(
      new DataOutput(m_cacheImpl->createDataOutput()), m_fullPath,
      regex.c_str(), interestPolicy, isDurable,
      getAttributes().getCachingEnabled(), receiveValues, m_tcrdm.get());
  std::recursive_mutex responseLock;
  if (reply == nullptr) {
    TcrMessageReply replyLocal(true, m_tcrdm.get());
    auto values = std::make_shared<HashMapOfCacheable>();
    auto exceptions = std::make_shared<HashMapOfException>();

    reply = &replyLocal;
    if (interestPolicy.ordinal == InterestResultPolicy::KEYS_VALUES.ordinal) {
      MapOfUpdateCounters trackers;
      int32_t destroyTracker = 1;
      if (resultKeys == nullptr) {
        resultKeys =
            std::shared_ptr<std::vector<std::shared_ptr<CacheableKey>>>(
                new std::vector<std::shared_ptr<CacheableKey>>());
      }
      // need to check
      getAllResultCollector = (new ChunkedGetAllResponse(
          request, this, nullptr, values, exceptions, resultKeys, trackers,
          destroyTracker, true, responseLock));
      reply->setChunkedResultHandler(getAllResultCollector);
      isRCCreatedLocally = true;
    } else {
      isRCCreatedLocally = true;
      // need to check
      resultCollector =
          new ChunkedInterestResponse(request, resultKeys, replyLocal);
      reply->setChunkedResultHandler(resultCollector);
    }
    err = m_tcrdm->sendSyncRequestRegisterInterest(
        request, replyLocal, attemptFailover, this, endpoint);
  } else {
    err = m_tcrdm->sendSyncRequestRegisterInterest(
        request, *reply, attemptFailover, this, endpoint);
  }
  if (err == GF_NOERR /*|| err == GF_CACHE_REDUNDANCY_FAILURE*/) {
    if (reply->getMessageType() == TcrMessage::RESPONSE_FROM_SECONDARY &&
        endpoint) {
      LOGFINER(
          "registerRegexNoThrow - got response from secondary for "
          "endpoint %s, ignoring.",
          endpoint->name().c_str());
    } else if (attemptFailover) {
      addRegex(regex, isDurable, receiveValues, interestPolicy);
      if (interestPolicy.ordinal != InterestResultPolicy::KEYS_VALUES.ordinal) {
        if (allKeys) {
          localInvalidateRegion_internal();
        } else {
          const std::shared_ptr<std::vector<std::shared_ptr<CacheableKey>>>&
              keys = resultCollector != nullptr
                         ? resultCollector->getResultKeys()
                         : getAllResultCollector->getResultKeys();
          if (keys != nullptr) {
            localInvalidateForRegisterInterest(*keys);
          }
        }
      }
    }
  }

  if (isRCCreatedLocally == true) {
    if (resultCollector != nullptr) delete resultCollector;
    if (getAllResultCollector != nullptr) delete getAllResultCollector;
  }
  return err;
}

GfErrType ThinClientRegion::unregisterRegexNoThrow(const std::string& regex,
                                                   bool attemptFailover) {
  RegionGlobalLocks acquireLocksRedundancy(this, false);
  RegionGlobalLocks acquireLocksFailover(this);
  CHECK_DESTROY_PENDING_NOTHROW(TryReadGuard);
  GfErrType err = GF_NOERR;

  err = findRegex(regex);

  if (err == GF_NOERR) {
    TcrMessageReply reply(false, m_tcrdm.get());
    TcrMessageUnregisterInterest request(
        new DataOutput(m_cacheImpl->createDataOutput()), m_fullPath, regex,
        InterestResultPolicy::NONE, false, true, m_tcrdm.get());
    err = m_tcrdm->sendSyncRequestRegisterInterest(request, reply);
    if (err == GF_NOERR /*|| err == GF_CACHE_REDUNDANCY_FAILURE*/) {
      if (attemptFailover) {
        clearRegex(regex);
      }
    }
  }
  return err;
}

GfErrType ThinClientRegion::findRegex(const std::string& regex) {
  GfErrType err = GF_NOERR;
  std::lock_guard<decltype(m_keysLock)> keysGuard(m_keysLock);

  if (m_interestListRegex.find(regex) == m_interestListRegex.end() &&
      m_durableInterestListRegex.find(regex) ==
          m_durableInterestListRegex.end() &&
      m_interestListRegexForUpdatesAsInvalidates.find(regex) ==
          m_interestListRegexForUpdatesAsInvalidates.end() &&
      m_durableInterestListRegexForUpdatesAsInvalidates.find(regex) ==
          m_durableInterestListRegexForUpdatesAsInvalidates.end()) {
    return GF_CACHE_ILLEGAL_STATE_EXCEPTION;
  } else {
    return err;
  }
}

void ThinClientRegion::clearRegex(const std::string& regex) {
  std::lock_guard<decltype(m_keysLock)> keysGuard(m_keysLock);
  m_interestListRegex.erase(regex);
  m_durableInterestListRegex.erase(regex);
  m_interestListRegexForUpdatesAsInvalidates.erase(regex);
  m_durableInterestListRegexForUpdatesAsInvalidates.erase(regex);
}

GfErrType ThinClientRegion::unregisterRegexNoThrowLocalDestroy(
    const std::string& regex, bool attemptFailover) {
  GfErrType err = GF_NOERR;

  err = findRegex(regex);

  if (err == GF_NOERR) {
    TcrMessageReply reply(false, m_tcrdm.get());
    TcrMessageUnregisterInterest request(
        new DataOutput(m_cacheImpl->createDataOutput()), m_fullPath, regex,
        InterestResultPolicy::NONE, false, true, m_tcrdm.get());
    err = m_tcrdm->sendSyncRequestRegisterInterest(request, reply);
    if (err == GF_NOERR) {
      if (attemptFailover) {
        clearRegex(regex);
      }
    }
  }
  return err;
}

void ThinClientRegion::addKeys(
    const std::vector<std::shared_ptr<CacheableKey>>& keys, bool isDurable,
    bool receiveValues, InterestResultPolicy interestpolicy) {
  std::unordered_map<std::shared_ptr<CacheableKey>, InterestResultPolicy>&
      interestList =
          isDurable
              ? (receiveValues ? m_durableInterestList
                               : m_durableInterestListForUpdatesAsInvalidates)
              : (receiveValues ? m_interestList
                               : m_interestListForUpdatesAsInvalidates);

  for (const auto& key : keys) {
    interestList.insert(
        std::pair<std::shared_ptr<CacheableKey>, InterestResultPolicy>(
            key, interestpolicy));
  }
}

void ThinClientRegion::addRegex(const std::string& regex, bool isDurable,
                                bool receiveValues,
                                InterestResultPolicy interestpolicy) {
  std::unordered_map<std::shared_ptr<CacheableKey>, InterestResultPolicy>&
      interestList =
          isDurable
              ? (receiveValues ? m_durableInterestList
                               : m_durableInterestListForUpdatesAsInvalidates)
              : (receiveValues ? m_interestList
                               : m_interestListForUpdatesAsInvalidates);

  std::unordered_map<std::string, InterestResultPolicy>& interestListRegex =
      isDurable
          ? (receiveValues ? m_durableInterestListRegex
                           : m_durableInterestListRegexForUpdatesAsInvalidates)
          : (receiveValues ? m_interestListRegex
                           : m_interestListRegexForUpdatesAsInvalidates);

  if (regex == ".*") {
    interestListRegex.clear();
    interestList.clear();
  }

  interestListRegex.insert(
      std::pair<std::string, InterestResultPolicy>(regex, interestpolicy));
}

std::vector<std::shared_ptr<CacheableKey>> ThinClientRegion::getInterestList()
    const {
  auto nthis = const_cast<ThinClientRegion*>(this);
  RegionGlobalLocks acquireLocksRedundancy(nthis, false);
  RegionGlobalLocks acquireLocksFailover(nthis);
  CHECK_DESTROY_PENDING(TryReadGuard, getInterestList);
  std::lock_guard<decltype(m_keysLock)> keysGuard(nthis->m_keysLock);

  std::vector<std::shared_ptr<CacheableKey>> vlist;

  std::transform(std::begin(m_durableInterestList),
                 std::end(m_durableInterestList), std::back_inserter(vlist),
                 [](const decltype(m_durableInterestList)::value_type& e) {
                   return e.first;
                 });

  std::transform(
      std::begin(m_interestList), std::end(m_interestList),
      std::back_inserter(vlist),
      [](const decltype(m_interestList)::value_type& e) { return e.first; });

  return vlist;
}
std::vector<std::shared_ptr<CacheableString>>
ThinClientRegion::getInterestListRegex() const {
  auto nthis = const_cast<ThinClientRegion*>(this);
  RegionGlobalLocks acquireLocksRedundancy(nthis, false);
  RegionGlobalLocks acquireLocksFailover(nthis);
  CHECK_DESTROY_PENDING(TryReadGuard, getInterestListRegex);
  std::lock_guard<decltype(m_keysLock)> keysGuard(nthis->m_keysLock);

  std::vector<std::shared_ptr<CacheableString>> vlist;

  std::transform(std::begin(m_durableInterestListRegex),
                 std::end(m_durableInterestListRegex),
                 std::back_inserter(vlist),
                 [](const decltype(m_durableInterestListRegex)::value_type& e) {
                   return CacheableString::create(e.first.c_str());
                 });

  std::transform(std::begin(m_interestListRegex), std::end(m_interestListRegex),
                 std::back_inserter(vlist),
                 [](const decltype(m_interestListRegex)::value_type& e) {
                   return CacheableString::create(e.first.c_str());
                 });

  return vlist;
}

GfErrType ThinClientRegion::clientNotificationHandler(TcrMessage& msg) {
  GfErrType err = GF_NOERR;
  std::shared_ptr<Cacheable> oldValue;
  switch (msg.getMessageType()) {
    case TcrMessage::LOCAL_INVALIDATE: {
      LocalRegion::invalidateNoThrow(
          msg.getKey(), msg.getCallbackArgument(), -1,
          CacheEventFlags::NOTIFICATION | CacheEventFlags::LOCAL,
          msg.getVersionTag());
      break;
    }
    case TcrMessage::LOCAL_DESTROY: {
      err = LocalRegion::destroyNoThrow(
          msg.getKey(), msg.getCallbackArgument(), -1,
          CacheEventFlags::NOTIFICATION | CacheEventFlags::LOCAL,
          msg.getVersionTag());
      break;
    }
    case TcrMessage::CLEAR_REGION: {
      LOGDEBUG("remote clear region event for reigon[%s]",
               msg.getRegionName().c_str());
      err = localClearNoThrow(
          nullptr, CacheEventFlags::NOTIFICATION | CacheEventFlags::LOCAL);
      break;
    }
    case TcrMessage::LOCAL_DESTROY_REGION: {
      m_notifyRelease = true;
      err = LocalRegion::destroyRegionNoThrow(
          msg.getCallbackArgument(), true,
          CacheEventFlags::NOTIFICATION | CacheEventFlags::LOCAL);
      break;
    }
    case TcrMessage::LOCAL_CREATE:
      err = LocalRegion::putNoThrow(
          msg.getKey(), msg.getValue(), msg.getCallbackArgument(), oldValue, -1,
          CacheEventFlags::NOTIFICATION | CacheEventFlags::LOCAL,
          msg.getVersionTag());
      break;
    case TcrMessage::LOCAL_UPDATE: {
      //  for update set the NOTIFICATION_UPDATE to trigger the
      // afterUpdate event even if the key is not present in local cache
      err = LocalRegion::putNoThrow(
          msg.getKey(), msg.getValue(), msg.getCallbackArgument(), oldValue, -1,
          CacheEventFlags::NOTIFICATION | CacheEventFlags::NOTIFICATION_UPDATE |
              CacheEventFlags::LOCAL,
          msg.getVersionTag(), msg.getDelta(), msg.getEventId());
      break;
    }
    case TcrMessage::TOMBSTONE_OPERATION:
      LocalRegion::tombstoneOperationNoThrow(msg.getTombstoneVersions(),
                                             msg.getTombstoneKeys());
      break;
    default: {
      if (TcrMessage::getAllEPDisMess() == &msg) {
        setProcessedMarker(false);
        LocalRegion::invokeAfterAllEndPointDisconnected();
      } else {
        LOGERROR(
            "Unknown message type %d in subscription event handler; possible "
            "serialization mismatch",
            msg.getMessageType());
        err = GF_MSG;
      }
      break;
    }
  }

  // Update EventIdMap to mark event processed, Only for durable client.
  // In case of closing, don't send it as listener might not be invoked.
  if (!m_destroyPending && (m_isDurableClnt || msg.hasDelta()) &&
      TcrMessage::getAllEPDisMess() != &msg) {
    m_tcrdm->checkDupAndAdd(msg.getEventId());
  }

  return err;
}

GfErrType ThinClientRegion::handleServerException(
    const std::string& func, const std::string& exceptionMsg) {
  GfErrType error = GF_NOERR;
  setThreadLocalExceptionMessage(exceptionMsg);
  if (exceptionMsg.find("org.apache.geode.security.NotAuthorizedException") !=
      std::string::npos) {
    error = GF_NOT_AUTHORIZED_EXCEPTION;
  } else if (exceptionMsg.find("org.apache.geode.cache.CacheWriterException") !=
             std::string::npos) {
    error = GF_CACHE_WRITER_EXCEPTION;
  } else if (exceptionMsg.find(
                 "org.apache.geode.security.AuthenticationFailedException") !=
             std::string::npos) {
    error = GF_AUTHENTICATION_FAILED_EXCEPTION;
  } else if (exceptionMsg.find("org.apache.geode.internal.cache.execute."
                               "InternalFunctionInvocationTargetException") !=
             std::string::npos) {
    error = GF_FUNCTION_EXCEPTION;
  } else if (exceptionMsg.find(
                 "org.apache.geode.cache.CommitConflictException") !=
             std::string::npos) {
    error = GF_COMMIT_CONFLICT_EXCEPTION;
  } else if (exceptionMsg.find("org.apache.geode.cache."
                               "TransactionDataNodeHasDepartedException") !=
             std::string::npos) {
    error = GF_TRANSACTION_DATA_NODE_HAS_DEPARTED_EXCEPTION;
  } else if (exceptionMsg.find(
                 "org.apache.geode.cache.TransactionDataRebalancedException") !=
             std::string::npos) {
    error = GF_TRANSACTION_DATA_REBALANCED_EXCEPTION;
  } else if (exceptionMsg.find(
                 "org.apache.geode.security.AuthenticationRequiredException") !=
             std::string::npos) {
    error = GF_AUTHENTICATION_REQUIRED_EXCEPTION;
  } else {
    error = GF_CACHESERVER_EXCEPTION;
  }

  if (error != GF_AUTHENTICATION_REQUIRED_EXCEPTION) {
    LOGERROR(func + ": An exception (" + exceptionMsg +
             ") happened at remote server.");
  } else {
    LOGFINER(func + ": An exception (" + exceptionMsg +
             ") happened at remote server.");
  }
  return error;
}

void ThinClientRegion::receiveNotification(TcrMessage* msg) {
  std::unique_lock<std::mutex> lock(m_notificationMutex, std::defer_lock);
  {
    TryReadGuard guard(m_rwLock, m_destroyPending);
    if (m_destroyPending) {
      if (msg != TcrMessage::getAllEPDisMess()) {
        _GEODE_SAFE_DELETE(msg);
      }
      return;
    }
    lock.lock();
  }

  if (msg->getMessageType() == TcrMessage::CLIENT_MARKER) {
    handleMarker();
  } else {
    clientNotificationHandler(*msg);
  }

  lock.unlock();
  if (TcrMessage::getAllEPDisMess() != msg) _GEODE_SAFE_DELETE(msg);
}

void ThinClientRegion::localInvalidateRegion_internal() {
  std::shared_ptr<MapEntryImpl> me;
  std::shared_ptr<Cacheable> oldValue;

  std::vector<std::shared_ptr<CacheableKey>> keysVec = keys_internal();
  for (const auto& key : keysVec) {
    std::shared_ptr<VersionTag> versionTag;
    m_entries->invalidate(key, me, oldValue, versionTag);
  }
}

void ThinClientRegion::invalidateInterestList(
    std::unordered_map<std::shared_ptr<CacheableKey>, InterestResultPolicy>&
        interestList) {
  std::shared_ptr<MapEntryImpl> me;
  std::shared_ptr<Cacheable> oldValue;

  if (!m_regionAttributes.getCachingEnabled()) {
    return;
  }
  for (const auto& iter : interestList) {
    std::shared_ptr<VersionTag> versionTag;
    m_entries->invalidate(iter.first, me, oldValue, versionTag);
  }
}

void ThinClientRegion::localInvalidateFailover() {
  CHECK_DESTROY_PENDING(TryReadGuard,
                        ThinClientRegion::localInvalidateFailover);

  //  No need to invalidate from the "m_xxxForUpdatesAsInvalidates" lists?
  if (m_interestListRegex.empty() && m_durableInterestListRegex.empty()) {
    invalidateInterestList(m_interestList);
    invalidateInterestList(m_durableInterestList);
  } else {
    localInvalidateRegion_internal();
  }
}

void ThinClientRegion::localInvalidateForRegisterInterest(
    const std::vector<std::shared_ptr<CacheableKey>>& keys) {
  CHECK_DESTROY_PENDING(TryReadGuard,
                        ThinClientRegion::localInvalidateForRegisterInterest);

  if (!m_regionAttributes.getCachingEnabled()) {
    return;
  }

  std::shared_ptr<Cacheable> oldValue;
  std::shared_ptr<MapEntryImpl> me;

  for (const auto& key : keys) {
    std::shared_ptr<VersionTag> versionTag;
    m_entries->invalidate(key, me, oldValue, versionTag);
    updateAccessAndModifiedTimeForEntry(me, true);
  }
}

InterestResultPolicy ThinClientRegion::copyInterestList(
    std::vector<std::shared_ptr<CacheableKey>>& keysVector,
    std::unordered_map<std::shared_ptr<CacheableKey>, InterestResultPolicy>&
        interestList) const {
  InterestResultPolicy interestPolicy = InterestResultPolicy::NONE;
  for (std::unordered_map<std::shared_ptr<CacheableKey>,
                          InterestResultPolicy>::const_iterator iter =
           interestList.begin();
       iter != interestList.end(); ++iter) {
    keysVector.push_back(iter->first);
    interestPolicy = iter->second;
  }
  return interestPolicy;
}

void ThinClientRegion::registerInterestGetValues(
    const char* method, const std::vector<std::shared_ptr<CacheableKey>>* keys,
    const std::shared_ptr<std::vector<std::shared_ptr<CacheableKey>>>&
        resultKeys) {
  auto exceptions = std::make_shared<HashMapOfException>();
  auto err = getAllNoThrow_remote(keys, nullptr, exceptions, resultKeys, true,
                                  nullptr);
  throwExceptionIfError(method, err);
  // log any exceptions here
  for (const auto& iter : *exceptions) {
    LOGWARN("%s Exception for key %s:: %s: %s", method,
            Utils::nullSafeToString(iter.first).c_str(),
            iter.second->getName().c_str(), iter.second->what());
  }
}

void ThinClientRegion::destroyDM(bool keepEndpoints) {
  if (m_tcrdm != nullptr) {
    m_tcrdm->destroy(keepEndpoints);
  }
}

void ThinClientRegion::release(bool invokeCallbacks) {
  if (m_released) {
    return;
  }

  std::unique_lock<std::mutex> lock(m_notificationMutex, std::defer_lock);
  if (!m_notifyRelease) {
    lock.lock();
  }

  destroyDM(invokeCallbacks);

  m_interestList.clear();
  m_interestListRegex.clear();
  m_durableInterestList.clear();
  m_durableInterestListRegex.clear();

  m_interestListForUpdatesAsInvalidates.clear();
  m_interestListRegexForUpdatesAsInvalidates.clear();
  m_durableInterestListForUpdatesAsInvalidates.clear();
  m_durableInterestListRegexForUpdatesAsInvalidates.clear();

  LocalRegion::release(invokeCallbacks);
}

ThinClientRegion::~ThinClientRegion() noexcept {
  TryWriteGuard guard(m_rwLock, m_destroyPending);
  if (!m_destroyPending) {
    release(false);
  }
}

void ThinClientRegion::acquireGlobals(bool isFailover) {
  if (isFailover) {
    m_tcrdm->acquireFailoverLock();
  }
}

void ThinClientRegion::releaseGlobals(bool isFailover) {
  if (isFailover) {
    m_tcrdm->releaseFailoverLock();
  }
}

void ThinClientRegion::executeFunction(
    const std::string& func, const std::shared_ptr<Cacheable>& args,
    std::shared_ptr<CacheableVector> routingObj, uint8_t getResult,
    std::shared_ptr<ResultCollector> rc, int32_t retryAttempts,
    std::chrono::milliseconds timeout) {
  int32_t attempt = 0;
  auto failedNodes = CacheableHashSet::create();
  // if pools retry attempts are not set then retry once on all available
  // endpoints
  if (retryAttempts == -1) {
    retryAttempts = static_cast<int32_t>(m_tcrdm->getNumberOfEndPoints());
  }

  bool reExecute = false;
  bool reExecuteForServ = false;

  do {
    TcrMessage* msg;
    if (reExecuteForServ) {
      msg = new TcrMessageExecuteRegionFunction(
          new DataOutput(m_cacheImpl->createDataOutput()), func, this, args,
          routingObj, getResult, failedNodes, timeout, m_tcrdm.get(),
          static_cast<int8_t>(1));
    } else {
      msg = new TcrMessageExecuteRegionFunction(
          new DataOutput(m_cacheImpl->createDataOutput()), func, this, args,
          routingObj, getResult, failedNodes, timeout, m_tcrdm.get(),
          static_cast<int8_t>(0));
    }
    TcrMessageReply reply(true, m_tcrdm.get());
    // need to check
    ChunkedFunctionExecutionResponse* resultCollector(
        new ChunkedFunctionExecutionResponse(reply, (getResult & 2) == 2, rc));
    reply.setChunkedResultHandler(resultCollector);
    reply.setTimeout(timeout);
    GfErrType err = GF_NOERR;
    err = m_tcrdm->sendSyncRequest(*msg, reply, !(getResult & 1));
    resultCollector->reset();
    delete msg;
    delete resultCollector;
    if (err == GF_NOERR &&
        (reply.getMessageType() == TcrMessage::EXCEPTION ||
         reply.getMessageType() == TcrMessage::EXECUTE_REGION_FUNCTION_ERROR)) {
      err = ThinClientRegion::handleServerException("Execute",
                                                    reply.getException());
    }

    if (ThinClientBaseDM::isFatalClientError(err)) {
      throwExceptionIfError("ExecuteOnRegion:", err);
    } else if (err != GF_NOERR) {
      if (err == GF_FUNCTION_EXCEPTION) {
        reExecute = true;
        rc->clearResults();
        std::shared_ptr<CacheableHashSet> failedNodesIds(reply.getFailedNode());
        failedNodes->clear();
        if (failedNodesIds) {
          LOGDEBUG(
              "ThinClientRegion::executeFunction with GF_FUNCTION_EXCEPTION "
              "failedNodesIds size = %zu ",
              failedNodesIds->size());
          failedNodes->insert(failedNodesIds->begin(), failedNodesIds->end());
        }
      } else if (err == GF_NOTCON) {
        attempt++;
        LOGDEBUG(
            "ThinClientRegion::executeFunction with GF_NOTCON retry attempt = "
            "%d ",
            attempt);
        if (attempt > retryAttempts) {
          throwExceptionIfError("ExecuteOnRegion:", err);
        }
        reExecuteForServ = true;
        rc->clearResults();
        failedNodes->clear();
      } else if (err == GF_TIMEOUT) {
        LOGINFO("function timeout. Name: %s, timeout: %s, params: %" PRIu8
                ", "
                "retryAttempts: %d ",
                func.c_str(), to_string(timeout).c_str(), getResult,
                retryAttempts);
        throwExceptionIfError("ExecuteOnRegion", GF_TIMEOUT);
      } else if (err == GF_CLIENT_WAIT_TIMEOUT ||
                 err == GF_CLIENT_WAIT_TIMEOUT_REFRESH_PRMETADATA) {
        LOGINFO(
            "function timeout, possibly bucket is not available. Name: %s, "
            "timeout: %s, params: %" PRIu8 ", retryAttempts: %d ",
            func.c_str(), to_string(timeout).c_str(), getResult, retryAttempts);
        throwExceptionIfError("ExecuteOnRegion", GF_CLIENT_WAIT_TIMEOUT);
      } else {
        LOGDEBUG("executeFunction err = %d ", err);
        throwExceptionIfError("ExecuteOnRegion:", err);
      }
    } else {
      reExecute = false;
      reExecuteForServ = false;
    }
  } while (reExecuteForServ);

  if (reExecute && (getResult & 1)) {
    reExecuteFunction(func, args, routingObj, getResult, rc, retryAttempts,
                      failedNodes, timeout);
  }
}
std::shared_ptr<CacheableVector> ThinClientRegion::reExecuteFunction(
    const std::string& func, const std::shared_ptr<Cacheable>& args,
    std::shared_ptr<CacheableVector> routingObj, uint8_t getResult,
    std::shared_ptr<ResultCollector> rc, int32_t retryAttempts,
    std::shared_ptr<CacheableHashSet>& failedNodes,
    std::chrono::milliseconds timeout) {
  int32_t attempt = 0;
  bool reExecute = true;
  // if pools retry attempts are not set then retry once on all available
  // endpoints
  if (retryAttempts == -1) {
    retryAttempts = static_cast<int32_t>(m_tcrdm->getNumberOfEndPoints());
  }

  do {
    reExecute = false;
    TcrMessageExecuteRegionFunction msg(
        new DataOutput(m_cacheImpl->createDataOutput()), func, this, args,
        routingObj, getResult, failedNodes, timeout, m_tcrdm.get(),
        static_cast<int8_t>(1));
    TcrMessageReply reply(true, m_tcrdm.get());
    // need to check
    ChunkedFunctionExecutionResponse* resultCollector(
        new ChunkedFunctionExecutionResponse(reply, (getResult & 2) == 2, rc));
    reply.setChunkedResultHandler(resultCollector);
    reply.setTimeout(timeout);

    GfErrType err = GF_NOERR;
    err = m_tcrdm->sendSyncRequest(msg, reply, !(getResult & 1));
    delete resultCollector;
    if (err == GF_NOERR &&
        (reply.getMessageType() == TcrMessage::EXCEPTION ||
         reply.getMessageType() == TcrMessage::EXECUTE_REGION_FUNCTION_ERROR)) {
      err = ThinClientRegion::handleServerException("Execute",
                                                    reply.getException());
    }

    if (ThinClientBaseDM::isFatalClientError(err)) {
      throwExceptionIfError("ExecuteOnRegion:", err);
    } else if (err != GF_NOERR) {
      if (err == GF_FUNCTION_EXCEPTION) {
        reExecute = true;
        rc->clearResults();
        std::shared_ptr<CacheableHashSet> failedNodesIds(reply.getFailedNode());
        failedNodes->clear();
        if (failedNodesIds) {
          LOGDEBUG(
              "ThinClientRegion::reExecuteFunction with GF_FUNCTION_EXCEPTION "
              "failedNodesIds size = %zu ",
              failedNodesIds->size());
          failedNodes->insert(failedNodesIds->begin(), failedNodesIds->end());
        }
      } else if ((err == GF_NOTCON) || (err == GF_CLIENT_WAIT_TIMEOUT) ||
                 (err == GF_CLIENT_WAIT_TIMEOUT_REFRESH_PRMETADATA)) {
        attempt++;
        LOGDEBUG(
            "ThinClientRegion::reExecuteFunction with GF_NOTCON OR TIMEOUT "
            "retry attempt "
            "= %d ",
            attempt);
        if (attempt > retryAttempts) {
          throwExceptionIfError("ExecuteOnRegion:", err);
        }
        reExecute = true;
        rc->clearResults();
        failedNodes->clear();
      } else if (err == GF_TIMEOUT) {
        LOGINFO("function timeout");
        throwExceptionIfError("ExecuteOnRegion", GF_CACHE_TIMEOUT_EXCEPTION);
      } else {
        LOGDEBUG("reExecuteFunction err = %d ", err);
        throwExceptionIfError("ExecuteOnRegion:", err);
      }
    }
  } while (reExecute);
  return nullptr;
}

bool ThinClientRegion::executeFunctionSH(
    const std::string& func, const std::shared_ptr<Cacheable>& args,
    uint8_t getResult, std::shared_ptr<ResultCollector> rc,
    const std::shared_ptr<ClientMetadataService::ServerToKeysMap>& locationMap,
    std::shared_ptr<CacheableHashSet>& failedNodes,
    std::chrono::milliseconds timeout, bool allBuckets) {
  bool reExecute = false;
  auto resultCollectorLock = std::make_shared<std::recursive_mutex>();
  const auto& userAttr = UserAttributes::threadLocalUserAttributes;
  std::vector<std::shared_ptr<OnRegionFunctionExecution>> feWorkers;
  auto& threadPool =
      CacheRegionHelper::getCacheImpl(&getCache())->getThreadPool();

  for (const auto& locationIter : *locationMap) {
    const auto& serverLocation = locationIter.first;
    const auto& routingObj = locationIter.second;
    auto worker = std::make_shared<OnRegionFunctionExecution>(
        func, this, args, routingObj, getResult, timeout,
        dynamic_cast<ThinClientPoolDM*>(m_tcrdm.get()), resultCollectorLock, rc,
        userAttr, false, serverLocation, allBuckets);
    threadPool.perform(worker);
    feWorkers.push_back(worker);
  }

  GfErrType abortError = GF_NOERR;

  for (auto worker : feWorkers) {
    auto err = worker->getResult();
    auto currentReply = worker->getReply();

    if (err == GF_NOERR &&
        (currentReply->getMessageType() == TcrMessage::EXCEPTION ||
         currentReply->getMessageType() ==
             TcrMessage::EXECUTE_REGION_FUNCTION_ERROR)) {
      err = ThinClientRegion::handleServerException(
          "Execute", currentReply->getException());
    }

    if (err != GF_NOERR) {
      if (err == GF_FUNCTION_EXCEPTION) {
        reExecute = true;
        if (auto poolDM =
                std::dynamic_pointer_cast<ThinClientPoolDM>(m_tcrdm)) {
          if (poolDM->getClientMetaDataService()) {
            poolDM->getClientMetaDataService()->enqueueForMetadataRefresh(
                this->getFullPath(), 0);
          }
        }
        worker->getResultCollector()->reset();
        {
          std::lock_guard<decltype(*resultCollectorLock)> guard(
              *resultCollectorLock);
          rc->clearResults();
        }
        std::shared_ptr<CacheableHashSet> failedNodeIds(
            currentReply->getFailedNode());
        if (failedNodeIds) {
          LOGDEBUG(
              "ThinClientRegion::executeFunctionSH with GF_FUNCTION_EXCEPTION "
              "failedNodeIds size = %zu ",
              failedNodeIds->size());
          failedNodes->insert(failedNodeIds->begin(), failedNodeIds->end());
        }
      } else if ((err == GF_NOTCON) || (err == GF_CLIENT_WAIT_TIMEOUT) ||
                 (err == GF_CLIENT_WAIT_TIMEOUT_REFRESH_PRMETADATA)) {
        reExecute = true;
        LOGINFO(
            "ThinClientRegion::executeFunctionSH with GF_NOTCON or "
            "GF_CLIENT_WAIT_TIMEOUT ");
        if (auto poolDM =
                std::dynamic_pointer_cast<ThinClientPoolDM>(m_tcrdm)) {
          if (poolDM->getClientMetaDataService()) {
            poolDM->getClientMetaDataService()->enqueueForMetadataRefresh(
                this->getFullPath(), 0);
          }
        }
        worker->getResultCollector()->reset();
        {
          std::lock_guard<decltype(*resultCollectorLock)> guard(
              *resultCollectorLock);
          rc->clearResults();
        }
      } else {
        if (ThinClientBaseDM::isFatalClientError(err)) {
          LOGERROR("ThinClientRegion::executeFunctionSH: Fatal Exception");
        } else {
          LOGWARN("ThinClientRegion::executeFunctionSH: Unexpected Exception");
        }

        if (abortError == GF_NOERR) {
          abortError = err;
        }
      }
    }
  }

  if (abortError != GF_NOERR) {
    throwExceptionIfError("ExecuteOnRegion:", abortError);
  }
  return reExecute;
}

GfErrType ThinClientRegion::getFuncAttributes(
    const std::string& func, std::shared_ptr<std::vector<int8_t>>* attr) {
  GfErrType err = GF_NOERR;

  // do TCR GET_FUNCTION_ATTRIBUTES
  LOGDEBUG("Tcrmessage request GET_FUNCTION_ATTRIBUTES ");
  TcrMessageGetFunctionAttributes request(
      new DataOutput(m_cacheImpl->createDataOutput()), func, m_tcrdm.get());
  TcrMessageReply reply(true, m_tcrdm.get());
  err = m_tcrdm->sendSyncRequest(request, reply);
  if (err != GF_NOERR) {
    return err;
  }
  switch (reply.getMessageType()) {
    case TcrMessage::RESPONSE: {
      *attr = reply.getFunctionAttributes();
      break;
    }
    case TcrMessage::EXCEPTION: {
      err = handleServerException("Region::GET_FUNCTION_ATTRIBUTES",
                                  reply.getException());
      break;
    }
    case TcrMessage::REQUEST_DATA_ERROR: {
      LOGERROR("Error message from server: " + reply.getValue()->toString());
      throw FunctionExecutionException(reply.getValue()->toString());
    }
    default: {
      LOGERROR("Unknown message type %d while getting function attributes.",
               reply.getMessageType());
      err = GF_MSG;
      break;
    }
  }
  return err;
}

GfErrType ThinClientRegion::getNoThrow_FullObject(
    std::shared_ptr<EventId> eventId, std::shared_ptr<Cacheable>& fullObject,
    std::shared_ptr<VersionTag>& versionTag) {
  TcrMessageRequestEventValue fullObjectMsg(
      new DataOutput(m_cacheImpl->createDataOutput()), eventId);
  TcrMessageReply reply(true, nullptr);

  GfErrType err = GF_NOTCON;
  err = m_tcrdm->sendSyncRequest(fullObjectMsg, reply, false, true);
  if (err == GF_NOERR) {
    fullObject = reply.getValue();
  }
  versionTag = reply.getVersionTag();
  return err;
}

void ThinClientRegion::txDestroy(
    const std::shared_ptr<CacheableKey>& key,
    const std::shared_ptr<Serializable>& aCallbackArgument,
    std::shared_ptr<VersionTag> versionTag) {
  GfErrType err = destroyNoThrowTX(key, aCallbackArgument, -1,
                                   CacheEventFlags::NORMAL, versionTag);
  throwExceptionIfError("Region::destroyTX", err);
}

void ThinClientRegion::txInvalidate(
    const std::shared_ptr<CacheableKey>& key,
    const std::shared_ptr<Serializable>& aCallbackArgument,
    std::shared_ptr<VersionTag> versionTag) {
  GfErrType err = invalidateNoThrowTX(key, aCallbackArgument, -1,
                                      CacheEventFlags::NORMAL, versionTag);
  throwExceptionIfError("Region::invalidateTX", err);
}

void ThinClientRegion::txPut(
    const std::shared_ptr<CacheableKey>& key,
    const std::shared_ptr<Cacheable>& value,
    const std::shared_ptr<Serializable>& aCallbackArgument,
    std::shared_ptr<VersionTag> versionTag) {
  std::shared_ptr<Cacheable> oldValue;
  int64_t sampleStartNanos = startStatOpTime();
  GfErrType err = putNoThrowTX(key, value, aCallbackArgument, oldValue, -1,
                               CacheEventFlags::NORMAL, versionTag);

  updateStatOpTime(m_regionStats->getStat(), m_regionStats->getPutTimeId(),
                   sampleStartNanos);
  throwExceptionIfError("Region::putTX", err);
}

void ThinClientRegion::setProcessedMarker(bool) {}

void ChunkedInterestResponse::reset() {
  if (m_resultKeys != nullptr && m_resultKeys->size() > 0) {
    m_resultKeys->clear();
  }
}

void ChunkedInterestResponse::handleChunk(const uint8_t* chunk,
                                          int32_t chunkLen,
                                          uint8_t isLastChunkWithSecurity,
                                          const CacheImpl* cacheImpl) {
  auto input =
      cacheImpl->createDataInput(chunk, chunkLen, m_replyMsg.getPool());

  uint32_t partLen;
  if (TcrMessageHelper::readChunkPartHeader(
          m_msg, input, DSCode::FixedIDDefault,
          static_cast<int32_t>(DSCode::CacheableArrayList),
          "ChunkedInterestResponse", partLen, isLastChunkWithSecurity) !=
      TcrMessageHelper::ChunkObjectType::OBJECT) {
    // encountered an exception part, so return without reading more
    m_replyMsg.readSecureObjectPart(input, false, true,
                                    isLastChunkWithSecurity);
    return;
  }

  if (m_resultKeys == nullptr) {
    m_resultKeys =
        std::make_shared<std::vector<std::shared_ptr<CacheableKey>>>();
  }
  serializer::readObject(input, *m_resultKeys);
  m_replyMsg.readSecureObjectPart(input, false, true, isLastChunkWithSecurity);
}

void ChunkedKeySetResponse::reset() {
  if (m_resultKeys.size() > 0) {
    m_resultKeys.clear();
  }
}

void ChunkedKeySetResponse::handleChunk(const uint8_t* chunk, int32_t chunkLen,
                                        uint8_t isLastChunkWithSecurity,
                                        const CacheImpl* cacheImpl) {
  auto input =
      cacheImpl->createDataInput(chunk, chunkLen, m_replyMsg.getPool());

  uint32_t partLen;
  if (TcrMessageHelper::readChunkPartHeader(
          m_msg, input, DSCode::FixedIDDefault,
          static_cast<int32_t>(DSCode::CacheableArrayList),
          "ChunkedKeySetResponse", partLen, isLastChunkWithSecurity) !=
      TcrMessageHelper::ChunkObjectType::OBJECT) {
    // encountered an exception part, so return without reading more
    m_replyMsg.readSecureObjectPart(input, false, true,
                                    isLastChunkWithSecurity);
    return;
  }

  serializer::readObject(input, m_resultKeys);
  m_replyMsg.readSecureObjectPart(input, false, true, isLastChunkWithSecurity);
}

void ChunkedQueryResponse::reset() {
  m_queryResults->clear();
  m_structFieldNames.clear();
}

void ChunkedQueryResponse::readObjectPartList(DataInput& input,
                                              bool isResultSet) {
  if (input.readBoolean()) {
    LOGERROR("Query response has keys which is unexpected.");
    throw IllegalStateException("Query response has keys which is unexpected.");
  }

  int32_t len = input.readInt32();

  for (int32_t index = 0; index < len; ++index) {
    if (input.read() == 2 /* for exception*/) {
      input.advanceCursor(input.readArrayLength());  // skipLen
      auto exMsgPtr = input.readString();
      throw IllegalStateException(exMsgPtr);
    } else {
      if (isResultSet) {
        std::shared_ptr<Cacheable> value;
        input.readObject(value);
        m_queryResults->push_back(value);
      } else {
        auto code = static_cast<DSCode>(input.read());
        if (code == DSCode::FixedIDByte) {
          auto arrayType = static_cast<DSFid>(input.read());
          if (arrayType != DSFid::CacheableObjectPartList) {
            LOGERROR(
                "Query response got unhandled message format %d while "
                "expecting struct set object part list; possible serialization "
                "mismatch",
                arrayType);
            throw MessageException(
                "Query response got unhandled message format while expecting "
                "struct set object part list; possible serialization mismatch");
          }
          readObjectPartList(input, true);
        } else {
          LOGERROR(
              "Query response got unhandled message format %" PRId8
              "while expecting "
              "struct set object part list; possible serialization mismatch",
              code);
          throw MessageException(
              "Query response got unhandled message format while expecting "
              "struct set object part list; possible serialization mismatch");
        }
      }
    }
  }
}

void ChunkedQueryResponse::handleChunk(const uint8_t* chunk, int32_t chunkLen,
                                       uint8_t isLastChunkWithSecurity,
                                       const CacheImpl* cacheImpl) {
  LOGDEBUG("ChunkedQueryResponse::handleChunk..");
  auto input = cacheImpl->createDataInput(chunk, chunkLen, m_msg.getPool());

  uint32_t partLen;
  auto objType = TcrMessageHelper::readChunkPartHeader(
      m_msg, input, DSCode::FixedIDByte,
      static_cast<int32_t>(DSFid::CollectionTypeImpl), "ChunkedQueryResponse",
      partLen, isLastChunkWithSecurity);
  if (objType == TcrMessageHelper::ChunkObjectType::EXCEPTION) {
    // encountered an exception part, so return without reading more
    m_msg.readSecureObjectPart(input, false, true, isLastChunkWithSecurity);
    return;
  } else if (objType == TcrMessageHelper::ChunkObjectType::NULL_OBJECT) {
    // special case for scalar result
    input.readInt32();  // ignored part length
    input.read();       // ignored is object
    auto intVal = std::dynamic_pointer_cast<CacheableInt32>(input.readObject());
    m_queryResults->push_back(intVal);
    m_msg.readSecureObjectPart(input, false, true, isLastChunkWithSecurity);
    return;
  }

  // ignoring parent classes for now
  // we will require to look at it once CQ is to be implemented.
  // skipping HashSet/StructSet
  // qhe: It was agreed upon that we'll use set for all kinds of results.
  // to avoid dealing with compare operator for user objets.
  // If the results on server are in a bag, or the user need to manipulate
  // the elements, then we have to revisit this issue.
  // For now, we'll live with duplicate records, hoping they do not cost much.
  skipClass(input);
  // skipping CollectionTypeImpl
  // skipClass(input); // no longer, since GFE 5.7

  input.read();  // this is Fixed ID byte (1)
  input.read();  // this is DataSerializable (45)
  input.read();  // this is Class 43
  const auto isStructTypeImpl = input.readString();

  if (isStructTypeImpl == "org.apache.geode.cache.query.Struct") {
    int32_t numOfFldNames = input.readArrayLength();
    bool skip = false;
    if (m_structFieldNames.size() != 0) {
      skip = true;
    }
    for (int i = 0; i < numOfFldNames; i++) {
      auto sptr = input.readString();
      if (!skip) {
        m_structFieldNames.push_back(sptr);
      }
    }
  }

  // skip the remaining part
  input.reset();
  // skip the whole part including partLen and isObj (4+1)
  input.advanceCursor(partLen + 5);

  input.readInt32();  // skip part length

  if (!input.read()) {
    LOGERROR(
        "Query response part is not an object; possible serialization "
        "mismatch");
    throw MessageException(
        "Query response part is not an object; possible serialization "
        "mismatch");
  }

  bool isResultSet = (m_structFieldNames.size() == 0);

  auto arrayType = static_cast<DSCode>(input.read());

  if (arrayType == DSCode::CacheableObjectArray) {
    int32_t arraySize = input.readArrayLength();
    skipClass(input);
    for (int32_t arrayItem = 0; arrayItem < arraySize; ++arrayItem) {
      std::shared_ptr<Serializable> value;
      if (isResultSet) {
        input.readObject(value);
        m_queryResults->push_back(value);
      } else {
        input.read();
        int32_t arraySize2 = input.readArrayLength();
        skipClass(input);
        for (int32_t index = 0; index < arraySize2; ++index) {
          input.readObject(value);
          m_queryResults->push_back(value);
        }
      }
    }
  } else if (arrayType == DSCode::FixedIDByte) {
    arrayType = static_cast<DSCode>(input.read());
    if (static_cast<int32_t>(arrayType) !=
        static_cast<int32_t>(DSFid::CacheableObjectPartList)) {
      LOGERROR(
          "Query response got unhandled message format %d while expecting "
          "object part list; possible serialization mismatch",
          arrayType);
      throw MessageException(
          "Query response got unhandled message format while expecting object "
          "part list; possible serialization mismatch");
    }
    readObjectPartList(input, isResultSet);
  } else {
    LOGERROR(
        "Query response got unhandled message format %d; possible "
        "serialization mismatch",
        arrayType);
    throw MessageException(
        "Query response got unhandled message format; possible serialization "
        "mismatch");
  }

  m_msg.readSecureObjectPart(input, false, true, isLastChunkWithSecurity);
}

void ChunkedQueryResponse::skipClass(DataInput& input) {
  auto classByte = static_cast<DSCode>(input.read());
  if (classByte == DSCode::Class) {
    // ignore string type id - assuming its a normal (under 64k) string.
    input.read();
    uint16_t classLen = input.readInt16();
    input.advanceCursor(classLen);
  } else {
    throw IllegalStateException(
        "ChunkedQueryResponse::skipClass: "
        "Did not get expected class header byte");
  }
}

void ChunkedFunctionExecutionResponse::reset() {
  // m_functionExecutionResults->clear();
}

void ChunkedFunctionExecutionResponse::handleChunk(
    const uint8_t* chunk, int32_t chunkLen, uint8_t isLastChunkWithSecurity,
    const CacheImpl* cacheImpl) {
  LOGDEBUG("ChunkedFunctionExecutionResponse::handleChunk");
  auto input = cacheImpl->createDataInput(chunk, chunkLen, m_msg.getPool());

  uint32_t partLen;

  TcrMessageHelper::ChunkObjectType arrayType;
  if ((arrayType = TcrMessageHelper::readChunkPartHeader(
           m_msg, input, "ChunkedFunctionExecutionResponse", partLen,
           isLastChunkWithSecurity)) ==
      TcrMessageHelper::ChunkObjectType::EXCEPTION) {
    // encountered an exception part, so return without reading more
    m_msg.readSecureObjectPart(input, false, true, isLastChunkWithSecurity);
    return;
  }

  if (m_getResult == false) {
    return;
  }

  if (static_cast<TcrMessageHelper::ChunkObjectType>(arrayType) ==
      TcrMessageHelper::ChunkObjectType::NULL_OBJECT) {
    LOGDEBUG("ChunkedFunctionExecutionResponse::handleChunk nullptr object");
    //	m_functionExecutionResults->push_back(nullptr);
    m_msg.readSecureObjectPart(input, false, true, isLastChunkWithSecurity);
    return;
  }

  auto startLen = static_cast<size_t>(
      input.getBytesRead() -
      1);  // from here need to look value part + memberid AND -1 for array type
  // read and ignore array length
  input.readArrayLength();

  // read a byte to determine whether to read exception part for sendException
  // or read objects.
  auto partType = static_cast<DSCode>(input.read());
  bool isExceptionPart = false;
  // See If partType is JavaSerializable
  const int CHUNK_HDR_LEN = 5;
  const int SECURE_PART_LEN = 5 + 8;
  bool readPart = true;
  LOGDEBUG(
      "ChunkedFunctionExecutionResponse::handleChunk chunkLen = %d & partLen = "
      "%d ",
      chunkLen, partLen);
  if (partType == DSCode::JavaSerializable) {
    isExceptionPart = true;
    // reset the input.
    input.reset();

    if (((isLastChunkWithSecurity & 0x02) &&
         (chunkLen - static_cast<int32_t>(partLen) <=
          CHUNK_HDR_LEN + SECURE_PART_LEN)) ||
        (((isLastChunkWithSecurity & 0x02) == 0) &&
         (chunkLen - static_cast<int32_t>(partLen) <= CHUNK_HDR_LEN))) {
      readPart = false;
      partLen = input.readInt32();
      input.advanceCursor(1);  // skip isObject byte
      input.advanceCursor(partLen);
    } else {
      // skip first part i.e JavaSerializable.
      TcrMessageHelper::skipParts(m_msg, input, 1);

      // read the second part which is string in usual manner, first its length.
      partLen = input.readInt32();

      // then isObject byte
      input.read();  // ignore iSobject

      startLen = input.getBytesRead();  // reset from here need to look value
      // part + memberid AND -1 for array type

      // Since it is contained as a part of other results, read arrayType which
      // is arrayList = 65.
      input.read();

      // read and ignore its len which is 2
      input.readArrayLength();
    }
  } else {
    // rewind cursor by 1 to what we had read a byte to determine whether to
    // read exception part or read objects.
    input.rewindCursor(1);
  }

  // Read either object or exception string from sendException.
  std::shared_ptr<Serializable> value;
  // std::shared_ptr<Cacheable> memberId;
  if (readPart) {
    input.readObject(value);
    // TODO: track this memberId for PrFxHa
    // input.readObject(memberId);
    auto objectlen = input.getBytesRead() - startLen;

    auto memberIdLen = partLen - objectlen;
    input.advanceCursor(memberIdLen);
    LOGDEBUG("function partlen = %d , objectlen = %z,  memberidlen = %z ",
             partLen, objectlen, memberIdLen);
    LOGDEBUG("function input.getBytesRemaining() = %z ",
             input.getBytesRemaining());

  } else {
    value = CacheableString::create("Function exception result.");
  }
  if (m_rc != nullptr) {
    std::shared_ptr<Cacheable> result = nullptr;
    if (isExceptionPart) {
      result = std::make_shared<UserFunctionExecutionException>(
          std::dynamic_pointer_cast<CacheableString>(value)->value());
    } else {
      result = value;
    }
    if (m_resultCollectorLock) {
      std::lock_guard<decltype(*m_resultCollectorLock)> guard(
          *m_resultCollectorLock);
      m_rc->addResult(result);
    } else {
      m_rc->addResult(result);
    }
  }

  m_msg.readSecureObjectPart(input, false, true, isLastChunkWithSecurity);
  //  m_functionExecutionResults->push_back(value);
}

void ChunkedGetAllResponse::reset() {
  m_keysOffset = 0;
  if (m_resultKeys != nullptr && m_resultKeys->size() > 0) {
    m_resultKeys->clear();
  }
}

// process a GET_ALL response chunk
void ChunkedGetAllResponse::handleChunk(const uint8_t* chunk, int32_t chunkLen,
                                        uint8_t isLastChunkWithSecurity,
                                        const CacheImpl* cacheImpl) {
  auto input = cacheImpl->createDataInput(chunk, chunkLen, m_msg.getPool());

  uint32_t partLen;
  if (TcrMessageHelper::readChunkPartHeader(
          m_msg, input, DSCode::FixedIDByte,
          static_cast<int32_t>(DSFid::VersionedObjectPartList),
          "ChunkedGetAllResponse", partLen, isLastChunkWithSecurity) !=
      TcrMessageHelper::ChunkObjectType::OBJECT) {
    // encountered an exception part, so return without reading more
    m_msg.readSecureObjectPart(input, false, true, isLastChunkWithSecurity);
    return;
  }

  VersionedCacheableObjectPartList objectList(
      m_keys, &m_keysOffset, m_values, m_exceptions, m_resultKeys, m_region,
      &m_trackerMap, m_destroyTracker, m_addToLocalCache, m_dsmemId,
      m_responseLock);

  objectList.fromData(input);

  m_msg.readSecureObjectPart(input, false, true, isLastChunkWithSecurity);
}

void ChunkedGetAllResponse::add(const ChunkedGetAllResponse* other) {
  if (m_values) {
    for (const auto& iter : *m_values) {
      m_values->emplace(iter.first, iter.second);
    }
  }

  if (m_exceptions) {
    m_exceptions->insert(other->m_exceptions->begin(),
                         other->m_exceptions->end());
  }

  for (const auto& iter : other->m_trackerMap) {
    m_trackerMap[iter.first] = iter.second;
  }

  if (m_resultKeys) {
    m_resultKeys->insert(m_resultKeys->end(), other->m_resultKeys->begin(),
                         other->m_resultKeys->end());
  }
}

void ChunkedPutAllResponse::reset() {
  if (m_list != nullptr && m_list->size() > 0) {
    m_list->getVersionedTagptr().clear();
  }
}

// process a PUT_ALL response chunk
void ChunkedPutAllResponse::handleChunk(const uint8_t* chunk, int32_t chunkLen,
                                        uint8_t isLastChunkWithSecurity,
                                        const CacheImpl* cacheImpl) {
  auto input = cacheImpl->createDataInput(chunk, chunkLen, m_msg.getPool());

  uint32_t partLen;
  TcrMessageHelper::ChunkObjectType chunkType;
  if ((chunkType = TcrMessageHelper::readChunkPartHeader(
           m_msg, input, DSCode::FixedIDByte,
           static_cast<int32_t>(DSFid::VersionedObjectPartList),
           "ChunkedPutAllResponse", partLen, isLastChunkWithSecurity)) ==
      TcrMessageHelper::ChunkObjectType::NULL_OBJECT) {
    LOGDEBUG("ChunkedPutAllResponse::handleChunk nullptr object");
    // No issues it will be empty in case of disabled caching.
    m_msg.readSecureObjectPart(input, false, true, isLastChunkWithSecurity);
    return;
  }

  if (chunkType == TcrMessageHelper::ChunkObjectType::OBJECT) {
    LOGDEBUG("ChunkedPutAllResponse::handleChunk object");
    std::recursive_mutex responseLock;
    auto vcObjPart = std::make_shared<VersionedCacheableObjectPartList>(
        dynamic_cast<ThinClientRegion*>(m_region.get()),
        m_msg.getChunkedResultHandler()->getEndpointMemId(), responseLock);
    vcObjPart->fromData(input);
    m_list->addAll(vcObjPart);
    m_msg.readSecureObjectPart(input, false, true, isLastChunkWithSecurity);
  } else {
    LOGDEBUG("ChunkedPutAllResponse::handleChunk BYTES PART");
    const auto byte0 = input.read();
    LOGDEBUG("ChunkedPutAllResponse::handleChunk single-hop bytes byte0 = %d ",
             byte0);
    const auto byte1 = input.read();
    m_msg.readSecureObjectPart(input, false, true, isLastChunkWithSecurity);

    auto pool = m_msg.getPool();
    if (pool != nullptr && !pool->isDestroyed() &&
        pool->getPRSingleHopEnabled()) {
      auto poolDM = dynamic_cast<ThinClientPoolDM*>(pool);
      if ((poolDM != nullptr) &&
          (poolDM->getClientMetaDataService() != nullptr) && (byte0 != 0)) {
        LOGFINE("enqueued region " + m_region->getFullPath() +
                " for metadata refresh for singlehop for PUTALL operation.");
        poolDM->getClientMetaDataService()->enqueueForMetadataRefresh(
            m_region->getFullPath(), byte1);
      }
    }
  }
}

void ChunkedRemoveAllResponse::reset() {
  if (m_list != nullptr && m_list->size() > 0) {
    m_list->getVersionedTagptr().clear();
  }
}

// process a REMOVE_ALL response chunk
void ChunkedRemoveAllResponse::handleChunk(const uint8_t* chunk,
                                           int32_t chunkLen,
                                           uint8_t isLastChunkWithSecurity,
                                           const CacheImpl* cacheImpl) {
  auto input = cacheImpl->createDataInput(chunk, chunkLen, m_msg.getPool());

  uint32_t partLen;
  TcrMessageHelper::ChunkObjectType chunkType;
  if ((chunkType = TcrMessageHelper::readChunkPartHeader(
           m_msg, input, DSCode::FixedIDByte,
           static_cast<int32_t>(DSFid::VersionedObjectPartList),
           "ChunkedRemoveAllResponse", partLen, isLastChunkWithSecurity)) ==
      TcrMessageHelper::ChunkObjectType::NULL_OBJECT) {
    LOGDEBUG("ChunkedRemoveAllResponse::handleChunk nullptr object");
    // No issues it will be empty in case of disabled caching.
    m_msg.readSecureObjectPart(input, false, true, isLastChunkWithSecurity);
    return;
  }

  if (chunkType == TcrMessageHelper::ChunkObjectType::OBJECT) {
    LOGDEBUG("ChunkedRemoveAllResponse::handleChunk object");
    std::recursive_mutex responseLock;
    auto vcObjPart = std::make_shared<VersionedCacheableObjectPartList>(
        dynamic_cast<ThinClientRegion*>(m_region.get()),
        m_msg.getChunkedResultHandler()->getEndpointMemId(), responseLock);
    vcObjPart->fromData(input);
    m_list->addAll(vcObjPart);
    m_msg.readSecureObjectPart(input, false, true, isLastChunkWithSecurity);
  } else {
    LOGDEBUG("ChunkedRemoveAllResponse::handleChunk BYTES PART");
    const auto byte0 = input.read();
    LOGDEBUG(
        "ChunkedRemoveAllResponse::handleChunk single-hop bytes byte0 = %d ",
        byte0);
    const auto byte1 = input.read();
    m_msg.readSecureObjectPart(input, false, true, isLastChunkWithSecurity);

    auto pool = m_msg.getPool();
    if (pool != nullptr && !pool->isDestroyed() &&
        pool->getPRSingleHopEnabled()) {
      auto poolDM = dynamic_cast<ThinClientPoolDM*>(pool);
      if ((poolDM != nullptr) &&
          (poolDM->getClientMetaDataService() != nullptr) && (byte0 != 0)) {
        LOGFINE("enqueued region " + m_region->getFullPath() +
                " for metadata refresh for singlehop for REMOVEALL operation.");
        poolDM->getClientMetaDataService()->enqueueForMetadataRefresh(
            m_region->getFullPath(), byte1);
      }
    }
  }
}

void ChunkedDurableCQListResponse::reset() {
  if (m_resultList) {
    m_resultList->clear();
  }
}

// handles the chunk response for GETDURABLECQS_MSG_TYPE
void ChunkedDurableCQListResponse::handleChunk(const uint8_t* chunk,
                                               int32_t chunkLen, uint8_t,
                                               const CacheImpl* cacheImpl) {
  auto input = cacheImpl->createDataInput(chunk, chunkLen, m_msg.getPool());

  // read and ignore part length
  input.readInt32();
  if (!input.readBoolean()) {
    // we're currently always expecting an object
    char exMsg[256];
    std::snprintf(
        exMsg, 255,
        "ChunkedDurableCQListResponse::handleChunk: part is not object");
    throw MessageException(exMsg);
  }

  input.advanceCursor(1);  // skip the CacheableArrayList type ID byte

  const auto stringParts = input.read();  // read the number of strings in the
                                          // message this is one byte

  for (int i = 0; i < stringParts; i++) {
    m_resultList->push_back(
        std::dynamic_pointer_cast<CacheableString>(input.readObject()));
  }
}

}  // namespace client
}  // namespace geode
}  // namespace apache
