/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "Utils.hpp"
#include "ThinClientRegion.hpp"
#include "TcrDistributionManager.hpp"
#include "ThinClientPoolDM.hpp"
#include "ThinClientBaseDM.hpp"
#include "TcrEndpoint.hpp"
#include <gfcpp/SystemProperties.hpp>
#include "CacheImpl.hpp"
#include "RegionGlobalLocks.hpp"
#include "ReadWriteLock.hpp"
#include "RemoteQuery.hpp"
#include <gfcpp/SelectResultsIterator.hpp>
#include <gfcpp/Struct.hpp>
#include "GeodeTypeIdsImpl.hpp"
#include "AutoDelete.hpp"
#include <gfcpp/PoolManager.hpp>
#include "UserAttributes.hpp"
#include <gfcpp/UserFunctionExecutionException.hpp>
#include "PutAllPartialResultServerException.hpp"
#include "VersionedCacheableObjectPartList.hpp"
//#include "PutAllPartialResult.hpp"
using namespace apache::geode::client;
namespace apache {
namespace geode {
namespace client {
void setTSSExceptionMessage(const char* exMsg);
} // namespace client
} // namespace geode
} // namespace apache
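// PutAllWork: a unit of work for single-hop putAll. Each instance owns a
// TcrMessagePutAll request targeted at one BucketServerLocation plus a
// chunked response collector; execute() runs on a thread-pool thread and
// returns a GfErrType that singleHopPutAllNoThrow_remote later harvests
// through getResult().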
class PutAllWork : public PooledWork<GfErrType>,
private NonCopyable,
private NonAssignable {
ThinClientPoolDM* m_poolDM;
BucketServerLocationPtr m_serverLocation;
TcrMessage* m_request;
TcrMessageReply* m_reply;
MapOfUpdateCounters m_mapOfUpdateCounters;
bool m_attemptFailover;
bool m_isBGThread;
UserAttributesPtr m_userAttribute;
const RegionPtr m_region;
VectorOfCacheableKeyPtr m_keys;
HashMapOfCacheablePtr m_map;
VersionedCacheableObjectPartListPtr m_verObjPartListPtr;
uint32_t m_timeout;
PutAllPartialResultServerExceptionPtr m_papException;
  bool m_isPapeReceived;
  ChunkedPutAllResponse* m_resultCollector;
  // Lock shared with m_verObjPartListPtr and m_resultCollector; kept as a
  // member because both hold a reference to it for the lifetime of this work.
  ACE_Recursive_Thread_Mutex m_responseLock;
// UNUSED const UserDataPtr& m_aCallbackArgument;
public:
PutAllWork(ThinClientPoolDM* poolDM,
const BucketServerLocationPtr& serverLocation,
const RegionPtr& region, bool attemptFailover, bool isBGThread,
const HashMapOfCacheablePtr map,
const VectorOfCacheableKeyPtr keys, uint32_t timeout,
const UserDataPtr& aCallbackArgument)
: m_poolDM(poolDM),
m_serverLocation(serverLocation),
m_attemptFailover(attemptFailover),
m_isBGThread(isBGThread),
m_userAttribute(NULLPTR),
m_region(region),
m_keys(keys),
m_map(map),
m_timeout(timeout),
m_papException(NULLPTR),
m_isPapeReceived(false)
// UNUSED , m_aCallbackArgument(aCallbackArgument)
{
m_request = new TcrMessagePutAll(m_region.ptr(), *m_map.ptr(),
static_cast<int>(m_timeout * 1000),
m_poolDM, aCallbackArgument);
m_reply = new TcrMessageReply(true, m_poolDM);
    // create a new instance of VersionedCacheableObjectPartList (VCOPL);
    // m_responseLock must be a member, not a constructor local, since the
    // VCOPL and the chunked response handler keep a reference to it.
    m_verObjPartListPtr =
        new VersionedCacheableObjectPartList(keys.ptr(), m_responseLock);
if (m_poolDM->isMultiUserMode()) {
m_userAttribute = TSSUserAttributesWrapper::s_geodeTSSUserAttributes
->getUserAttributes();
}
m_request->setTimeout(m_timeout);
m_reply->setTimeout(m_timeout);
    m_resultCollector = new ChunkedPutAllResponse(
        m_region, *m_reply, m_responseLock, m_verObjPartListPtr);
m_reply->setChunkedResultHandler(m_resultCollector);
}
~PutAllWork() {
delete m_request;
delete m_reply;
delete m_resultCollector;
}
TcrMessage* getReply() { return m_reply; }
HashMapOfCacheablePtr getPutAllMap() { return m_map; }
VersionedCacheableObjectPartListPtr getVerObjPartList() {
return m_verObjPartListPtr;
}
ChunkedPutAllResponse* getResultCollector() { return m_resultCollector; }
BucketServerLocationPtr getServerLocation() { return m_serverLocation; }
PutAllPartialResultServerExceptionPtr getPaPResultException() {
return m_papException;
}
void init() {}
GfErrType execute(void) {
GuardUserAttribures gua;
if (m_userAttribute != NULLPTR) {
gua.setProxyCache(m_userAttribute->getProxyCache());
}
GfErrType err = GF_NOERR;
err = m_poolDM->sendSyncRequest(*m_request, *m_reply, m_attemptFailover,
m_isBGThread, m_serverLocation);
// Set Version Tags
LOGDEBUG(" m_verObjPartListPtr size = %d err = %d ",
m_resultCollector->getList()->size(), err);
m_verObjPartListPtr->setVersionedTagptr(
m_resultCollector->getList()->getVersionedTagptr());
if (err != GF_NOERR) {
return err;
} /*This can be GF_NOTCON, counterpart to java
ServerConnectivityException*/
switch (m_reply->getMessageType()) {
case TcrMessage::REPLY:
break;
case TcrMessage::RESPONSE:
LOGDEBUG("PutAllwork execute err = %d ", err);
break;
case TcrMessage::EXCEPTION:
        // TODO: check for a PutAllPartialResultServerException, read it, and
        // set m_papException / m_isPapeReceived for later use.
m_isPapeReceived = true;
if (m_poolDM->isNotAuthorizedException(m_reply->getException())) {
LOGDEBUG("received NotAuthorizedException");
err = GF_AUTHENTICATION_FAILED_EXCEPTION;
} else if (m_poolDM->isPutAllPartialResultException(
m_reply->getException())) {
LOGDEBUG("received PutAllPartialResultException");
err = GF_PUTALL_PARTIAL_RESULT_EXCEPTION;
} else {
LOGDEBUG("received unknown exception:%s", m_reply->getException());
err = GF_PUTALL_PARTIAL_RESULT_EXCEPTION;
// TODO should assign a new err code
}
break;
case TcrMessage::PUT_DATA_ERROR:
err = GF_CACHESERVER_EXCEPTION;
break;
default:
LOGERROR("Unknown message type %d during region put-all",
m_reply->getMessageType());
err = GF_NOTOBJ;
break;
}
return err;
}
};
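// RemoveAllWork: the removeAll counterpart of PutAllWork. It sends a
// TcrMessageRemoveAll for the keys mapped to one BucketServerLocation and
// collects the versioned results through a ChunkedRemoveAllResponse.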
class RemoveAllWork : public PooledWork<GfErrType>,
private NonCopyable,
private NonAssignable {
ThinClientPoolDM* m_poolDM;
BucketServerLocationPtr m_serverLocation;
TcrMessage* m_request;
TcrMessageReply* m_reply;
MapOfUpdateCounters m_mapOfUpdateCounters;
bool m_attemptFailover;
bool m_isBGThread;
UserAttributesPtr m_userAttribute;
const RegionPtr m_region;
const UserDataPtr& m_aCallbackArgument;
VectorOfCacheableKeyPtr m_keys;
VersionedCacheableObjectPartListPtr m_verObjPartListPtr;
PutAllPartialResultServerExceptionPtr m_papException;
bool m_isPapeReceived;
  ChunkedRemoveAllResponse* m_resultCollector;
  // Lock shared with m_verObjPartListPtr and m_resultCollector; kept as a
  // member because both hold a reference to it for the lifetime of this work.
  ACE_Recursive_Thread_Mutex m_responseLock;
public:
RemoveAllWork(ThinClientPoolDM* poolDM,
const BucketServerLocationPtr& serverLocation,
const RegionPtr& region, bool attemptFailover, bool isBGThread,
const VectorOfCacheableKeyPtr keys,
const UserDataPtr& aCallbackArgument)
: m_poolDM(poolDM),
m_serverLocation(serverLocation),
m_attemptFailover(attemptFailover),
m_isBGThread(isBGThread),
m_userAttribute(NULLPTR),
m_region(region),
m_aCallbackArgument(aCallbackArgument),
m_keys(keys),
m_papException(NULLPTR),
m_isPapeReceived(false) {
m_request = new TcrMessageRemoveAll(m_region.ptr(), *keys,
m_aCallbackArgument, m_poolDM);
m_reply = new TcrMessageReply(true, m_poolDM);
    // create a new instance of VersionedCacheableObjectPartList (VCOPL);
    // m_responseLock must be a member, not a constructor local, since the
    // VCOPL and the chunked response handler keep a reference to it.
    m_verObjPartListPtr =
        new VersionedCacheableObjectPartList(keys.ptr(), m_responseLock);
if (m_poolDM->isMultiUserMode()) {
m_userAttribute = TSSUserAttributesWrapper::s_geodeTSSUserAttributes
->getUserAttributes();
}
    m_resultCollector = new ChunkedRemoveAllResponse(
        m_region, *m_reply, m_responseLock, m_verObjPartListPtr);
m_reply->setChunkedResultHandler(m_resultCollector);
}
~RemoveAllWork() {
delete m_request;
delete m_reply;
delete m_resultCollector;
}
TcrMessage* getReply() { return m_reply; }
VersionedCacheableObjectPartListPtr getVerObjPartList() {
return m_verObjPartListPtr;
}
ChunkedRemoveAllResponse* getResultCollector() { return m_resultCollector; }
BucketServerLocationPtr getServerLocation() { return m_serverLocation; }
PutAllPartialResultServerExceptionPtr getPaPResultException() {
return m_papException;
}
void init() {}
GfErrType execute(void) {
GuardUserAttribures gua;
if (m_userAttribute != NULLPTR) {
gua.setProxyCache(m_userAttribute->getProxyCache());
}
GfErrType err = GF_NOERR;
err = m_poolDM->sendSyncRequest(*m_request, *m_reply, m_attemptFailover,
m_isBGThread, m_serverLocation);
// Set Version Tags
LOGDEBUG(" m_verObjPartListPtr size = %d err = %d ",
m_resultCollector->getList()->size(), err);
m_verObjPartListPtr->setVersionedTagptr(
m_resultCollector->getList()->getVersionedTagptr());
if (err != GF_NOERR) {
return err;
} /*This can be GF_NOTCON, counterpart to java
ServerConnectivityException*/
switch (m_reply->getMessageType()) {
case TcrMessage::REPLY:
break;
case TcrMessage::RESPONSE:
LOGDEBUG("RemoveAllWork execute err = %d ", err);
break;
case TcrMessage::EXCEPTION:
        // TODO: check for a PutAllPartialResultServerException, read it, and
        // set m_papException / m_isPapeReceived for later use.
m_isPapeReceived = true;
if (m_poolDM->isNotAuthorizedException(m_reply->getException())) {
LOGDEBUG("received NotAuthorizedException");
err = GF_AUTHENTICATION_FAILED_EXCEPTION;
} else if (m_poolDM->isPutAllPartialResultException(
m_reply->getException())) {
LOGDEBUG("received PutAllPartialResultException");
err = GF_PUTALL_PARTIAL_RESULT_EXCEPTION;
} else {
LOGDEBUG("received unknown exception:%s", m_reply->getException());
err = GF_PUTALL_PARTIAL_RESULT_EXCEPTION;
// TODO should assign a new err code
}
break;
case TcrMessage::PUT_DATA_ERROR:
err = GF_CACHESERVER_EXCEPTION;
break;
default:
LOGERROR("Unknown message type %d during region remove-all",
m_reply->getMessageType());
err = GF_NOTOBJ;
break;
}
return err;
}
};
ThinClientRegion::ThinClientRegion(const std::string& name, CacheImpl* cache,
RegionInternal* rPtr,
const RegionAttributesPtr& attributes,
const CacheStatisticsPtr& stats, bool shared)
: LocalRegion(name, cache, rPtr, attributes, stats, shared),
m_tcrdm((ThinClientBaseDM*)0),
m_notifyRelease(false),
m_notificationSema(1),
m_isMetaDataRefreshed(false) {
m_transactionEnabled = true;
m_isDurableClnt =
strlen(DistributedSystem::getSystemProperties()->durableClientId()) > 0;
}
void ThinClientRegion::initTCR() {
bool subscription = false;
PoolPtr pool = PoolManager::find(getAttributes()->getPoolName());
if (pool != NULLPTR) {
subscription = pool->getSubscriptionEnabled();
}
bool notificationEnabled =
getAttributes()->getClientNotificationEnabled() || subscription;
if (notificationEnabled) {
if (DistributedSystem::getSystemProperties()->isGridClient()) {
LOGWARN(
"Region %s: client subscription channel enabled for a grid "
"client; starting required internal subscription, cleanup and "
"failover threads",
m_fullPath.c_str());
m_cacheImpl->tcrConnectionManager().startFailoverAndCleanupThreads();
}
}
try {
m_tcrdm =
new TcrDistributionManager(this, m_cacheImpl->tcrConnectionManager());
m_tcrdm->init();
} catch (const Exception& ex) {
GF_SAFE_DELETE(m_tcrdm);
LOGERROR("Exception while initializing region: %s: %s", ex.getName(),
ex.getMessage());
throw;
}
}
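// Registers interest in the given keys with the server so that updates are
// pushed to this client. A hypothetical caller sketch (illustrative only;
// assumes a pool configured with subscription-enabled=true):
//   VectorOfCacheableKey keys;
//   keys.push_back(CacheableKey::create("key1"));
//   regionPtr->registerKeys(keys, false /*isDurable*/,
//                           true /*getInitialValues*/,
//                           true /*receiveValues*/);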
void ThinClientRegion::registerKeys(const VectorOfCacheableKey& keys,
bool isDurable, bool getInitialValues,
bool receiveValues) {
PoolPtr pool = PoolManager::find(getAttributes()->getPoolName());
if (pool != NULLPTR) {
if (!pool->getSubscriptionEnabled()) {
LOGERROR(
"Registering keys is supported "
"only if subscription-enabled attribute is true for pool %s",
pool->getName());
throw UnsupportedOperationException(
"Registering keys is supported "
"only if pool subscription-enabled attribute is true.");
}
}
if (keys.empty()) {
LOGERROR("Register keys list is empty");
    throw IllegalArgumentException(
        "Register keys: keys vector is empty");
}
if (isDurable && !isDurableClient()) {
LOGERROR(
"Register keys durable flag is only applicable for durable clients");
throw IllegalStateException(
"Durable flag only applicable for "
"durable clients");
}
InterestResultPolicy interestPolicy = InterestResultPolicy::NONE;
if (getInitialValues) interestPolicy = InterestResultPolicy::KEYS_VALUES;
LOGDEBUG("ThinClientRegion::registerKeys : interestpolicy is %d",
interestPolicy.ordinal);
GfErrType err = registerKeysNoThrow(keys, true, NULL, isDurable,
interestPolicy, receiveValues);
if (m_tcrdm->isFatalError(err)) {
GfErrTypeToException("Region::registerKeys", err);
}
GfErrTypeToException("Region::registerKeys", err);
}
void ThinClientRegion::unregisterKeys(const VectorOfCacheableKey& keys) {
PoolPtr pool = PoolManager::find(getAttributes()->getPoolName());
if (pool != NULLPTR) {
if (!pool->getSubscriptionEnabled()) {
LOGERROR(
"Unregister keys is supported "
"only if subscription-enabled attribute is true for pool %s",
pool->getName());
throw UnsupportedOperationException(
"Unregister keys is supported "
"only if pool subscription-enabled attribute is true.");
}
} else {
if (!getAttributes()->getClientNotificationEnabled()) {
LOGERROR(
"Unregister keys is supported "
"only if region client-notification attribute is true.");
throw UnsupportedOperationException(
"Unregister keys is supported "
"only if region client-notification attribute is true.");
}
}
if (keys.empty()) {
LOGERROR("Unregister keys list is empty");
    throw IllegalArgumentException(
        "Unregister keys: keys vector is empty");
}
GfErrType err = unregisterKeysNoThrow(keys);
GfErrTypeToException("Region::unregisterKeys", err);
}
void ThinClientRegion::registerAllKeys(bool isDurable,
VectorOfCacheableKeyPtr resultKeys,
bool getInitialValues,
bool receiveValues) {
PoolPtr pool = PoolManager::find(getAttributes()->getPoolName());
if (pool != NULLPTR) {
if (!pool->getSubscriptionEnabled()) {
      LOGERROR(
          "Register all keys is supported only "
          "if subscription-enabled attribute is true for pool %s",
          pool->getName());
throw UnsupportedOperationException(
"Register all keys is supported only "
"if pool subscription-enabled attribute is true.");
}
}
if (isDurable && !isDurableClient()) {
LOGERROR(
"Register all keys durable flag is only applicable for durable "
"clients");
throw IllegalStateException(
"Durable flag only applicable for durable clients");
}
bool isresultKeys = true;
if (resultKeys == NULLPTR) {
resultKeys = VectorOfCacheableKeyPtr(new VectorOfCacheableKey());
isresultKeys = false;
}
InterestResultPolicy interestPolicy = InterestResultPolicy::NONE;
if (getInitialValues) {
interestPolicy = InterestResultPolicy::KEYS_VALUES;
} else {
interestPolicy = InterestResultPolicy::KEYS;
}
LOGDEBUG("ThinClientRegion::registerAllKeys : interestpolicy is %d",
interestPolicy.ordinal);
// if we need to fetch initial data, then we get the keys in
// that call itself using the special GET_ALL message and do not need
// to get the keys in the initial register interest call
GfErrType err = registerRegexNoThrow(".*", true, NULL, isDurable, resultKeys,
interestPolicy, receiveValues);
if (m_tcrdm->isFatalError(err)) {
GfErrTypeToException("Region::registerAllKeys", err);
}
  // drop the temporary vector if the caller did not ask for the keys
  if (isresultKeys == false) {
    resultKeys = NULLPTR;
  }
GfErrTypeToException("Region::registerAllKeys", err);
}
void ThinClientRegion::registerRegex(const char* regex, bool isDurable,
VectorOfCacheableKeyPtr resultKeys,
bool getInitialValues,
bool receiveValues) {
PoolPtr pool = PoolManager::find(getAttributes()->getPoolName());
if (pool != NULLPTR) {
if (!pool->getSubscriptionEnabled()) {
      LOGERROR(
          "Register regex is supported only if "
          "subscription-enabled attribute is true for pool %s",
          pool->getName());
throw UnsupportedOperationException(
"Register regex is supported only if "
"pool subscription-enabled attribute is true.");
}
}
if (isDurable && !isDurableClient()) {
LOGERROR("Register regex durable flag only applicable for durable clients");
throw IllegalStateException(
"Durable flag only applicable for durable clients");
}
if (regex == NULL || regex[0] == '\0') {
throw IllegalArgumentException(
"Region::registerRegex: Regex string is empty");
}
std::string sregex = regex;
// bool allKeys = (sregex == ".*");
bool isresultKeys = true;
// if we need initial values then use resultKeys to get the keys from server
if (resultKeys == NULLPTR) {
resultKeys = new VectorOfCacheableKey();
isresultKeys = false;
}
InterestResultPolicy interestPolicy = InterestResultPolicy::NONE;
if (getInitialValues) {
interestPolicy = InterestResultPolicy::KEYS_VALUES;
} else {
interestPolicy = InterestResultPolicy::KEYS;
}
LOGDEBUG("ThinClientRegion::registerRegex : interestpolicy is %d",
interestPolicy.ordinal);
// if we need to fetch initial data for "allKeys" case, then we
// get the keys in that call itself using the special GET_ALL message and
// do not need to get the keys in the initial register interest call
GfErrType err = registerRegexNoThrow(
sregex, true, NULL, isDurable, resultKeys, interestPolicy, receiveValues);
if (m_tcrdm->isFatalError(err)) {
GfErrTypeToException("Region::registerRegex", err);
}
if (isresultKeys == false) {
resultKeys = NULLPTR;
}
GfErrTypeToException("Region::registerRegex", err);
}
void ThinClientRegion::unregisterRegex(const char* regex) {
PoolPtr pool = PoolManager::find(getAttributes()->getPoolName());
if (pool != NULLPTR) {
if (!pool->getSubscriptionEnabled()) {
      LOGERROR(
          "Unregister regex is supported only if "
          "subscription-enabled attribute is true for pool %s",
          pool->getName());
throw UnsupportedOperationException(
"Unregister regex is supported only if "
"pool subscription-enabled attribute is true.");
}
}
if (regex == NULL || regex[0] == '\0') {
LOGERROR("Unregister regex string is empty");
throw IllegalArgumentException("Unregister regex string is empty");
}
std::string sregex = regex;
GfErrType err = unregisterRegexNoThrow(sregex);
GfErrTypeToException("Region::unregisterRegex", err);
}
void ThinClientRegion::unregisterAllKeys() {
PoolPtr pool = PoolManager::find(getAttributes()->getPoolName());
if (pool != NULLPTR) {
if (!pool->getSubscriptionEnabled()) {
      LOGERROR(
          "Unregister all keys is supported only if "
          "subscription-enabled attribute is true for pool %s",
          pool->getName());
throw UnsupportedOperationException(
"Unregister all keys is supported only if "
"pool subscription-enabled attribute is true.");
}
}
GfErrType err = unregisterRegexNoThrow(".*");
GfErrTypeToException("Region::unregisterAllKeys", err);
}
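// Runs an OQL query scoped to this region. A bare predicate is wrapped into
// a full query; for example (illustrative) the predicate "id = 1" on region
// /Portfolios is sent as:
//   select distinct * from /Portfolios this where id = 1
// whereas a string that already begins with "select" or "import" is sent
// unchanged.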
SelectResultsPtr ThinClientRegion::query(const char* predicate,
uint32_t timeout) {
CHECK_DESTROY_PENDING(TryReadGuard, Region::query);
if (predicate == NULL || predicate[0] == '\0') {
LOGERROR("Region query predicate string is empty");
throw IllegalArgumentException("Region query predicate string is empty");
}
bool isFullQuery = false;
size_t predlen = ACE_OS::strlen(predicate);
  if (predlen > 6)  // may start with 'select' or 'import' if it's > 6 chars
{
int skipspace = 0;
while (ACE_OS::ace_isspace(predicate[skipspace])) {
skipspace++;
}
if (predlen - skipspace > 6) // check remaining length again to avoid
// reading past predicate char array
{
char firstWord[7] = {0};
for (int charpos = 0; charpos < 6; charpos++) {
firstWord[charpos] =
ACE_OS::ace_tolower(predicate[charpos + skipspace]);
}
if (!ACE_OS::strcmp(firstWord, "select") ||
!ACE_OS::strcmp(firstWord, "import")) {
isFullQuery = true;
}
}
}
std::string squery;
if (isFullQuery) {
squery = predicate;
} else {
squery = "select distinct * from ";
squery += getFullPath();
squery += " this where ";
squery += predicate;
}
RemoteQueryPtr queryPtr;
// TODO:
ThinClientPoolDM* poolDM = dynamic_cast<ThinClientPoolDM*>(m_tcrdm);
if (poolDM) {
queryPtr = dynCast<RemoteQueryPtr>(
poolDM->getQueryServiceWithoutCheck()->newQuery(squery.c_str()));
} else {
queryPtr = dynCast<RemoteQueryPtr>(
m_cacheImpl->getQueryService()->newQuery(squery.c_str()));
}
return queryPtr->execute(timeout, "Region::query", m_tcrdm, NULLPTR);
}
bool ThinClientRegion::existsValue(const char* predicate, uint32_t timeout) {
SelectResultsPtr results = query(predicate, timeout);
if (results == NULLPTR) {
return false;
}
return results->size() > 0;
}
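// Unregisters all stored interest (plain and durable regex lists, plain and
// durable key lists, and their updates-as-invalidates variants) before the
// region is destroyed. Errors are accumulated: the error code of the last
// failing call is returned, or GF_NOERR if every call succeeded.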
GfErrType ThinClientRegion::unregisterKeysBeforeDestroyRegion() {
PoolPtr pool = PoolManager::find(getAttributes()->getPoolName());
if (pool != NULLPTR) {
if (!pool->getSubscriptionEnabled()) {
      LOGDEBUG(
          "pool subscription-enabled attribute is false; no need to "
          "unregister keys");
return GF_NOERR;
}
}
GfErrType err = GF_NOERR;
GfErrType opErr = GF_NOERR;
opErr = unregisterStoredRegexLocalDestroy(m_interestListRegex);
err = opErr != GF_NOERR ? opErr : err;
opErr = unregisterStoredRegexLocalDestroy(m_durableInterestListRegex);
err = opErr != GF_NOERR ? opErr : err;
opErr = unregisterStoredRegexLocalDestroy(
m_interestListRegexForUpdatesAsInvalidates);
err = opErr != GF_NOERR ? opErr : err;
opErr = unregisterStoredRegexLocalDestroy(
m_durableInterestListRegexForUpdatesAsInvalidates);
err = opErr != GF_NOERR ? opErr : err;
VectorOfCacheableKey keysVec;
copyInterestList(keysVec, m_interestList);
opErr = unregisterKeysNoThrowLocalDestroy(keysVec, false);
err = opErr != GF_NOERR ? opErr : err;
VectorOfCacheableKey keysVecDurable;
copyInterestList(keysVecDurable, m_durableInterestList);
opErr = unregisterKeysNoThrowLocalDestroy(keysVecDurable, false);
err = opErr != GF_NOERR ? opErr : err;
VectorOfCacheableKey keysVecForUpdatesAsInvalidates;
copyInterestList(keysVecForUpdatesAsInvalidates,
m_interestListForUpdatesAsInvalidates);
opErr =
unregisterKeysNoThrowLocalDestroy(keysVecForUpdatesAsInvalidates, false);
err = opErr != GF_NOERR ? opErr : err;
VectorOfCacheableKey keysVecDurableForUpdatesAsInvalidates;
copyInterestList(keysVecDurableForUpdatesAsInvalidates,
m_durableInterestListForUpdatesAsInvalidates);
opErr = unregisterKeysNoThrowLocalDestroy(
keysVecDurableForUpdatesAsInvalidates, false);
err = opErr != GF_NOERR ? opErr : err;
return err;
}
SerializablePtr ThinClientRegion::selectValue(const char* predicate,
uint32_t timeout) {
SelectResultsPtr results = query(predicate, timeout);
if (results == NULLPTR || results->size() == 0) {
return NULLPTR;
}
if (results->size() > 1) {
throw QueryException("selectValue has more than one result");
}
return results->operator[](0);
}
void ThinClientRegion::serverKeys(VectorOfCacheableKey& v) {
CHECK_DESTROY_PENDING(TryReadGuard, Region::serverKeys);
TcrMessageReply reply(true, m_tcrdm);
TcrMessageKeySet request(m_fullPath, m_tcrdm);
reply.setMessageTypeRequest(TcrMessage::KEY_SET);
// need to check
ChunkedKeySetResponse* resultCollector(
new ChunkedKeySetResponse(request, v, reply));
reply.setChunkedResultHandler(resultCollector);
GfErrType err = GF_NOERR;
err = m_tcrdm->sendSyncRequest(request, reply);
GfErrTypeToException("Region::serverKeys", err);
switch (reply.getMessageType()) {
case TcrMessage::RESPONSE: {
// keyset result is handled by ChunkedKeySetResponse
break;
}
case TcrMessage::EXCEPTION: {
err = handleServerException("Region:serverKeys", reply.getException());
break;
}
case TcrMessage::KEY_SET_DATA_ERROR: {
LOGERROR("Region serverKeys: an error occurred on the server");
err = GF_CACHESERVER_EXCEPTION;
break;
}
default: {
LOGERROR("Unknown message type %d during region serverKeys",
reply.getMessageType());
err = GF_MSG;
break;
}
}
delete resultCollector;
GfErrTypeToException("Region::serverKeys", err);
}
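// containsKeyOnServer and containsValueForKey_remote both issue a
// CONTAINS_KEY request; the boolean passed to TcrMessageContainsKey selects
// the semantics (true checks key containment, false checks whether the key
// has a value).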
bool ThinClientRegion::containsKeyOnServer(
const CacheableKeyPtr& keyPtr) const {
GfErrType err = GF_NOERR;
bool ret = false;
TXState* txState = getTXState();
if (txState != NULL) {
// if (!txState->isReplay()) {
// VectorOfCacheablePtr args(new VectorOfCacheable());
// txState->recordTXOperation(GF_CONTAINS_KEY,
// getFullPath(),
// keyPtr,
// args);
// }
}
/** @brief Create message and send to bridge server */
TcrMessageContainsKey request(this, keyPtr, static_cast<UserDataPtr>(NULLPTR),
true, m_tcrdm);
TcrMessageReply reply(true, m_tcrdm);
reply.setMessageTypeRequest(TcrMessage::CONTAINS_KEY);
err = m_tcrdm->sendSyncRequest(request, reply);
// if ( err != GF_NOERR ) return ret;
switch (reply.getMessageType()) {
case TcrMessage::RESPONSE:
ret = reply.getBoolValue();
break;
case TcrMessage::EXCEPTION:
err = handleServerException("Region::containsKeyOnServer:",
reply.getException());
break;
case TcrMessage::REQUEST_DATA_ERROR:
LOGERROR(
"Region::containsKeyOnServer: read error occurred on the endpoint %s",
m_tcrdm->getActiveEndpoint()->name().c_str());
err = GF_CACHESERVER_EXCEPTION;
break;
default:
LOGERROR("Unknown message type in Region::containsKeyOnServer %d",
reply.getMessageType());
err = GF_MSG;
break;
}
CacheableBooleanPtr rptr = CacheableBoolean::create(ret);
rptr = handleReplay(err, rptr);
GfErrTypeToException("Region::containsKeyOnServer ", err);
return rptr->value();
}
bool ThinClientRegion::containsValueForKey_remote(
const CacheableKeyPtr& keyPtr) const {
GfErrType err = GF_NOERR;
bool ret = false;
TXState* txState = getTXState();
if (txState != NULL) {
// if (!txState->isReplay()) {
// VectorOfCacheablePtr args(new VectorOfCacheable());
// txState->recordTXOperation(GF_CONTAINS_VALUE_FOR_KEY,
// getFullPath(), keyPtr, args);
// }
}
/** @brief Create message and send to bridge server */
TcrMessageContainsKey request(this, keyPtr, static_cast<UserDataPtr>(NULLPTR),
false, m_tcrdm);
TcrMessageReply reply(true, m_tcrdm);
reply.setMessageTypeRequest(TcrMessage::CONTAINS_KEY);
err = m_tcrdm->sendSyncRequest(request, reply);
// if ( err != GF_NOERR ) return ret;
switch (reply.getMessageType()) {
case TcrMessage::RESPONSE:
ret = reply.getBoolValue();
break;
case TcrMessage::EXCEPTION:
err = handleServerException("Region::containsValueForKey:",
reply.getException());
break;
case TcrMessage::REQUEST_DATA_ERROR:
LOGERROR(
"Region::containsValueForKey: read error occurred on the endpoint %s",
m_tcrdm->getActiveEndpoint()->name().c_str());
err = GF_CACHESERVER_EXCEPTION;
break;
default:
LOGERROR("Unknown message type in Region::containsValueForKey %d",
reply.getMessageType());
err = GF_MSG;
break;
}
CacheableBooleanPtr rptr = CacheableBoolean::create(ret);
rptr = handleReplay(err, rptr);
GfErrTypeToException("Region::containsValueForKey ", err);
return rptr->value();
}
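// Clears the region: applies a local clear first, then sends CLEAR_REGION to
// the server, and fires the AFTER_REGION_CLEAR listener event only if the
// server acknowledged the clear.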
void ThinClientRegion::clear(const UserDataPtr& aCallbackArgument) {
GfErrType err = GF_NOERR;
err = localClearNoThrow(aCallbackArgument, CacheEventFlags::NORMAL);
if (err != GF_NOERR) GfErrTypeToException("Region::clear", err);
/** @brief Create message and send to bridge server */
TcrMessageClearRegion request(this, aCallbackArgument, -1, m_tcrdm);
TcrMessageReply reply(true, m_tcrdm);
err = m_tcrdm->sendSyncRequest(request, reply);
if (err != GF_NOERR) GfErrTypeToException("Region::clear", err);
switch (reply.getMessageType()) {
case TcrMessage::REPLY:
LOGFINE("Region %s clear message sent to server successfully",
m_fullPath.c_str());
break;
case TcrMessage::EXCEPTION:
err = handleServerException("Region::clear:", reply.getException());
break;
case TcrMessage::CLEAR_REGION_DATA_ERROR:
LOGERROR("Region clear read error occurred on the endpoint %s",
m_tcrdm->getActiveEndpoint()->name().c_str());
err = GF_CACHESERVER_EXCEPTION;
break;
default:
LOGERROR("Unknown message type %d during region clear",
reply.getMessageType());
err = GF_MSG;
break;
}
if (err == GF_NOERR) {
err = invokeCacheListenerForRegionEvent(
aCallbackArgument, CacheEventFlags::NORMAL, AFTER_REGION_CLEAR);
}
GfErrTypeToException("Region::clear", err);
}
GfErrType ThinClientRegion::getNoThrow_remote(
const CacheableKeyPtr& keyPtr, CacheablePtr& valPtr,
const UserDataPtr& aCallbackArgument, VersionTagPtr& versionTag) {
GfErrType err = GF_NOERR;
/** @brief Create message and send to bridge server */
TcrMessageRequest request(this, keyPtr, aCallbackArgument, m_tcrdm);
TcrMessageReply reply(true, m_tcrdm);
err = m_tcrdm->sendSyncRequest(request, reply);
if (err != GF_NOERR) return err;
// put the object into local region
switch (reply.getMessageType()) {
case TcrMessage::RESPONSE: {
valPtr = reply.getValue();
versionTag = reply.getVersionTag();
break;
}
case TcrMessage::EXCEPTION: {
err = handleServerException("Region::get", reply.getException());
break;
}
case TcrMessage::REQUEST_DATA_ERROR: {
// LOGERROR("A read error occurred on the endpoint %s",
// m_tcrdm->getActiveEndpoint()->name().c_str());
err = GF_CACHESERVER_EXCEPTION;
break;
}
default: {
LOGERROR("Unknown message type %d while getting entry from region",
reply.getMessageType());
err = GF_MSG;
break;
}
}
return err;
}
GfErrType ThinClientRegion::invalidateNoThrow_remote(
const CacheableKeyPtr& keyPtr, const UserDataPtr& aCallbackArgument,
VersionTagPtr& versionTag) {
GfErrType err = GF_NOERR;
/** @brief Create message and send to bridge server */
TcrMessageInvalidate request(this, keyPtr, aCallbackArgument, m_tcrdm);
TcrMessageReply reply(true, m_tcrdm);
err = m_tcrdm->sendSyncRequest(request, reply);
if (err != GF_NOERR) return err;
// put the object into local region
switch (reply.getMessageType()) {
case TcrMessage::REPLY: {
versionTag = reply.getVersionTag();
break;
}
case TcrMessage::EXCEPTION: {
err = handleServerException("Region::get", reply.getException());
break;
}
case TcrMessage::INVALIDATE_ERROR: {
// LOGERROR("A read error occurred on the endpoint %s",
// m_tcrdm->getActiveEndpoint()->name().c_str());
err = GF_CACHESERVER_EXCEPTION;
break;
}
default: {
LOGERROR("Unknown message type %d while getting entry from region",
reply.getMessageType());
err = GF_MSG;
break;
}
}
return err;
}
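// Remote put. A delta is shipped instead of the full value only when
// checkDelta is set, the conflate-events property is not "true", the server
// supports delta propagation, and the value implements Delta with a pending
// delta; on PUT_DELTA_ERROR the put is retried once with the full value.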
GfErrType ThinClientRegion::putNoThrow_remote(
const CacheableKeyPtr& keyPtr, const CacheablePtr& valuePtr,
const UserDataPtr& aCallbackArgument, VersionTagPtr& versionTag,
bool checkDelta) {
GfErrType err = GF_NOERR;
// do TCR put
// bool delta = valuePtr->hasDelta();
bool delta = false;
const char* conFlationValue =
DistributedSystem::getSystemProperties()->conflateEvents();
if (checkDelta && valuePtr != NULLPTR && conFlationValue != NULL &&
strcmp(conFlationValue, "true") != 0 &&
ThinClientBaseDM::isDeltaEnabledOnServer()) {
Delta* temp = dynamic_cast<Delta*>(valuePtr.ptr());
delta = (temp && temp->hasDelta());
}
TcrMessagePut request(this, keyPtr, valuePtr, aCallbackArgument, delta,
m_tcrdm);
TcrMessageReply* reply = new TcrMessageReply(true, m_tcrdm);
err = m_tcrdm->sendSyncRequest(request, *reply);
if (delta) {
    m_cacheImpl->m_cacheStats
        ->incDeltaPut();  // does not check for success or failure
if (reply->getMessageType() ==
TcrMessage::PUT_DELTA_ERROR) { // Try without delta
TcrMessagePut request(this, keyPtr, valuePtr, aCallbackArgument, false,
m_tcrdm, false, true);
delete reply;
reply = new TcrMessageReply(true, m_tcrdm);
err = m_tcrdm->sendSyncRequest(request, *reply);
}
}
if (err != GF_NOERR) return err;
// put the object into local region
switch (reply->getMessageType()) {
case TcrMessage::REPLY: {
versionTag = reply->getVersionTag();
break;
}
case TcrMessage::EXCEPTION: {
err = handleServerException("Region::put", reply->getException());
break;
}
case TcrMessage::PUT_DATA_ERROR: {
// LOGERROR("A write error occurred on the endpoint %s",
// m_tcrdm->getActiveEndpoint()->name().c_str());
err = GF_CACHESERVER_EXCEPTION;
break;
}
default: {
LOGERROR("Unknown message type %d during region put reply",
reply->getMessageType());
err = GF_MSG;
}
}
delete reply;
reply = NULL;
return err;
}
GfErrType ThinClientRegion::createNoThrow_remote(
const CacheableKeyPtr& keyPtr, const CacheablePtr& valuePtr,
const UserDataPtr& aCallbackArgument, VersionTagPtr& versionTag) {
return putNoThrow_remote(keyPtr, valuePtr, aCallbackArgument, versionTag,
false);
}
GfErrType ThinClientRegion::destroyNoThrow_remote(
const CacheableKeyPtr& keyPtr, const UserDataPtr& aCallbackArgument,
VersionTagPtr& versionTag) {
GfErrType err = GF_NOERR;
// do TCR destroy
TcrMessageDestroy request(this, keyPtr, NULLPTR, aCallbackArgument, m_tcrdm);
TcrMessageReply reply(true, m_tcrdm);
err = m_tcrdm->sendSyncRequest(request, reply);
if (err != GF_NOERR) return err;
switch (reply.getMessageType()) {
case TcrMessage::REPLY: {
if (reply.getEntryNotFound() == 1) {
err = GF_CACHE_ENTRY_NOT_FOUND;
} else {
LOGDEBUG("Remote key [%s] is destroyed successfully in region %s",
Utils::getCacheableKeyString(keyPtr)->asChar(),
m_fullPath.c_str());
}
versionTag = reply.getVersionTag();
break;
}
case TcrMessage::EXCEPTION: {
err = handleServerException("Region::destroy", reply.getException());
break;
}
case TcrMessage::DESTROY_DATA_ERROR: {
err = GF_CACHE_ENTRY_DESTROYED_EXCEPTION;
break;
}
default: {
LOGERROR("Unknown message type %d while destroying region entry",
reply.getMessageType());
err = GF_MSG;
}
}
return err;
} // destroyNoThrow_remote()
GfErrType ThinClientRegion::removeNoThrow_remote(
const CacheableKeyPtr& keyPtr, const CacheablePtr& cvalue,
const UserDataPtr& aCallbackArgument, VersionTagPtr& versionTag) {
GfErrType err = GF_NOERR;
// do TCR remove
TcrMessageDestroy request(this, keyPtr, cvalue, aCallbackArgument, m_tcrdm);
TcrMessageReply reply(true, m_tcrdm);
err = m_tcrdm->sendSyncRequest(request, reply);
if (err != GF_NOERR) {
return err;
}
switch (reply.getMessageType()) {
case TcrMessage::REPLY: {
if (reply.getEntryNotFound() == 1) {
err = GF_ENOENT;
} else {
LOGDEBUG("Remote key [%s] is removed successfully in region %s",
Utils::getCacheableKeyString(keyPtr)->asChar(),
m_fullPath.c_str());
}
versionTag = reply.getVersionTag();
break;
}
case TcrMessage::EXCEPTION: {
err = handleServerException("Region::remove", reply.getException());
break;
}
case TcrMessage::DESTROY_DATA_ERROR: {
err = GF_CACHE_ENTRY_DESTROYED_EXCEPTION;
break;
}
default: {
LOGERROR("Unknown message type %d while removing region entry",
reply.getMessageType());
err = GF_MSG;
}
}
return err;
}
GfErrType ThinClientRegion::removeNoThrowEX_remote(
const CacheableKeyPtr& keyPtr, const UserDataPtr& aCallbackArgument,
VersionTagPtr& versionTag) {
GfErrType err = GF_NOERR;
// do TCR remove
TcrMessageDestroy request(this, keyPtr, NULLPTR, aCallbackArgument, m_tcrdm);
TcrMessageReply reply(true, m_tcrdm);
err = m_tcrdm->sendSyncRequest(request, reply);
if (err != GF_NOERR) {
return err;
}
switch (reply.getMessageType()) {
case TcrMessage::REPLY: {
versionTag = reply.getVersionTag();
if (reply.getEntryNotFound() == 1) {
err = GF_ENOENT;
} else {
LOGDEBUG("Remote key [%s] is removed successfully in region %s",
Utils::getCacheableKeyString(keyPtr)->asChar(),
m_fullPath.c_str());
}
break;
}
case TcrMessage::EXCEPTION: {
err = handleServerException("Region::removeEx", reply.getException());
break;
}
case TcrMessage::DESTROY_DATA_ERROR: {
err = GF_CACHE_ENTRY_DESTROYED_EXCEPTION;
break;
}
default: {
LOGERROR("Unknown message type %d while removing region entry",
reply.getMessageType());
err = GF_MSG;
}
}
return err;
}
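// Remote getAll. When results are to be cached locally and concurrency
// checks are off, entries are tracked before the request so that concurrent
// updates are not clobbered; the ChunkedGetAllResponse fills values and
// exceptions (and, if requested, puts entries into the local region) as
// chunks arrive, and leftover trackers are removed afterwards.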
GfErrType ThinClientRegion::getAllNoThrow_remote(
const VectorOfCacheableKey* keys, const HashMapOfCacheablePtr& values,
const HashMapOfExceptionPtr& exceptions,
const VectorOfCacheableKeyPtr& resultKeys, bool addToLocalCache,
const UserDataPtr& aCallbackArgument) {
GfErrType err = GF_NOERR;
MapOfUpdateCounters updateCountMap;
int32_t destroyTracker = 0;
addToLocalCache = addToLocalCache && m_regionAttributes->getCachingEnabled();
if (addToLocalCache && !m_regionAttributes->getConcurrencyChecksEnabled()) {
// start tracking the entries
if (keys == NULL) {
// track all entries with destroy tracking for non-existent entries
destroyTracker = m_entries->addTrackerForAllEntries(updateCountMap, true);
} else {
for (int32_t index = 0; index < keys->size(); ++index) {
CacheablePtr oldValue;
const CacheableKeyPtr& key = keys->operator[](index);
int updateCount =
m_entries->addTrackerForEntry(key, oldValue, true, false, false);
updateCountMap.insert(std::make_pair(key, updateCount));
}
}
}
// create the GET_ALL request
TcrMessageGetAll request(
this, keys, m_tcrdm,
aCallbackArgument); // now we need to initialize later
TcrMessageReply reply(true, m_tcrdm);
ACE_Recursive_Thread_Mutex responseLock;
// need to check
TcrChunkedResult* resultCollector(new ChunkedGetAllResponse(
reply, this, keys, values, exceptions, resultKeys, updateCountMap,
destroyTracker, addToLocalCache, responseLock));
reply.setChunkedResultHandler(resultCollector);
err = m_tcrdm->sendSyncRequest(request, reply);
if (addToLocalCache && !m_regionAttributes->getConcurrencyChecksEnabled()) {
// remove the tracking for remaining keys in case some keys do not have
// values from server in GII
for (MapOfUpdateCounters::const_iterator iter = updateCountMap.begin();
iter != updateCountMap.end(); ++iter) {
if (iter->second >= 0) {
m_entries->removeTrackerForEntry(iter->first);
}
}
// remove tracking for destroys
if (destroyTracker > 0) {
m_entries->removeDestroyTracking();
}
}
delete resultCollector;
if (err != GF_NOERR) {
return err;
}
switch (reply.getMessageType()) {
case TcrMessage::RESPONSE: {
// nothing to be done; put in local region, if required,
// is handled by ChunkedGetAllResponse
break;
}
case TcrMessage::EXCEPTION: {
err = handleServerException("Region:getAll", reply.getException());
break;
}
case TcrMessage::GET_ALL_DATA_ERROR: {
LOGERROR("Region get-all: a read error occurred on the endpoint %s",
m_tcrdm->getActiveEndpoint()->name().c_str());
err = GF_CACHESERVER_EXCEPTION;
break;
}
default: {
LOGERROR("Unknown message type %d during region get-all",
reply.getMessageType());
err = GF_MSG;
break;
}
}
return err;
}
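// Single-hop putAll, detailed step by step in the comments below: map keys
// to bucket server locations, fan one PutAllWork out per server on the
// thread pool, consolidate the per-server results into a
// PutAllPartialResult, retry failed servers through the multi-hop path, and
// hand the succeeded keys and versions back via versionedObjPartList.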
GfErrType ThinClientRegion::singleHopPutAllNoThrow_remote(
ThinClientPoolDM* tcrdm, const HashMapOfCacheable& map,
VersionedCacheableObjectPartListPtr& versionedObjPartList, uint32_t timeout,
const UserDataPtr& aCallbackArgument) {
LOGDEBUG(" ThinClientRegion::singleHopPutAllNoThrow_remote map size = %d",
map.size());
RegionPtr region(this);
GfErrType error = GF_NOERR;
  /* Step-1:
   * Populate the keys vector from the user map and pass it to
   * getServerToFilterMap() to generate locationMap.
   * If locationMap is NULL, fall back to the existing putAll implementation,
   * which may take multiple network hops.
   */
VectorOfCacheableKey* userKeys = new VectorOfCacheableKey();
for (HashMapOfCacheable::Iterator iter = map.begin(); iter != map.end();
++iter) {
userKeys->push_back(iter.first());
}
  // LOGDEBUG("ThinClientRegion::singleHopPutAllNoThrow_remote keys.size() = %d
  // ", userKeys->size());
HashMapT<BucketServerLocationPtr, VectorOfCacheableKeyPtr>* locationMap =
tcrdm->getClientMetaDataService()->getServerToFilterMap(userKeys, region,
true);
if (locationMap == NULL) {
// putAll with multiple hop implementation
LOGDEBUG("locationMap is Null or Empty");
    /* Valgrind reported userKeys as a definite leak (24 direct /
     * ~7.8M indirect bytes, allocated above and lost via
     * putAllNoThrow_remote -> LocalRegion::putAll), so free it on
     * every exit path. */
delete userKeys;
userKeys = NULL;
return multiHopPutAllNoThrow_remote(map, versionedObjPartList, timeout,
aCallbackArgument);
}
delete userKeys;
userKeys = NULL;
  // flag that putAll on a partitioned region was invoked with single-hop
  // enabled
  m_isPRSingleHopEnabled = true;
// LOGDEBUG("locationMap.size() = %d ", locationMap->size());
  /* Step-2:
   * a. Create a vector of PutAllWork.
   * b. For each entry in locationMap<BucketServerLocationPtr,
   *    VectorOfCacheableKeyPtr>, build a server-specific sub-map by taking
   *    each key (locationIter.second()) and its corresponding value from the
   *    user map.
   * c. Create a new PutAllWork worker with the required parameters.
   *    // TODO: add details of each parameter later
   * d. Enqueue the worker on the thread pool, which runs its execute()
   *    method.
   * e. Insert the worker into the vector.
   */
std::vector<PutAllWork*> putAllWorkers;
ThreadPool* threadPool = TPSingleton::instance();
int locationMapIndex = 0;
for (HashMapT<BucketServerLocationPtr, VectorOfCacheableKeyPtr>::Iterator
locationIter = locationMap->begin();
locationIter != locationMap->end(); locationIter++) {
BucketServerLocationPtr serverLocation = locationIter.first();
if (serverLocation == NULLPTR) {
LOGDEBUG("serverLocation is NULLPTR");
}
VectorOfCacheableKeyPtr keys = locationIter.second();
// Create server specific Sub-Map by iterating over keys.
HashMapOfCacheablePtr filteredMap(new HashMapOfCacheable());
if (keys != NULLPTR && keys->size() > 0) {
for (int32_t index = 0; index < keys->size(); index++) {
HashMapOfCacheable::Iterator iter = map.find(keys->at(index));
if (iter != map.end()) {
filteredMap->insert(iter.first(), iter.second());
} else {
LOGDEBUG("DEBUG:: KEY not found in user MAP");
}
}
}
// TEST-CODE :: PRINT each sub-Map entries
/*
LOGDEBUG("Printing map at %d locationMapindex ", locationMapIndex);
for (HashMapOfCacheable::Iterator filteredMapIter = filteredMap->begin();
filteredMapIter != filteredMap->end(); ++filteredMapIter) {
CacheableInt32Ptr kPtr =
dynCast<CacheableInt32Ptr>(filteredMapIter.first()) ;
CacheableInt32Ptr vPtr =
dynCast<CacheableInt32Ptr>(filteredMapIter.second());
LOGDEBUG("Key = %d Value = %d ", kPtr->value(), vPtr->value() );
}
*/
PutAllWork* worker = new PutAllWork(
tcrdm, serverLocation, region, true /*attemptFailover*/,
false /*isBGThread*/, filteredMap, keys, timeout, aCallbackArgument);
threadPool->perform(worker);
putAllWorkers.push_back(worker);
locationMapIndex++;
}
// TODO::CHECK, do we need to set following ..??
// reply.setMessageType(TcrMessage::RESPONSE);
int cnt = 1;
  /**
   * Step-3:
   * a. Iterate over the vector of PutAllWork workers and record each
   *    worker's outcome:
   *    resultMap<BucketServerLocationPtr, SerializablePtr>, where the value
   *    is either a VersionedCacheableObjectPartListPtr or a
   *    PutAllPartialResultServerExceptionPtr;
   *    failedServers<BucketServerLocationPtr, CacheableInt32Ptr>, where the
   *    value is an error code.
   * b. Delete the worker.
   */
HashMapT<BucketServerLocationPtr, SerializablePtr>* resultMap =
new HashMapT<BucketServerLocationPtr, SerializablePtr>();
HashMapT<BucketServerLocationPtr, CacheableInt32Ptr>* failedServers =
new HashMapT<BucketServerLocationPtr, CacheableInt32Ptr>();
for (std::vector<PutAllWork*>::iterator iter = putAllWorkers.begin();
iter != putAllWorkers.end(); iter++) {
PutAllWork* worker = *iter;
GfErrType err =
worker->getResult(); // wait() or blocking call for worker thread.
LOGDEBUG("Error code :: %s:%d err = %d ", __FILE__, __LINE__, err);
if (err != GF_NOERR) {
error = err;
if (error == GF_PUTALL_PARTIAL_RESULT_EXCEPTION) {
resultMap->insert(worker->getServerLocation(),
worker->getPaPResultException());
} else if (error == GF_NOTCON) {
// Refresh the metadata in case of GF_NOTCON.
tcrdm->getClientMetaDataService()->enqueueForMetadataRefresh(
region->getFullPath(), 0);
}
failedServers->insert(worker->getServerLocation(),
CacheableInt32::create(error));
} else {
// No Exception from server
resultMap->insert(worker->getServerLocation(),
worker->getResultCollector()->getList());
}
LOGDEBUG("worker->getPutAllMap()->size() = %d ",
worker->getPutAllMap()->size());
LOGDEBUG(
"worker->getResultCollector()->getList()->getVersionedTagsize() = %d ",
worker->getResultCollector()->getList()->getVersionedTagsize());
// TODO::CHECK, why do we need following code... ??
// TcrMessage* currentReply = worker->getReply();
/*
if(currentReply->getMessageType() != TcrMessage::REPLY)
{
reply.setMessageType(currentReply->getMessageType());
}
*/
delete worker;
cnt++;
}
  /**
   * Step-4:
   * a. Create a PutAllPartialResult with total size = map.size().
   * b. Iterate over resultMap; when the value for a server location is a
   *    VersionedCacheableObjectPartList, add its keys and versions.
   * c. TODO: handle the case where the value in resultMap is a
   *    PutAllPartialResultServerException.
   */
ACE_Recursive_Thread_Mutex responseLock;
PutAllPartialResultPtr result(
new PutAllPartialResult(map.size(), responseLock));
LOGDEBUG(
" TCRegion:: %s:%d "
"result->getSucceededKeysAndVersions()->getVersionedTagsize() = %d ",
__FILE__, __LINE__,
result->getSucceededKeysAndVersions()->getVersionedTagsize());
LOGDEBUG(" TCRegion:: %s:%d resultMap->size() ", __FILE__, __LINE__,
resultMap->size());
for (HashMapT<BucketServerLocationPtr, SerializablePtr>::Iterator
resultMapIter = resultMap->begin();
resultMapIter != resultMap->end(); resultMapIter++) {
SerializablePtr value = resultMapIter.second();
PutAllPartialResultServerException* papException = NULL;
VersionedCacheableObjectPartListPtr list = NULLPTR;
papException =
dynamic_cast<PutAllPartialResultServerException*>(value.ptr());
// LOGDEBUG(" TCRegion:: %s:%d
// result->getSucceededKeysAndVersions()->getVersionedTagsize() = %d ",
// __FILE__, __LINE__,
// result->getSucceededKeysAndVersions()->getVersionedTagsize());
if (papException != NULL) {
      // PutAllPartialResultServerException case: the value in resultMap is a
      // PutAllPartialResultServerException.
      // TODO: failedServers.keySet = all failed servers, i.e. the set view
      // of the keys contained in the failedServers map.
      // LOGDEBUG("TCRegion:: PutAll SingleHop encountered
      // PutAllPartialResultServerException exception: %s , failedServers are:
      // ", papException->getMessage()->asChar());
      // TODO: read papException and populate PutAllPartialResult.
result->consolidate(papException->getResult());
} else if (value != NULLPTR &&
(list = dynCast<VersionedCacheableObjectPartListPtr>(value)) !=
NULLPTR) {
// value in resultMap is of type VCOPL.
// LOGDEBUG("TCRegion:: %s:%d :: list->getSucceededKeys()->size()=%d
// list->getVersionedTagsize() = %d", __FILE__, __LINE__,
// list->getSucceededKeys()->size(), list->getVersionedTagsize());
result->addKeysAndVersions(list);
} else {
// ERROR CASE
if (value != NULLPTR) {
LOGERROR(
"ERROR:: ThinClientRegion::singleHopPutAllNoThrow_remote value "
"could not Cast to either VCOPL or "
"PutAllPartialResultServerException:%s",
value->toString()->asChar());
} else {
LOGERROR(
"ERROR:: ThinClientRegion::singleHopPutAllNoThrow_remote value is "
"NULL");
}
}
}
  /**
   * a. If the PutAllPartialResult does not contain any entry, iterate over
   *    locationMap.
   * b. Build a VectorOfCacheableKey succeedKeySet by adding every key set
   *    (locationIter.second()) in locationMap for which
   *    failedServers->contains(locationIter.first()) is false.
   */
LOGDEBUG("ThinClientRegion:: %s:%d failedServers->size() = %d", __FILE__,
__LINE__, failedServers->size());
// if the partial result set doesn't already have keys (for tracking version
// tags)
// then we need to gather up the keys that we know have succeeded so far and
// add them to the partial result set (See bug Id #955)
if (failedServers->size() > 0) {
VectorOfCacheableKeyPtr succeedKeySet(new VectorOfCacheableKey());
if (result->getSucceededKeysAndVersions()->size() == 0) {
for (HashMapT<BucketServerLocationPtr, VectorOfCacheableKeyPtr>::Iterator
locationIter = locationMap->begin();
locationIter != locationMap->end(); locationIter++) {
if (!failedServers->contains(locationIter.first())) {
for (int32_t i = 0; i < locationIter.second()->size(); i++) {
succeedKeySet->push_back(locationIter.second()->at(i));
}
}
}
result->addKeys(succeedKeySet);
}
}
  /**
   * a. Iterate over the failedServers map.
   * b. If the failure is GF_PUTALL_PARTIAL_RESULT_EXCEPTION, continue; do
   *    not retry putAll for the corresponding keys.
   * c. Otherwise retry against the failed server: generate a newSubMap by
   *    finding the keys specific to that server in locationMap and their
   *    respective values in the user map.
   */
error = GF_NOERR;
bool oneSubMapRetryFailed = false;
for (HashMapT<BucketServerLocationPtr, CacheableInt32Ptr>::Iterator
failedServerIter = failedServers->begin();
failedServerIter != failedServers->end(); failedServerIter++) {
if (failedServerIter.second()->value() ==
GF_PUTALL_PARTIAL_RESULT_EXCEPTION) { // serverLocation
// will not retry for PutAllPartialResultException
// but it means at least one sub map ever failed
oneSubMapRetryFailed = true;
error = GF_PUTALL_PARTIAL_RESULT_EXCEPTION;
continue;
}
HashMapT<BucketServerLocationPtr, VectorOfCacheableKeyPtr>::Iterator
failedSerInLocMapIter = locationMap->find(failedServerIter.first());
VectorOfCacheableKeyPtr failedKeys = NULLPTR;
if (failedSerInLocMapIter != locationMap->end()) {
failedKeys = failedSerInLocMapIter.second();
}
if (failedKeys == NULLPTR) {
      LOGERROR(
          "TCRegion::singleHopPutAllNoThrow_remote :: failedKeys is NULL, "
          "which is not valid");
}
HashMapOfCacheablePtr newSubMap(new HashMapOfCacheable());
if (failedKeys != NULLPTR && failedKeys->size() > 0) {
for (int32_t index = 0; index < failedKeys->size(); index++) {
HashMapOfCacheable::Iterator iter = map.find(failedKeys->at(index));
if (iter != map.end()) {
newSubMap->insert(iter.first(), iter.second());
} else {
LOGERROR(
"DEBUG:: TCRegion.cpp singleHopPutAllNoThrow_remote KEY not "
"found in user failedSubMap");
}
}
}
VersionedCacheableObjectPartListPtr vcopListPtr;
PutAllPartialResultServerExceptionPtr papResultServerExc = NULLPTR;
GfErrType errCode = multiHopPutAllNoThrow_remote(
*newSubMap.ptr(), vcopListPtr, timeout, aCallbackArgument);
if (errCode == GF_NOERR) {
result->addKeysAndVersions(vcopListPtr);
} else if (errCode == GF_PUTALL_PARTIAL_RESULT_EXCEPTION) {
oneSubMapRetryFailed = true;
      // TODO: commented out because papResultServerExc is NULLPTR here;
      // uncomment once papResultServerExc is actually read:
      // result->consolidate(papResultServerExc->getResult());
error = errCode;
} else /*if(errCode != GF_NOERR)*/ {
oneSubMapRetryFailed = true;
CacheableKeyPtr firstKey = newSubMap->begin().first();
ExceptionPtr excptPtr = NULLPTR;
      // TODO: formulate excptPtr from the errCode
result->saveFailedKey(firstKey, excptPtr);
error = errCode;
}
}
if (!oneSubMapRetryFailed) {
error = GF_NOERR;
}
versionedObjPartList = result->getSucceededKeysAndVersions();
LOGDEBUG("singlehop versionedObjPartList = %d error=%d",
versionedObjPartList->size(), error);
delete locationMap;
delete failedServers;
delete resultMap;
return error;
}
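// Multi-hop putAll fallback: sends a single TcrMessagePutAll through the
// distribution manager (which may hop between servers) and gathers the
// versioned results with a ChunkedPutAllResponse.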
GfErrType ThinClientRegion::multiHopPutAllNoThrow_remote(
const HashMapOfCacheable& map,
VersionedCacheableObjectPartListPtr& versionedObjPartList, uint32_t timeout,
const UserDataPtr& aCallbackArgument) {
// Multiple hop implementation
LOGDEBUG("ThinClientRegion::multiHopPutAllNoThrow_remote ");
GfErrType err = GF_NOERR;
// Construct request/reply for putAll
TcrMessagePutAll request(this, map, static_cast<int>(timeout * 1000), m_tcrdm,
aCallbackArgument);
TcrMessageReply reply(true, m_tcrdm);
request.setTimeout(timeout);
reply.setTimeout(timeout);
ACE_Recursive_Thread_Mutex responseLock;
versionedObjPartList = new VersionedCacheableObjectPartList(responseLock);
// need to check
ChunkedPutAllResponse* resultCollector(new ChunkedPutAllResponse(
RegionPtr(this), reply, responseLock, versionedObjPartList));
reply.setChunkedResultHandler(resultCollector);
err = m_tcrdm->sendSyncRequest(request, reply);
versionedObjPartList = resultCollector->getList();
LOGDEBUG("multiple hop versionedObjPartList size = %d , err = %d ",
versionedObjPartList->size(), err);
delete resultCollector;
if (err != GF_NOERR) return err;
LOGDEBUG(
"ThinClientRegion::multiHopPutAllNoThrow_remote reply.getMessageType() = "
"%d ",
reply.getMessageType());
switch (reply.getMessageType()) {
case TcrMessage::REPLY:
// LOGDEBUG("Map is written into remote server at region %s",
// m_fullPath.c_str());
break;
case TcrMessage::RESPONSE:
LOGDEBUG(
"multiHopPutAllNoThrow_remote TcrMessage::RESPONSE %s, err = %d ",
m_fullPath.c_str(), err);
break;
case TcrMessage::EXCEPTION:
err = handleServerException("ThinClientRegion::putAllNoThrow",
reply.getException());
      // TODO: do we need to read PutAllPartialResultServerException for the
      // multi-hop case?
break;
case TcrMessage::PUT_DATA_ERROR:
// LOGERROR( "A write error occurred on the endpoint %s",
// m_tcrdm->getActiveEndpoint( )->name( ).c_str( ) );
err = GF_CACHESERVER_EXCEPTION;
break;
default:
LOGERROR("Unknown message type %d during region put-all",
reply.getMessageType());
err = GF_NOTOBJ;
break;
}
return err;
}
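// Dispatch for putAll: the single-hop path is taken only when the pool has
// PR single-hop enabled, the client metadata service is available, and no
// transaction is in progress (transactions always use multi-hop).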
GfErrType ThinClientRegion::putAllNoThrow_remote(
const HashMapOfCacheable& map,
VersionedCacheableObjectPartListPtr& versionedObjPartList, uint32_t timeout,
const UserDataPtr& aCallbackArgument) {
LOGDEBUG("ThinClientRegion::putAllNoThrow_remote");
ThinClientPoolDM* poolDM = dynamic_cast<ThinClientPoolDM*>(m_tcrdm);
TXState* txState = TSSTXStateWrapper::s_geodeTSSTXState->getTXState();
if (poolDM != NULL) {
if (poolDM->getPRSingleHopEnabled() &&
poolDM->getClientMetaDataService() != NULL &&
txState == NULL /*For Tx use multi-hop*/) {
return singleHopPutAllNoThrow_remote(poolDM, map, versionedObjPartList,
timeout, aCallbackArgument);
} else {
return multiHopPutAllNoThrow_remote(map, versionedObjPartList, timeout,
aCallbackArgument);
}
} else {
LOGERROR("ThinClientRegion::putAllNoThrow_remote :: Pool Not Specified ");
return GF_NOTSUP;
}
}
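// Single-hop removeAll: mirrors singleHopPutAllNoThrow_remote, but fans out
// RemoveAllWork items driven purely by the server-to-keys locationMap (no
// value sub-maps are needed).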
GfErrType ThinClientRegion::singleHopRemoveAllNoThrow_remote(
ThinClientPoolDM* tcrdm, const VectorOfCacheableKey& keys,
VersionedCacheableObjectPartListPtr& versionedObjPartList,
const UserDataPtr& aCallbackArgument) {
LOGDEBUG(" ThinClientRegion::singleHopRemoveAllNoThrow_remote keys size = %d",
keys.size());
RegionPtr region(this);
GfErrType error = GF_NOERR;
HashMapT<BucketServerLocationPtr, VectorOfCacheableKeyPtr>* locationMap =
tcrdm->getClientMetaDataService()->getServerToFilterMap(&keys, region,
true);
if (locationMap == NULL) {
// removeAll with multiple hop implementation
LOGDEBUG("locationMap is Null or Empty");
return multiHopRemoveAllNoThrow_remote(keys, versionedObjPartList,
aCallbackArgument);
}
  // flag that removeAll on a partitioned region was invoked with single-hop
  // enabled
  m_isPRSingleHopEnabled = true;
LOGDEBUG("locationMap.size() = %d ", locationMap->size());
  /* Step-2:
   * a. Create a vector of RemoveAllWork.
   * b. For each entry in locationMap<BucketServerLocationPtr,
   *    VectorOfCacheableKeyPtr>, take the keys (locationIter.second())
   *    mapped to that server.
   * c. Create a new RemoveAllWork worker with the required parameters.
   *    // TODO: add details of each parameter later
   * d. Enqueue the worker on the thread pool, which runs its execute()
   *    method.
   * e. Insert the worker into the vector.
   */
std::vector<RemoveAllWork*> removeAllWorkers;
ThreadPool* threadPool = TPSingleton::instance();
int locationMapIndex = 0;
for (HashMapT<BucketServerLocationPtr, VectorOfCacheableKeyPtr>::Iterator
locationIter = locationMap->begin();
locationIter != locationMap->end(); locationIter++) {
BucketServerLocationPtr serverLocation = locationIter.first();
if (serverLocation == NULLPTR) {
LOGDEBUG("serverLocation is NULLPTR");
}
VectorOfCacheableKeyPtr mappedkeys = locationIter.second();
RemoveAllWork* worker = new RemoveAllWork(
tcrdm, serverLocation, region, true /*attemptFailover*/,
false /*isBGThread*/, mappedkeys, aCallbackArgument);
threadPool->perform(worker);
removeAllWorkers.push_back(worker);
locationMapIndex++;
}
// TODO: verify whether the reply message type needs to be set here, e.g.
// reply.setMessageType(TcrMessage::RESPONSE);
int cnt = 1;
/**
* Step-3
* a. Iterate over the vector of removeAllWorkers and populate each worker's
* result into resultMap<BucketServerLocationPtr, SerializablePtr>, where
* the value is either a VersionedCacheableObjectPartListPtr or a
* PutAllPartialResultServerExceptionPtr, and into
* failedServers<BucketServerLocationPtr, CacheableInt32Ptr>, where the
* value is an error code.
* b. Delete the worker.
*/
HashMapT<BucketServerLocationPtr, SerializablePtr>* resultMap =
new HashMapT<BucketServerLocationPtr, SerializablePtr>();
HashMapT<BucketServerLocationPtr, CacheableInt32Ptr>* failedServers =
new HashMapT<BucketServerLocationPtr, CacheableInt32Ptr>();
for (std::vector<RemoveAllWork*>::iterator iter = removeAllWorkers.begin();
iter != removeAllWorkers.end(); iter++) {
RemoveAllWork* worker = *iter;
GfErrType err =
worker->getResult(); // wait() or blocking call for worker thread.
LOGDEBUG("Error code :: %s:%d err = %d ", __FILE__, __LINE__, err);
if (err != GF_NOERR) {
error = err;
if (error == GF_PUTALL_PARTIAL_RESULT_EXCEPTION) {
resultMap->insert(worker->getServerLocation(),
worker->getPaPResultException());
} else if (error == GF_NOTCON) {
// Refresh the metadata in case of GF_NOTCON.
tcrdm->getClientMetaDataService()->enqueueForMetadataRefresh(
region->getFullPath(), 0);
}
failedServers->insert(worker->getServerLocation(),
CacheableInt32::create(error));
} else {
// No Exception from server
resultMap->insert(worker->getServerLocation(),
worker->getResultCollector()->getList());
}
LOGDEBUG(
"worker->getResultCollector()->getList()->getVersionedTagsize() = %d ",
worker->getResultCollector()->getList()->getVersionedTagsize());
delete worker;
cnt++;
}
/**
* Step-4
* a. Create a PutAllPartialResultPtr instance with total size = keys.size().
* b. Iterate over resultMap; when the value for a server location is a
* VersionedCacheableObjectPartList, add its keys and versions.
* c. TODO: handle the case where the value in resultMap is a
* PutAllPartialResultServerException.
*/
ACE_Recursive_Thread_Mutex responseLock;
PutAllPartialResultPtr result(
new PutAllPartialResult(keys.size(), responseLock));
LOGDEBUG(
" TCRegion:: %s:%d "
"result->getSucceededKeysAndVersions()->getVersionedTagsize() = %d ",
__FILE__, __LINE__,
result->getSucceededKeysAndVersions()->getVersionedTagsize());
LOGDEBUG(" TCRegion:: %s:%d resultMap->size() ", __FILE__, __LINE__,
resultMap->size());
for (HashMapT<BucketServerLocationPtr, SerializablePtr>::Iterator
resultMapIter = resultMap->begin();
resultMapIter != resultMap->end(); resultMapIter++) {
SerializablePtr value = resultMapIter.second();
PutAllPartialResultServerException* papException = NULL;
VersionedCacheableObjectPartListPtr list = NULLPTR;
papException =
dynamic_cast<PutAllPartialResultServerException*>(value.ptr());
if (papException != NULL) {
// PutAllPartialResultServerException case: the value in resultMap is a
// PutAllPartialResultServerException.
// TODO: enumerate the key set of the failedServers map and read
// papException to populate PutAllPartialResult.
result->consolidate(papException->getResult());
} else if (value != NULLPTR &&
(list = dynCast<VersionedCacheableObjectPartListPtr>(value)) !=
NULLPTR) {
// value in resultMap is of type VCOPL.
// LOGDEBUG("TCRegion:: %s:%d :: list->getSucceededKeys()->size()=%d
// list->getVersionedTagsize() = %d", __FILE__, __LINE__,
// list->getSucceededKeys()->size(), list->getVersionedTagsize());
result->addKeysAndVersions(list);
} else {
// ERROR CASE
if (value != NULLPTR) {
LOGERROR(
"ERROR:: ThinClientRegion::singleHopRemoveAllNoThrow_remote value "
"could not be cast to either VCOPL or "
"PutAllPartialResultServerException: %s",
value->toString()->asChar());
} else {
LOGERROR(
"ERROR:: ThinClientRegion::singleHopRemoveAllNoThrow_remote value "
"is NULL");
}
}
}
/**
* a. If the PutAllPartialResult does not contain any entry, iterate over
* locationMap.
* b. Create VectorOfCacheableKey succeedKeySet and add every key set
* (locationIter.second()) in locationMap for which
* failedServers->contains(locationIter.first()) is false.
*/
LOGDEBUG("ThinClientRegion:: %s:%d failedServers->size() = %d", __FILE__,
__LINE__, failedServers->size());
// if the partial result set doesn't already have keys (for tracking version
// tags)
// then we need to gather up the keys that we know have succeeded so far and
// add them to the partial result set (See bug Id #955)
if (failedServers->size() > 0) {
VectorOfCacheableKeyPtr succeedKeySet(new VectorOfCacheableKey());
if (result->getSucceededKeysAndVersions()->size() == 0) {
for (HashMapT<BucketServerLocationPtr, VectorOfCacheableKeyPtr>::Iterator
locationIter = locationMap->begin();
locationIter != locationMap->end(); locationIter++) {
if (!failedServers->contains(locationIter.first())) {
for (int32_t i = 0; i < locationIter.second()->size(); i++) {
succeedKeySet->push_back(locationIter.second()->at(i));
}
}
}
result->addKeys(succeedKeySet);
}
}
/**
* a. Iterate over the failedServers map.
* b. If the entry's error is GF_PUTALL_PARTIAL_RESULT_EXCEPTION, continue;
* do not retry removeAll for the corresponding keys.
* c. Retry every other failed server: look up its keys in locationMap and
* resend them through the multi-hop path.
*/
error = GF_NOERR;
bool oneSubMapRetryFailed = false;
for (HashMapT<BucketServerLocationPtr, CacheableInt32Ptr>::Iterator
failedServerIter = failedServers->begin();
failedServerIter != failedServers->end(); failedServerIter++) {
if (failedServerIter.second()->value() ==
GF_PUTALL_PARTIAL_RESULT_EXCEPTION) { // serverLocation
// will not retry for PutAllPartialResultException
// but it means at least one sub map ever failed
oneSubMapRetryFailed = true;
error = GF_PUTALL_PARTIAL_RESULT_EXCEPTION;
continue;
}
HashMapT<BucketServerLocationPtr, VectorOfCacheableKeyPtr>::Iterator
failedSerInLocMapIter = locationMap->find(failedServerIter.first());
VectorOfCacheableKeyPtr failedKeys = NULLPTR;
if (failedSerInLocMapIter != locationMap->end()) {
failedKeys = failedSerInLocMapIter.second();
}
if (failedKeys == NULLPTR) {
LOGERROR(
"TCRegion::singleHopRemoveAllNoThrow_remote: failedKeys is NULL; "
"skipping retry for this server");
continue;  // retrying below would dereference a null key list
}
VersionedCacheableObjectPartListPtr vcopListPtr;
PutAllPartialResultServerExceptionPtr papResultServerExc = NULLPTR;
GfErrType errCode = multiHopRemoveAllNoThrow_remote(
*failedKeys, vcopListPtr, aCallbackArgument);
if (errCode == GF_NOERR) {
result->addKeysAndVersions(vcopListPtr);
} else if (errCode == GF_PUTALL_PARTIAL_RESULT_EXCEPTION) {
oneSubMapRetryFailed = true;
error = errCode;
} else /*if(errCode != GF_NOERR)*/ {
oneSubMapRetryFailed = true;
ExceptionPtr excptPtr = NULLPTR;
result->saveFailedKey(failedKeys->at(0), excptPtr);
error = errCode;
}
}
if (!oneSubMapRetryFailed) {
error = GF_NOERR;
}
versionedObjPartList = result->getSucceededKeysAndVersions();
LOGDEBUG("singlehop versionedObjPartList = %d error=%d",
versionedObjPartList->size(), error);
delete locationMap;
delete failedServers;
delete resultMap;
return error;
}
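// Multi-hop removeAll: send a single removeAll request through the pool and
// let the servers route it; results arrive via a ChunkedRemoveAllResponse.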
GfErrType ThinClientRegion::multiHopRemoveAllNoThrow_remote(
const VectorOfCacheableKey& keys,
VersionedCacheableObjectPartListPtr& versionedObjPartList,
const UserDataPtr& aCallbackArgument) {
// Multiple hop implementation
LOGDEBUG("ThinClientRegion::multiHopRemoveAllNoThrow_remote ");
GfErrType err = GF_NOERR;
// Construct request/reply for removeAll
TcrMessageRemoveAll request(this, keys, aCallbackArgument, m_tcrdm);
TcrMessageReply reply(true, m_tcrdm);
ACE_Recursive_Thread_Mutex responseLock;
versionedObjPartList = new VersionedCacheableObjectPartList(responseLock);
// the collector accumulates reply chunks into versionedObjPartList and is
// deleted below
ChunkedRemoveAllResponse* resultCollector(new ChunkedRemoveAllResponse(
RegionPtr(this), reply, responseLock, versionedObjPartList));
reply.setChunkedResultHandler(resultCollector);
err = m_tcrdm->sendSyncRequest(request, reply);
versionedObjPartList = resultCollector->getList();
LOGDEBUG("multiple hop versionedObjPartList size = %d , err = %d ",
versionedObjPartList->size(), err);
delete resultCollector;
if (err != GF_NOERR) return err;
LOGDEBUG(
"ThinClientRegion::multiHopRemoveAllNoThrow_remote "
"reply.getMessageType() = %d ",
reply.getMessageType());
switch (reply.getMessageType()) {
case TcrMessage::REPLY:
// LOGDEBUG("Map is written into remote server at region %s",
// m_fullPath.c_str());
break;
case TcrMessage::RESPONSE:
LOGDEBUG(
"multiHopRemoveAllNoThrow_remote TcrMessage::RESPONSE %s, err = %d ",
m_fullPath.c_str(), err);
break;
case TcrMessage::EXCEPTION:
err = handleServerException("ThinClientRegion::putAllNoThrow",
reply.getException());
break;
default:
LOGERROR("Unknown message type %d during region remove-all",
reply.getMessageType());
err = GF_NOTOBJ;
break;
}
return err;
}
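// Dispatch removeAll: take the single-hop path when PR single hop is enabled,
// cluster metadata is available, and no transaction is in progress; otherwise
// fall back to multi-hop.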
GfErrType ThinClientRegion::removeAllNoThrow_remote(
const VectorOfCacheableKey& keys,
VersionedCacheableObjectPartListPtr& versionedObjPartList,
const UserDataPtr& aCallbackArgument) {
LOGDEBUG("ThinClientRegion::removeAllNoThrow_remote");
ThinClientPoolDM* poolDM = dynamic_cast<ThinClientPoolDM*>(m_tcrdm);
TXState* txState = TSSTXStateWrapper::s_geodeTSSTXState->getTXState();
if (poolDM != NULL) {
if (poolDM->getPRSingleHopEnabled() &&
poolDM->getClientMetaDataService() != NULL &&
txState == NULL /*For Tx use multi-hop*/) {
return singleHopRemoveAllNoThrow_remote(
poolDM, keys, versionedObjPartList, aCallbackArgument);
} else {
return multiHopRemoveAllNoThrow_remote(keys, versionedObjPartList,
aCallbackArgument);
}
} else {
LOGERROR(
"ThinClientRegion::removeAllNoThrow_remote: pool not specified");
return GF_NOTSUP;
}
}
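// Query the server for the region size; any error reply is converted into an
// exception via GfErrTypeToException.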
uint32_t ThinClientRegion::size_remote() {
LOGDEBUG("ThinClientRegion::size_remote");
GfErrType err = GF_NOERR;
// do TCR size
TcrMessageSize request(m_fullPath.c_str());
TcrMessageReply reply(true, m_tcrdm);
err = m_tcrdm->sendSyncRequest(request, reply);
if (err != GF_NOERR) {
GfErrTypeToException("Region::size", err);
}
switch (reply.getMessageType()) {
case TcrMessage::RESPONSE: {
CacheableInt32Ptr size = staticCast<CacheableInt32Ptr>(reply.getValue());
return size->value();
// LOGINFO("Map is written into remote server at region %s",
// m_fullPath.c_str());
} break;
case TcrMessage::EXCEPTION:
err =
handleServerException("ThinClientRegion::size", reply.getException());
break;
case TcrMessage::SIZE_ERROR:
// LOGERROR( "A write error occurred on the endpoint %s",
// m_tcrdm->getActiveEndpoint( )->name( ).c_str( ) );
err = GF_CACHESERVER_EXCEPTION;
break;
default:
LOGERROR("Unknown message type %d during region size",
reply.getMessageType());
err = GF_NOTOBJ;
}
GfErrTypeToException("Region::size", err);
return 0;
}
GfErrType ThinClientRegion::registerStoredRegex(
TcrEndpoint* endpoint,
std::unordered_map<std::string, InterestResultPolicy>& interestListRegex,
bool isDurable, bool receiveValues) {
GfErrType opErr = GF_NOERR;
GfErrType retVal = GF_NOERR;
for (std::unordered_map<std::string, InterestResultPolicy>::iterator it =
interestListRegex.begin();
it != interestListRegex.end(); ++it) {
opErr = registerRegexNoThrow(it->first, false, endpoint, isDurable, NULLPTR,
it->second, receiveValues);
if (opErr != GF_NOERR) {
retVal = opErr;
}
}
return retVal;
}
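// Re-register every stored interest registration (regex and key lists,
// durable and non-durable) on the given endpoint, typically after failover,
// then replay any in-flight register-interest request for this region.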
GfErrType ThinClientRegion::registerKeys(TcrEndpoint* endpoint,
const TcrMessage* request,
TcrMessageReply* reply) {
GfErrType err = GF_NOERR;
GfErrType opErr = GF_NOERR;
// called on failover to a different server
opErr = registerStoredRegex(endpoint, m_interestListRegex);
err = opErr != GF_NOERR ? opErr : err;
opErr = registerStoredRegex(
endpoint, m_interestListRegexForUpdatesAsInvalidates, false, false);
err = opErr != GF_NOERR ? opErr : err;
opErr = registerStoredRegex(endpoint, m_durableInterestListRegex, true);
err = opErr != GF_NOERR ? opErr : err;
opErr = registerStoredRegex(
endpoint, m_durableInterestListRegexForUpdatesAsInvalidates, true, false);
err = opErr != GF_NOERR ? opErr : err;
VectorOfCacheableKey keysVec;
InterestResultPolicy interestPolicy =
copyInterestList(keysVec, m_interestList);
opErr = registerKeysNoThrow(keysVec, false, endpoint, false, interestPolicy);
err = opErr != GF_NOERR ? opErr : err;
VectorOfCacheableKey keysVecForUpdatesAsInvalidates;
interestPolicy = copyInterestList(keysVecForUpdatesAsInvalidates,
m_interestListForUpdatesAsInvalidates);
opErr = registerKeysNoThrow(keysVecForUpdatesAsInvalidates, false, endpoint,
false, interestPolicy, false);
err = opErr != GF_NOERR ? opErr : err;
VectorOfCacheableKey keysVecDurable;
interestPolicy = copyInterestList(keysVecDurable, m_durableInterestList);
opErr = registerKeysNoThrow(keysVecDurable, false, endpoint, true,
interestPolicy);
err = opErr != GF_NOERR ? opErr : err;
VectorOfCacheableKey keysVecDurableForUpdatesAsInvalidates;
interestPolicy =
copyInterestList(keysVecDurableForUpdatesAsInvalidates,
m_durableInterestListForUpdatesAsInvalidates);
opErr = registerKeysNoThrow(keysVecDurableForUpdatesAsInvalidates, false,
endpoint, true, interestPolicy, false);
err = opErr != GF_NOERR ? opErr : err;
if (request != NULL && request->getRegionName() == m_fullPath &&
(request->getMessageType() == TcrMessage::REGISTER_INTEREST ||
request->getMessageType() == TcrMessage::REGISTER_INTEREST_LIST)) {
const VectorOfCacheableKey* newKeysVec = request->getKeys();
bool isDurable = request->isDurable();
bool receiveValues = request->receiveValues();
if (newKeysVec == NULL || newKeysVec->empty()) {
const std::string& newRegex = request->getRegex();
if (!newRegex.empty()) {
if (request->getRegionName() != m_fullPath) {
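// note: this branch appears unreachable; the enclosing condition already
// guarantees the region names match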
reply = NULL;
}
opErr = registerRegexNoThrow(
newRegex, false, endpoint, isDurable, NULLPTR,
request->getInterestResultPolicy(), receiveValues, reply);
err = opErr != GF_NOERR ? opErr : err;
}
} else {
opErr = registerKeysNoThrow(*newKeysVec, false, endpoint, isDurable,
request->getInterestResultPolicy(),
receiveValues, reply);
err = opErr != GF_NOERR ? opErr : err;
}
}
return err;
}
GfErrType ThinClientRegion::unregisterStoredRegex(
std::unordered_map<std::string, InterestResultPolicy>& interestListRegex) {
GfErrType opErr = GF_NOERR;
GfErrType retVal = GF_NOERR;
for (std::unordered_map<std::string, InterestResultPolicy>::iterator it =
interestListRegex.begin();
it != interestListRegex.end(); ++it) {
opErr = unregisterRegexNoThrow(it->first, false);
if (opErr != GF_NOERR) {
retVal = opErr;
}
}
return retVal;
}
GfErrType ThinClientRegion::unregisterStoredRegexLocalDestroy(
std::unordered_map<std::string, InterestResultPolicy>& interestListRegex) {
GfErrType opErr = GF_NOERR;
GfErrType retVal = GF_NOERR;
for (std::unordered_map<std::string, InterestResultPolicy>::iterator it =
interestListRegex.begin();
it != interestListRegex.end(); ++it) {
opErr = unregisterRegexNoThrowLocalDestroy(it->first, false);
if (opErr != GF_NOERR) {
retVal = opErr;
}
}
return retVal;
}
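// Mirror of registerKeys: tears down every stored interest registration when
// disconnecting from a server.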
GfErrType ThinClientRegion::unregisterKeys() {
GfErrType err = GF_NOERR;
GfErrType opErr = GF_NOERR;
// called when disconnecting from a server
opErr = unregisterStoredRegex(m_interestListRegex);
err = opErr != GF_NOERR ? opErr : err;
opErr = unregisterStoredRegex(m_durableInterestListRegex);
err = opErr != GF_NOERR ? opErr : err;
opErr = unregisterStoredRegex(m_interestListRegexForUpdatesAsInvalidates);
err = opErr != GF_NOERR ? opErr : err;
opErr =
unregisterStoredRegex(m_durableInterestListRegexForUpdatesAsInvalidates);
err = opErr != GF_NOERR ? opErr : err;
VectorOfCacheableKey keysVec;
copyInterestList(keysVec, m_interestList);
opErr = unregisterKeysNoThrow(keysVec, false);
err = opErr != GF_NOERR ? opErr : err;
VectorOfCacheableKey keysVecDurable;
copyInterestList(keysVecDurable, m_durableInterestList);
opErr = unregisterKeysNoThrow(keysVecDurable, false);
err = opErr != GF_NOERR ? opErr : err;
VectorOfCacheableKey keysVecForUpdatesAsInvalidates;
copyInterestList(keysVecForUpdatesAsInvalidates,
m_interestListForUpdatesAsInvalidates);
opErr = unregisterKeysNoThrow(keysVecForUpdatesAsInvalidates, false);
err = opErr != GF_NOERR ? opErr : err;
VectorOfCacheableKey keysVecDurableForUpdatesAsInvalidates;
copyInterestList(keysVecDurableForUpdatesAsInvalidates,
m_durableInterestListForUpdatesAsInvalidates);
opErr = unregisterKeysNoThrow(keysVecDurableForUpdatesAsInvalidates, false);
err = opErr != GF_NOERR ? opErr : err;
return err;
}
GfErrType ThinClientRegion::destroyRegionNoThrow_remote(
const UserDataPtr& aCallbackArgument) {
GfErrType err = GF_NOERR;
// do TCR destroyRegion
TcrMessageDestroyRegion request(this, aCallbackArgument, -1, m_tcrdm);
TcrMessageReply reply(true, m_tcrdm);
err = m_tcrdm->sendSyncRequest(request, reply);
if (err != GF_NOERR) return err;
switch (reply.getMessageType()) {
case TcrMessage::REPLY: {
// LOGINFO("Region %s at remote is destroyed successfully",
// m_fullPath.c_str());
break;
}
case TcrMessage::EXCEPTION: {
err =
handleServerException("Region::destroyRegion", reply.getException());
break;
}
case TcrMessage::DESTROY_REGION_DATA_ERROR: {
err = GF_CACHE_REGION_DESTROYED_EXCEPTION;
break;
}
default: {
LOGERROR("Unknown message type %d during destroy region",
reply.getMessageType());
err = GF_MSG;
}
}
return err;
}
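// Register interest in an explicit key list. For KEYS_VALUES the reply
// carries values and is collected by a ChunkedGetAllResponse; otherwise a
// ChunkedInterestResponse collects the keys.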
GfErrType ThinClientRegion::registerKeysNoThrow(
const VectorOfCacheableKey& keys, bool attemptFailover,
TcrEndpoint* endpoint, bool isDurable, InterestResultPolicy interestPolicy,
bool receiveValues, TcrMessageReply* reply) {
RegionGlobalLocks acquireLocksRedundancy(this, false);
RegionGlobalLocks acquireLocksFailover(this);
CHECK_DESTROY_PENDING_NOTHROW(TryReadGuard);
GfErrType err = GF_NOERR;
ACE_Guard<ACE_Recursive_Thread_Mutex> keysGuard(m_keysLock);
if (keys.empty()) {
return err;
}
TcrMessageReply replyLocal(true, m_tcrdm);
bool needToCreateRC = true;
if (reply == NULL) {
reply = &replyLocal;
} else {
needToCreateRC = false;
}
LOGDEBUG("ThinClientRegion::registerKeysNoThrow : interestpolicy is %d",
interestPolicy.ordinal);
TcrMessageRegisterInterestList request(
this, keys, isDurable, getAttributes()->getCachingEnabled(),
receiveValues, interestPolicy, m_tcrdm);
ACE_Recursive_Thread_Mutex responseLock;
TcrChunkedResult* resultCollector = NULL;
if (interestPolicy.ordinal == InterestResultPolicy::KEYS_VALUES.ordinal) {
HashMapOfCacheablePtr values(new HashMapOfCacheable());
HashMapOfExceptionPtr exceptions(new HashMapOfException());
MapOfUpdateCounters trackers;
int32_t destroyTracker = 1;
if (needToCreateRC) {
resultCollector = (new ChunkedGetAllResponse(
request, this, &keys, values, exceptions, NULLPTR, trackers,
destroyTracker, true, responseLock));
reply->setChunkedResultHandler(resultCollector);
}
} else {
if (needToCreateRC) {
resultCollector = (new ChunkedInterestResponse(request, NULLPTR, *reply));
reply->setChunkedResultHandler(resultCollector);
}
}
err = m_tcrdm->sendSyncRequestRegisterInterest(
request, *reply, attemptFailover, this, endpoint);
if (err == GF_NOERR /*|| err == GF_CACHE_REDUNDANCY_FAILURE*/) {
if (reply->getMessageType() == TcrMessage::RESPONSE_FROM_SECONDARY) {
LOGFINER(
"registerKeysNoThrow - got response from secondary for "
"endpoint %s, ignoring.",
endpoint->name().c_str());
} else if (attemptFailover) {
addKeys(keys, isDurable, receiveValues, interestPolicy);
if (!(interestPolicy.ordinal ==
InterestResultPolicy::KEYS_VALUES.ordinal)) {
localInvalidateForRegisterInterest(keys);
}
}
}
if (needToCreateRC) {
delete resultCollector;
}
return err;
}
GfErrType ThinClientRegion::unregisterKeysNoThrow(
const VectorOfCacheableKey& keys, bool attemptFailover) {
RegionGlobalLocks acquireLocksRedundancy(this, false);
RegionGlobalLocks acquireLocksFailover(this);
CHECK_DESTROY_PENDING_NOTHROW(TryReadGuard);
GfErrType err = GF_NOERR;
ACE_Guard<ACE_Recursive_Thread_Mutex> keysGuard(m_keysLock);
TcrMessageReply reply(true, m_tcrdm);
if (keys.empty()) {
return err;
}
if (m_interestList.empty() && m_durableInterestList.empty() &&
m_interestListForUpdatesAsInvalidates.empty() &&
m_durableInterestListForUpdatesAsInvalidates.empty()) {
// did not register any keys before.
return GF_CACHE_ILLEGAL_STATE_EXCEPTION;
}
TcrMessageUnregisterInterestList request(this, keys, false, false, true,
InterestResultPolicy::NONE, m_tcrdm);
err = m_tcrdm->sendSyncRequestRegisterInterest(request, reply);
if (err == GF_NOERR /*|| err == GF_CACHE_REDUNDANCY_FAILURE*/) {
if (attemptFailover) {
for (VectorOfCacheableKey::Iterator iter = keys.begin();
iter != keys.end(); ++iter) {
m_interestList.erase(*iter);
m_durableInterestList.erase(*iter);
m_interestListForUpdatesAsInvalidates.erase(*iter);
m_durableInterestListForUpdatesAsInvalidates.erase(*iter);
}
}
}
return err;
}
GfErrType ThinClientRegion::unregisterKeysNoThrowLocalDestroy(
const VectorOfCacheableKey& keys, bool attemptFailover) {
RegionGlobalLocks acquireLocksRedundancy(this, false);
RegionGlobalLocks acquireLocksFailover(this);
GfErrType err = GF_NOERR;
ACE_Guard<ACE_Recursive_Thread_Mutex> keysGuard(m_keysLock);
TcrMessageReply reply(true, m_tcrdm);
if (keys.empty()) {
return err;
}
if (m_interestList.empty() && m_durableInterestList.empty() &&
m_interestListForUpdatesAsInvalidates.empty() &&
m_durableInterestListForUpdatesAsInvalidates.empty()) {
// did not register any keys before.
return GF_CACHE_ILLEGAL_STATE_EXCEPTION;
}
TcrMessageUnregisterInterestList request(this, keys, false, false, true,
InterestResultPolicy::NONE, m_tcrdm);
err = m_tcrdm->sendSyncRequestRegisterInterest(request, reply);
if (err == GF_NOERR) {
if (attemptFailover) {
for (VectorOfCacheableKey::Iterator iter = keys.begin();
iter != keys.end(); ++iter) {
m_interestList.erase(*iter);
m_durableInterestList.erase(*iter);
m_interestListForUpdatesAsInvalidates.erase(*iter);
m_durableInterestListForUpdatesAsInvalidates.erase(*iter);
}
}
}
return err;
}
bool ThinClientRegion::isRegexRegistered(
std::unordered_map<std::string, InterestResultPolicy>& interestListRegex,
const std::string& regex, bool allKeys) {
if (interestListRegex.find(".*") != interestListRegex.end() ||
(!allKeys && interestListRegex.find(regex) != interestListRegex.end())) {
return true;
}
return false;
}
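// Register a regex interest; ".*" means all keys. A caller-supplied reply
// reuses its attached result collector, otherwise one is created locally and
// deleted before returning.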
GfErrType ThinClientRegion::registerRegexNoThrow(
const std::string& regex, bool attemptFailover, TcrEndpoint* endpoint,
bool isDurable, VectorOfCacheableKeyPtr resultKeys,
InterestResultPolicy interestPolicy, bool receiveValues,
TcrMessageReply* reply) {
RegionGlobalLocks acquireLocksRedundancy(this, false);
RegionGlobalLocks acquireLocksFailover(this);
CHECK_DESTROY_PENDING_NOTHROW(TryReadGuard);
GfErrType err = GF_NOERR;
bool allKeys = (regex == ".*");
ACE_Guard<ACE_Recursive_Thread_Mutex> keysGuard(m_keysLock);
if (attemptFailover) {
if ((isDurable &&
(isRegexRegistered(m_durableInterestListRegex, regex, allKeys) ||
isRegexRegistered(m_durableInterestListRegexForUpdatesAsInvalidates,
regex, allKeys))) ||
(!isDurable &&
(isRegexRegistered(m_interestListRegex, regex, allKeys) ||
isRegexRegistered(m_interestListRegexForUpdatesAsInvalidates, regex,
allKeys)))) {
return err;
}
}
TcrMessageReply replyLocal(true, m_tcrdm);
ChunkedInterestResponse* resultCollector = NULL;
ChunkedGetAllResponse* getAllResultCollector = NULL;
if (reply != NULL) {
// the caller's reply is expected to carry either a ChunkedInterestResponse
// or a ChunkedGetAllResponse handler; reuse its result keys
resultCollector = dynamic_cast<ChunkedInterestResponse*>(
reply->getChunkedResultHandler());
if (resultCollector != NULL) {
resultKeys = resultCollector->getResultKeys();
} else {
getAllResultCollector = dynamic_cast<ChunkedGetAllResponse*>(
reply->getChunkedResultHandler());
resultKeys = getAllResultCollector->getResultKeys();
}
}
bool isRCCreatedLocally = false;
LOGDEBUG("ThinClientRegion::registerRegexNoThrow : interestpolicy is %d",
interestPolicy.ordinal);
TcrMessageRegisterInterest request(
m_fullPath, regex.c_str(), interestPolicy, isDurable,
getAttributes()->getCachingEnabled(), receiveValues, m_tcrdm);
ACE_Recursive_Thread_Mutex responseLock;
if (reply == NULL) {
reply = &replyLocal;
if (interestPolicy.ordinal == InterestResultPolicy::KEYS_VALUES.ordinal) {
HashMapOfCacheablePtr values(new HashMapOfCacheable());
HashMapOfExceptionPtr exceptions(new HashMapOfException());
MapOfUpdateCounters trackers;
int32_t destroyTracker = 1;
if (resultKeys == NULLPTR) {
resultKeys = VectorOfCacheableKeyPtr(new VectorOfCacheableKey());
}
// locally owned collector; deleted before returning
getAllResultCollector = (new ChunkedGetAllResponse(
request, this, NULL, values, exceptions, resultKeys, trackers,
destroyTracker, true, responseLock));
reply->setChunkedResultHandler(getAllResultCollector);
isRCCreatedLocally = true;
} else {
isRCCreatedLocally = true;
// locally owned collector; deleted before returning
resultCollector =
new ChunkedInterestResponse(request, resultKeys, replyLocal);
reply->setChunkedResultHandler(resultCollector);
}
}
err = m_tcrdm->sendSyncRequestRegisterInterest(
request, *reply, attemptFailover, this, endpoint);
if (err == GF_NOERR /*|| err == GF_CACHE_REDUNDANCY_FAILURE*/) {
if (reply->getMessageType() == TcrMessage::RESPONSE_FROM_SECONDARY) {
LOGFINER(
"registerRegexNoThrow - got response from secondary for "
"endpoint %s, ignoring.",
endpoint->name().c_str());
} else if (attemptFailover) {
addRegex(regex, isDurable, receiveValues, interestPolicy);
if (interestPolicy.ordinal != InterestResultPolicy::KEYS_VALUES.ordinal) {
if (allKeys) {
localInvalidateRegion_internal();
} else {
const VectorOfCacheableKeyPtr& keys =
resultCollector != NULL ? resultCollector->getResultKeys()
: getAllResultCollector->getResultKeys();
if (keys != NULLPTR) {
localInvalidateForRegisterInterest(*keys);
}
}
}
}
}
if (isRCCreatedLocally) {
if (resultCollector != NULL) delete resultCollector;
if (getAllResultCollector != NULL) delete getAllResultCollector;
}
return err;
}
GfErrType ThinClientRegion::unregisterRegexNoThrow(const std::string& regex,
bool attemptFailover) {
RegionGlobalLocks acquireLocksRedundancy(this, false);
RegionGlobalLocks acquireLocksFailover(this);
CHECK_DESTROY_PENDING_NOTHROW(TryReadGuard);
GfErrType err = GF_NOERR;
err = findRegex(regex);
if (err == GF_NOERR) {
TcrMessageReply reply(false, m_tcrdm);
TcrMessageUnregisterInterest request(m_fullPath, regex,
InterestResultPolicy::NONE, false,
false, true, m_tcrdm);
err = m_tcrdm->sendSyncRequestRegisterInterest(request, reply);
if (err == GF_NOERR /*|| err == GF_CACHE_REDUNDANCY_FAILURE*/) {
if (attemptFailover) {
clearRegex(regex);
}
}
}
return err;
}
GfErrType ThinClientRegion::findRegex(const std::string& regex) {
GfErrType err = GF_NOERR;
ACE_Guard<ACE_Recursive_Thread_Mutex> keysGuard(m_keysLock);
if (m_interestListRegex.find(regex) == m_interestListRegex.end() &&
m_durableInterestListRegex.find(regex) ==
m_durableInterestListRegex.end() &&
m_interestListRegexForUpdatesAsInvalidates.find(regex) ==
m_interestListRegexForUpdatesAsInvalidates.end() &&
m_durableInterestListRegexForUpdatesAsInvalidates.find(regex) ==
m_durableInterestListRegexForUpdatesAsInvalidates.end()) {
return GF_CACHE_ILLEGAL_STATE_EXCEPTION;
} else {
return err;
}
}
void ThinClientRegion::clearRegex(const std::string& regex) {
ACE_Guard<ACE_Recursive_Thread_Mutex> keysGuard(m_keysLock);
m_interestListRegex.erase(regex);
m_durableInterestListRegex.erase(regex);
m_interestListRegexForUpdatesAsInvalidates.erase(regex);
m_durableInterestListRegexForUpdatesAsInvalidates.erase(regex);
}
GfErrType ThinClientRegion::unregisterRegexNoThrowLocalDestroy(
const std::string& regex, bool attemptFailover) {
GfErrType err = GF_NOERR;
err = findRegex(regex);
if (err == GF_NOERR) {
TcrMessageReply reply(false, m_tcrdm);
TcrMessageUnregisterInterest request(m_fullPath, regex,
InterestResultPolicy::NONE, false,
false, true, m_tcrdm);
err = m_tcrdm->sendSyncRequestRegisterInterest(request, reply);
if (err == GF_NOERR) {
if (attemptFailover) {
clearRegex(regex);
}
}
}
return err;
}
void ThinClientRegion::addKeys(const VectorOfCacheableKey& keys, bool isDurable,
bool receiveValues,
InterestResultPolicy interestpolicy) {
std::unordered_map<CacheableKeyPtr, InterestResultPolicy>& interestList =
isDurable ? (receiveValues ? m_durableInterestList
: m_durableInterestListForUpdatesAsInvalidates)
: (receiveValues ? m_interestList
: m_interestListForUpdatesAsInvalidates);
for (VectorOfCacheableKey::Iterator iter = keys.begin(); iter != keys.end();
++iter) {
interestList.insert(std::pair<CacheableKeyPtr, InterestResultPolicy>(
*iter, interestpolicy));
}
}
void ThinClientRegion::addRegex(const std::string& regex, bool isDurable,
bool receiveValues,
InterestResultPolicy interestpolicy) {
std::unordered_map<CacheableKeyPtr, InterestResultPolicy>& interestList =
isDurable ? (receiveValues ? m_durableInterestList
: m_durableInterestListForUpdatesAsInvalidates)
: (receiveValues ? m_interestList
: m_interestListForUpdatesAsInvalidates);
std::unordered_map<std::string, InterestResultPolicy>& interestListRegex =
isDurable
? (receiveValues ? m_durableInterestListRegex
: m_durableInterestListRegexForUpdatesAsInvalidates)
: (receiveValues ? m_interestListRegex
: m_interestListRegexForUpdatesAsInvalidates);
if (regex == ".*") {
interestListRegex.clear();
interestList.clear();
}
interestListRegex.insert(
std::pair<std::string, InterestResultPolicy>(regex, interestpolicy));
}
void ThinClientRegion::getInterestList(VectorOfCacheableKey& vlist) const {
ThinClientRegion* nthis = const_cast<ThinClientRegion*>(this);
RegionGlobalLocks acquireLocksRedundancy(nthis, false);
RegionGlobalLocks acquireLocksFailover(nthis);
CHECK_DESTROY_PENDING(TryReadGuard, getInterestList);
ACE_Guard<ACE_Recursive_Thread_Mutex> keysGuard(nthis->m_keysLock);
for (std::unordered_map<CacheableKeyPtr, InterestResultPolicy>::iterator itr =
nthis->m_durableInterestList.begin();
itr != nthis->m_durableInterestList.end(); ++itr) {
vlist.push_back(itr->first);
}
for (std::unordered_map<CacheableKeyPtr, InterestResultPolicy>::iterator itr =
nthis->m_interestList.begin();
itr != nthis->m_interestList.end(); ++itr) {
vlist.push_back(itr->first);
}
}
void ThinClientRegion::getInterestListRegex(
VectorOfCacheableString& vregex) const {
ThinClientRegion* nthis = const_cast<ThinClientRegion*>(this);
RegionGlobalLocks acquireLocksRedundancy(nthis, false);
RegionGlobalLocks acquireLocksFailover(nthis);
CHECK_DESTROY_PENDING(TryReadGuard, getInterestListRegex);
ACE_Guard<ACE_Recursive_Thread_Mutex> keysGuard(nthis->m_keysLock);
for (std::unordered_map<std::string, InterestResultPolicy>::iterator itr =
nthis->m_durableInterestListRegex.begin();
itr != nthis->m_durableInterestListRegex.end(); ++itr) {
vregex.push_back(CacheableString::create((*itr).first.c_str()));
}
for (std::unordered_map<std::string, InterestResultPolicy>::iterator itr =
nthis->m_interestListRegex.begin();
itr != nthis->m_interestListRegex.end(); ++itr) {
vregex.push_back(CacheableString::create((*itr).first.c_str()));
}
}
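// Apply a server subscription event to the local cache; each message type
// maps onto the corresponding LocalRegion operation with NOTIFICATION flags.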
GfErrType ThinClientRegion::clientNotificationHandler(TcrMessage& msg) {
GfErrType err = GF_NOERR;
CacheablePtr oldValue;
switch (msg.getMessageType()) {
case TcrMessage::LOCAL_INVALIDATE: {
LocalRegion::invalidateNoThrow(
msg.getKey(), msg.getCallbackArgument(), -1,
CacheEventFlags::NOTIFICATION | CacheEventFlags::LOCAL,
msg.getVersionTag());
break;
}
case TcrMessage::LOCAL_DESTROY: {
err = LocalRegion::destroyNoThrow(
msg.getKey(), msg.getCallbackArgument(), -1,
CacheEventFlags::NOTIFICATION | CacheEventFlags::LOCAL,
msg.getVersionTag());
break;
}
case TcrMessage::CLEAR_REGION: {
LOGDEBUG("remote clear region event for reigon[%s]",
msg.getRegionName().c_str());
err = localClearNoThrow(
NULLPTR, CacheEventFlags::NOTIFICATION | CacheEventFlags::LOCAL);
break;
}
case TcrMessage::LOCAL_DESTROY_REGION: {
m_notifyRelease = true;
preserveSB();
err = LocalRegion::destroyRegionNoThrow(
msg.getCallbackArgument(), true,
CacheEventFlags::NOTIFICATION | CacheEventFlags::LOCAL);
break;
}
case TcrMessage::LOCAL_CREATE:
err = LocalRegion::putNoThrow(
msg.getKey(), msg.getValue(), msg.getCallbackArgument(), oldValue, -1,
CacheEventFlags::NOTIFICATION | CacheEventFlags::LOCAL,
msg.getVersionTag());
break;
case TcrMessage::LOCAL_UPDATE: {
// for updates, set NOTIFICATION_UPDATE to trigger the afterUpdate event
// even if the key is not present in the local cache
err = LocalRegion::putNoThrow(
msg.getKey(), msg.getValue(), msg.getCallbackArgument(), oldValue, -1,
CacheEventFlags::NOTIFICATION | CacheEventFlags::NOTIFICATION_UPDATE |
CacheEventFlags::LOCAL,
msg.getVersionTag(), msg.getDelta(), msg.getEventId());
break;
}
case TcrMessage::TOMBSTONE_OPERATION:
LocalRegion::tombstoneOperationNoThrow(msg.getTombstoneVersions(),
msg.getTombstoneKeys());
break;
default: {
if (TcrMessage::getAllEPDisMess() == &msg) {
setProcessedMarker(false);
LocalRegion::invokeAfterAllEndPointDisconnected();
} else {
LOGERROR(
"Unknown message type %d in subscription event handler; possible "
"serialization mismatch",
msg.getMessageType());
err = GF_MSG;
}
break;
}
}
// Update the EventIdMap to mark the event processed (durable clients and
// delta events). Skip while closing, since the listener might not be
// invoked.
if (!m_destroyPending && (m_isDurableClnt || msg.hasDelta()) &&
TcrMessage::getAllEPDisMess() != &msg) {
m_tcrdm->checkDupAndAdd(msg.getEventId());
}
return err;
}
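// Map the Java exception class name embedded in the server's reply onto the
// closest client error code; anything unrecognized becomes
// GF_CACHESERVER_EXCEPTION.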
GfErrType ThinClientRegion::handleServerException(const char* func,
const char* exceptionMsg) {
// LOGERROR("%s: An exception (%s) happened at remote server.", func,
// exceptionMsg);
GfErrType error = GF_NOERR;
setTSSExceptionMessage(exceptionMsg);
if (strstr(exceptionMsg,
"org.apache.geode.security.NotAuthorizedException") != NULL) {
error = GF_NOT_AUTHORIZED_EXCEPTION;
} else if (strstr(exceptionMsg,
"org.apache.geode.cache.CacheWriterException") != NULL) {
error = GF_CACHE_WRITER_EXCEPTION;
} else if (strstr(
exceptionMsg,
"org.apache.geode.security.AuthenticationFailedException") !=
NULL) {
error = GF_AUTHENTICATION_FAILED_EXCEPTION;
} else if (strstr(exceptionMsg,
"org.apache.geode.internal.cache.execute."
"InternalFunctionInvocationTargetException") != NULL) {
error = GF_FUNCTION_EXCEPTION;
} else if (strstr(exceptionMsg,
"org.apache.geode.cache.CommitConflictException") != NULL) {
error = GF_COMMIT_CONFLICT_EXCEPTION;
} else if (strstr(exceptionMsg,
"org.apache.geode.cache."
"TransactionDataNodeHasDepartedException") != NULL) {
error = GF_TRANSACTION_DATA_NODE_HAS_DEPARTED_EXCEPTION;
} else if (strstr(
exceptionMsg,
"org.apache.geode.cache.TransactionDataRebalancedException") !=
NULL) {
error = GF_TRANSACTION_DATA_REBALANCED_EXCEPTION;
} else if (strstr(
exceptionMsg,
"org.apache.geode.security.AuthenticationRequiredException") !=
NULL) {
error = GF_AUTHENTICATION_REQUIRED_EXCEPTION;
} else {
error = GF_CACHESERVER_EXCEPTION;
}
if (error != GF_AUTHENTICATION_REQUIRED_EXCEPTION) {
LOGERROR("%s: An exception (%s) happened at remote server.", func,
exceptionMsg);
} else {
LOGFINER("%s: An exception (%s) happened at remote server.", func,
exceptionMsg);
}
return error;
}
void ThinClientRegion::receiveNotification(TcrMessage* msg) {
{
TryReadGuard guard(m_rwLock, m_destroyPending);
if (m_destroyPending) {
if (msg != TcrMessage::getAllEPDisMess()) {
GF_SAFE_DELETE(msg);
}
return;
}
m_notificationSema.acquire();
}
if (msg->getMessageType() == TcrMessage::CLIENT_MARKER) {
handleMarker();
} else {
clientNotificationHandler(*msg);
}
m_notificationSema.release();
if (TcrMessage::getAllEPDisMess() != msg) GF_SAFE_DELETE(msg);
}
void ThinClientRegion::localInvalidateRegion_internal() {
MapEntryImplPtr me;
CacheablePtr oldValue;
VectorOfCacheableKey keysVec;
keys_internal(keysVec);
for (VectorOfCacheableKey::Iterator iter = keysVec.begin();
iter != keysVec.end(); ++iter) {
VersionTagPtr versionTag;
m_entries->invalidate(*iter, me, oldValue, versionTag);
}
}
void ThinClientRegion::invalidateInterestList(
std::unordered_map<CacheableKeyPtr, InterestResultPolicy>& interestList) {
MapEntryImplPtr me;
CacheablePtr oldValue;
if (!m_regionAttributes->getCachingEnabled()) {
return;
}
for (std::unordered_map<CacheableKeyPtr, InterestResultPolicy>::iterator
iter = interestList.begin();
iter != interestList.end(); ++iter) {
VersionTagPtr versionTag;
m_entries->invalidate(iter->first, me, oldValue, versionTag);
}
}
void ThinClientRegion::localInvalidateFailover() {
CHECK_DESTROY_PENDING(TryReadGuard,
ThinClientRegion::localInvalidateFailover);
// No need to invalidate from the "m_xxxForUpdatesAsInvalidates" lists?
if (m_interestListRegex.empty() && m_durableInterestListRegex.empty()) {
invalidateInterestList(m_interestList);
invalidateInterestList(m_durableInterestList);
} else {
localInvalidateRegion_internal();
}
}
void ThinClientRegion::localInvalidateForRegisterInterest(
const VectorOfCacheableKey& keys) {
CHECK_DESTROY_PENDING(TryReadGuard,
ThinClientRegion::localInvalidateForRegisterInterest);
if (!m_regionAttributes->getCachingEnabled()) {
return;
}
CacheablePtr oldValue;
MapEntryImplPtr me;
for (VectorOfCacheableKey::Iterator iter = keys.begin(); iter != keys.end();
++iter) {
VersionTagPtr versionTag;
m_entries->invalidate(*iter, me, oldValue, versionTag);
// also refresh the entry's access/modified time after invalidation
updateAccessAndModifiedTimeForEntry(me, true);
}
}
InterestResultPolicy ThinClientRegion::copyInterestList(
VectorOfCacheableKey& keysVector,
std::unordered_map<CacheableKeyPtr, InterestResultPolicy>& interestList)
const {
InterestResultPolicy interestPolicy = InterestResultPolicy::NONE;
for (std::unordered_map<CacheableKeyPtr, InterestResultPolicy>::const_iterator
iter = interestList.begin();
iter != interestList.end(); ++iter) {
keysVector.push_back(iter->first);
interestPolicy = iter->second;
}
return interestPolicy;
}
void ThinClientRegion::registerInterestGetValues(
const char* method, const VectorOfCacheableKey* keys,
const VectorOfCacheableKeyPtr& resultKeys) {
try {
HashMapOfExceptionPtr exceptions(new HashMapOfException());
GfErrType err = getAllNoThrow_remote(keys, NULLPTR, exceptions, resultKeys,
true, NULLPTR);
GfErrTypeToException(method, err);
// log any exceptions here
for (HashMapOfException::Iterator iter = exceptions->begin();
iter != exceptions->end(); ++iter) {
LOGWARN("%s Exception for key %s:: %s: %s", method,
Utils::getCacheableKeyString(iter.first())->asChar(),
iter.second()->getName(), iter.second()->getMessage());
}
} catch (const Exception& ex) {
LOGWARN("%s Exception while getting values: %s: %s", method, ex.getName(),
ex.getMessage());
std::string msg(method);
msg += " failed in getting values";
throw EntryNotFoundException(msg.c_str(), NULL, false,
ExceptionPtr(ex.clone()));
}
}
void ThinClientRegion::destroyDM(bool keepEndpoints) {
if (m_tcrdm != NULL) {
m_tcrdm->destroy(keepEndpoints);
}
}
void ThinClientRegion::release(bool invokeCallbacks) {
if (m_released) {
return;
}
if (!m_notifyRelease) {
m_notificationSema.acquire();
}
destroyDM(invokeCallbacks);
m_interestList.clear();
m_interestListRegex.clear();
m_durableInterestList.clear();
m_durableInterestListRegex.clear();
m_interestListForUpdatesAsInvalidates.clear();
m_interestListRegexForUpdatesAsInvalidates.clear();
m_durableInterestListForUpdatesAsInvalidates.clear();
m_durableInterestListRegexForUpdatesAsInvalidates.clear();
LocalRegion::release(invokeCallbacks);
}
ThinClientRegion::~ThinClientRegion() {
TryWriteGuard guard(m_rwLock, m_destroyPending);
if (!m_destroyPending) {
release(false);
}
GF_SAFE_DELETE(m_tcrdm);
}
void ThinClientRegion::acquireGlobals(bool isFailover) {
if (isFailover) {
m_tcrdm->acquireFailoverLock();
}
}
void ThinClientRegion::releaseGlobals(bool isFailover) {
if (isFailover) {
m_tcrdm->releaseFailoverLock();
}
}
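// Execute a function on the region; on GF_FUNCTION_EXCEPTION the failed
// nodes are collected and retried via reExecuteFunction, and on GF_NOTCON
// the request is resent up to retryAttempts times.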
void ThinClientRegion::executeFunction(const char* func,
const CacheablePtr& args,
CacheableVectorPtr routingObj,
uint8_t getResult, ResultCollectorPtr rc,
int32_t retryAttempts,
uint32_t timeout) {
int32_t attempt = 0;
CacheableHashSetPtr failedNodes = CacheableHashSet::create();
// if the pool's retry attempts are not set, retry once on all available
// endpoints
if (retryAttempts == -1) {
retryAttempts = static_cast<int32_t>(m_tcrdm->getNumberOfEndPoints());
}
bool reExecute = false;
bool reExecuteForServ = false;
do {
std::string funcName(func);
TcrMessage* msg;
if (reExecuteForServ) {
msg = new TcrMessageExecuteRegionFunction(
funcName, this, args, routingObj, getResult, failedNodes, timeout,
m_tcrdm, static_cast<int8_t>(1));
} else {
msg = new TcrMessageExecuteRegionFunction(
funcName, this, args, routingObj, getResult, failedNodes, timeout,
m_tcrdm, static_cast<int8_t>(0));
}
TcrMessageReply reply(true, m_tcrdm);
// locally owned collector; deleted after the reply is processed
ChunkedFunctionExecutionResponse* resultCollector(
new ChunkedFunctionExecutionResponse(reply, (getResult & 2), rc));
reply.setChunkedResultHandler(resultCollector);
reply.setTimeout(timeout);
GfErrType err = GF_NOERR;
err = m_tcrdm->sendSyncRequest(*msg, reply, !(getResult & 1));
resultCollector->reset();
delete msg;
delete resultCollector;
if (err == GF_NOERR &&
(reply.getMessageType() == TcrMessage::EXCEPTION ||
reply.getMessageType() == TcrMessage::EXECUTE_REGION_FUNCTION_ERROR)) {
err = ThinClientRegion::handleServerException("Execute",
reply.getException());
}
if (ThinClientBaseDM::isFatalClientError(err)) {
GfErrTypeToException("ExecuteOnRegion:", err);
} else if (err != GF_NOERR) {
if (err == GF_FUNCTION_EXCEPTION) {
reExecute = true;
rc->clearResults();
CacheableHashSetPtr failedNodesIds(reply.getFailedNode());
failedNodes->clear();
if (failedNodesIds != NULLPTR && failedNodesIds->size() > 0) {
LOGDEBUG(
"ThinClientRegion::executeFunction with GF_FUNCTION_EXCEPTION "
"failedNodesIds size = %d ",
failedNodesIds->size());
for (CacheableHashSet::Iterator itr = failedNodesIds->begin();
itr != failedNodesIds->end(); ++itr) {
failedNodes->insert(*itr);
}
}
} else if (err == GF_NOTCON) {
attempt++;
LOGDEBUG(
"ThinClientRegion::executeFunction with GF_NOTCON retry attempt = "
"%d ",
attempt);
if (attempt > retryAttempts) {
GfErrTypeToException("ExecuteOnRegion:", err);
}
reExecuteForServ = true;
rc->clearResults();
failedNodes->clear();
} else if (err == GF_TIMOUT) {
LOGINFO(
"function timeout. Name: %s, timeout: %d, params: %d, "
"retryAttempts: %d ",
funcName.c_str(), timeout, getResult, retryAttempts);
GfErrTypeToException("ExecuteOnRegion", GF_TIMOUT);
} else if (err == GF_CLIENT_WAIT_TIMEOUT ||
err == GF_CLIENT_WAIT_TIMEOUT_REFRESH_PRMETADATA) {
LOGINFO(
"function timeout, possibly bucket is not available or bucket "
"blacklisted. Name: %s, timeout: %d, params: %d, retryAttempts: "
"%d ",
funcName.c_str(), timeout, getResult, retryAttempts);
GfErrTypeToException("ExecuteOnRegion", GF_CLIENT_WAIT_TIMEOUT);
} else {
LOGDEBUG("executeFunction err = %d ", err);
GfErrTypeToException("ExecuteOnRegion:", err);
}
} else {
reExecute = false;
reExecuteForServ = false;
}
} while (reExecuteForServ);
if (reExecute && (getResult & 1)) {
reExecuteFunction(func, args, routingObj, getResult, rc, retryAttempts,
failedNodes, timeout);
}
}
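// Retry loop used once a function execution has reported failed nodes;
// resends with the re-execute flag until no node fails or the retry budget
// is exhausted.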
CacheableVectorPtr ThinClientRegion::reExecuteFunction(
const char* func, const CacheablePtr& args, CacheableVectorPtr routingObj,
uint8_t getResult, ResultCollectorPtr rc, int32_t retryAttempts,
CacheableHashSetPtr& failedNodes, uint32_t timeout) {
int32_t attempt = 0;
bool reExecute = true;
// if the pool's retry attempts are not set, retry once on all available
// endpoints
if (retryAttempts == -1) {
retryAttempts = static_cast<int32_t>(m_tcrdm->getNumberOfEndPoints());
}
do {
reExecute = false;
std::string funcName(func);
TcrMessageExecuteRegionFunction msg(
funcName, this, args, routingObj, getResult, failedNodes, timeout,
m_tcrdm, /*reExecute*/ static_cast<int8_t>(1));
TcrMessageReply reply(true, m_tcrdm);
// locally owned collector; deleted after the reply is processed
ChunkedFunctionExecutionResponse* resultCollector(
new ChunkedFunctionExecutionResponse(reply, (getResult & 2), rc));
reply.setChunkedResultHandler(resultCollector);
reply.setTimeout(timeout);
GfErrType err = GF_NOERR;
err = m_tcrdm->sendSyncRequest(msg, reply, !(getResult & 1));
delete resultCollector;
if (err == GF_NOERR &&
(reply.getMessageType() == TcrMessage::EXCEPTION ||
reply.getMessageType() == TcrMessage::EXECUTE_REGION_FUNCTION_ERROR)) {
err = ThinClientRegion::handleServerException("Execute",
reply.getException());
}
if (ThinClientBaseDM::isFatalClientError(err)) {
GfErrTypeToException("ExecuteOnRegion:", err);
} else if (err != GF_NOERR) {
if (err == GF_FUNCTION_EXCEPTION) {
reExecute = true;
rc->clearResults();
CacheableHashSetPtr failedNodesIds(reply.getFailedNode());
failedNodes->clear();
if (failedNodesIds != NULLPTR && failedNodesIds->size() > 0) {
LOGDEBUG(
"ThinClientRegion::reExecuteFunction with GF_FUNCTION_EXCEPTION "
"failedNodesIds size = %d ",
failedNodesIds->size());
for (CacheableHashSet::Iterator itr = failedNodesIds->begin();
itr != failedNodesIds->end(); ++itr) {
failedNodes->insert(*itr);
}
}
} else if (err == GF_NOTCON) {
attempt++;
LOGDEBUG(
"ThinClientRegion::reExecuteFunction with GF_NOTCON retry attempt "
"= %d ",
attempt);
if (attempt > retryAttempts) {
GfErrTypeToException("ExecuteOnRegion:", err);
}
reExecute = true;
rc->clearResults();
failedNodes->clear();
} else if (err == GF_TIMOUT) {
LOGINFO("function timeout");
GfErrTypeToException("ExecuteOnRegion", GF_CACHE_TIMEOUT_EXCEPTION);
} else {
LOGDEBUG("reExecuteFunction err = %d ", err);
GfErrTypeToException("ExecuteOnRegion:", err);
}
}
} while (reExecute);
return NULLPTR;
}
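// Single-hop function execution: run one OnRegionFunctionExecution per
// server location on the thread pool and collect the results; returns true
// when the caller should re-execute (a metadata refresh has been queued).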
bool ThinClientRegion::executeFunctionSH(
const char* func, const CacheablePtr& args, uint8_t getResult,
ResultCollectorPtr rc,
HashMapT<BucketServerLocationPtr, CacheableHashSetPtr>* locationMap,
CacheableHashSetPtr& failedNodes, uint32_t timeout, bool allBuckets) {
bool reExecute = false;
ACE_Recursive_Thread_Mutex resultCollectorLock;
UserAttributesPtr userAttr =
TSSUserAttributesWrapper::s_geodeTSSUserAttributes->getUserAttributes();
std::vector<OnRegionFunctionExecution*> feWorkers;
ThreadPool* threadPool = TPSingleton::instance();
for (HashMapT<BucketServerLocationPtr, CacheableHashSetPtr>::Iterator
locationIter = locationMap->begin();
locationIter != locationMap->end(); ++locationIter) {
BucketServerLocationPtr serverLocation = locationIter.first();
CacheableHashSetPtr buckets = locationIter.second();
OnRegionFunctionExecution* worker = new OnRegionFunctionExecution(
func, this, args, buckets, getResult, timeout,
dynamic_cast<ThinClientPoolDM*>(m_tcrdm), &resultCollectorLock, rc,
userAttr, false, serverLocation, allBuckets);
threadPool->perform(worker);
feWorkers.push_back(worker);
}
for (std::vector<OnRegionFunctionExecution*>::iterator iter =
feWorkers.begin();
iter != feWorkers.end(); ++iter) {
OnRegionFunctionExecution* worker = *iter;
GfErrType err = worker->getResult();
TcrMessage* currentReply = worker->getReply();
if (err == GF_NOERR &&
(currentReply->getMessageType() == TcrMessage::EXCEPTION ||
currentReply->getMessageType() ==
TcrMessage::EXECUTE_REGION_FUNCTION_ERROR)) {
err = ThinClientRegion::handleServerException(
"Execute", currentReply->getException());
}
if (ThinClientBaseDM::isFatalClientError(err)) {
delete worker;
GfErrTypeToException("ExecuteOnRegion:", err);
} else if (err != GF_NOERR) {
if (err == GF_FUNCTION_EXCEPTION) {
reExecute = true;
ThinClientPoolDM* poolDM = dynamic_cast<ThinClientPoolDM*>(m_tcrdm);
if ((poolDM != NULL) && (poolDM->getClientMetaDataService() != NULL)) {
poolDM->getClientMetaDataService()->enqueueForMetadataRefresh(
this->getFullPath(), 0);
}
worker->getResultCollector()->reset();
{
ACE_Guard<ACE_Recursive_Thread_Mutex> guard(resultCollectorLock);
rc->clearResults();
}
CacheableHashSetPtr failedNodeIds(currentReply->getFailedNode());
if (failedNodeIds != NULLPTR && failedNodeIds->size() > 0) {
LOGDEBUG(
"ThinClientRegion::executeFunctionSH with GF_FUNCTION_EXCEPTION "
"failedNodeIds size = %d ",
failedNodeIds->size());
for (CacheableHashSet::Iterator itr = failedNodeIds->begin();
itr != failedNodeIds->end(); ++itr) {
failedNodes->insert(*itr);
}
}
} else if (err == GF_NOTCON) {
reExecute = true;
LOGDEBUG("ThinClientRegion::executeFunctionSH with GF_NOTCON ");
ThinClientPoolDM* poolDM = dynamic_cast<ThinClientPoolDM*>(m_tcrdm);
if ((poolDM != NULL) && (poolDM->getClientMetaDataService() != NULL)) {
poolDM->getClientMetaDataService()->enqueueForMetadataRefresh(
this->getFullPath(), 0);
}
worker->getResultCollector()->reset();
{
ACE_Guard<ACE_Recursive_Thread_Mutex> guard(resultCollectorLock);
rc->clearResults();
}
} else {
delete worker;
LOGDEBUG("executeFunctionSH err = %d ", err);
GfErrTypeToException("ExecuteOnRegion:", err);
}
}
delete worker;
}
return reExecute;
}
GfErrType ThinClientRegion::getFuncAttributes(const char* func,
std::vector<int8>** attr) {
GfErrType err = GF_NOERR;
// do TCR GET_FUNCTION_ATTRIBUTES
LOGDEBUG("Tcrmessage request GET_FUNCTION_ATTRIBUTES ");
std::string funcName(func);
TcrMessageGetFunctionAttributes request(funcName, m_tcrdm);
TcrMessageReply reply(true, m_tcrdm);
err = m_tcrdm->sendSyncRequest(request, reply);
if (err != GF_NOERR) {
return err;
}
switch (reply.getMessageType()) {
case TcrMessage::RESPONSE: {
*attr = reply.getFunctionAttributes();
break;
}
case TcrMessage::EXCEPTION: {
err = handleServerException("Region::GET_FUNCTION_ATTRIBUTES",
reply.getException());
break;
}
default: {
LOGERROR("Unknown message type %d while getting function attributes.",
reply.getMessageType());
err = GF_MSG;
}
}
return err;
}
GfErrType ThinClientRegion::getNoThrow_FullObject(EventIdPtr eventId,
CacheablePtr& fullObject,
VersionTagPtr& versionTag) {
TcrMessageRequestEventValue fullObjectMsg(eventId);
TcrMessageReply reply(true, NULL);
GfErrType err = GF_NOTCON;
err = m_tcrdm->sendSyncRequest(fullObjectMsg, reply, false, true);
if (err == GF_NOERR) {
fullObject = reply.getValue();
}
versionTag = reply.getVersionTag();
return err;
}
void ThinClientRegion::txDestroy(const CacheableKeyPtr& key,
const UserDataPtr& aCallbackArgument,
VersionTagPtr versionTag) {
GfErrType err = destroyNoThrowTX(key, aCallbackArgument, -1,
CacheEventFlags::NORMAL, versionTag);
GfErrTypeToException("Region::destroyTX", err);
}
void ThinClientRegion::txInvalidate(const CacheableKeyPtr& key,
const UserDataPtr& aCallbackArgument,
VersionTagPtr versionTag) {
GfErrType err = invalidateNoThrowTX(key, aCallbackArgument, -1,
CacheEventFlags::NORMAL, versionTag);
GfErrTypeToException("Region::invalidateTX", err);
}
void ThinClientRegion::txPut(const CacheableKeyPtr& key,
const CacheablePtr& value,
const UserDataPtr& aCallbackArgument,
VersionTagPtr versionTag) {
CacheablePtr oldValue;
int64 sampleStartNanos = Utils::startStatOpTime();
GfErrType err = putNoThrowTX(key, value, aCallbackArgument, oldValue, -1,
CacheEventFlags::NORMAL, versionTag);
Utils::updateStatOpTime(m_regionStats->getStat(),
RegionStatType::getInstance()->getPutTimeId(),
sampleStartNanos);
GfErrTypeToException("Region::putTX", err);
}
void ChunkedInterestResponse::reset() {
if (m_resultKeys != NULLPTR && m_resultKeys->size() > 0) {
m_resultKeys->clear();
}
}
void ChunkedInterestResponse::handleChunk(const uint8_t* chunk,
int32_t chunkLen,
uint8_t isLastChunkWithSecurity) {
DataInput input(chunk, chunkLen);
input.setPoolName(m_replyMsg.getPoolName());
uint32_t partLen;
if (TcrMessageHelper::readChunkPartHeader(
m_msg, input, 0, GeodeTypeIds::CacheableArrayList,
"ChunkedInterestResponse", partLen,
isLastChunkWithSecurity) != TcrMessageHelper::OBJECT) {
// encountered an exception part, so return without reading more
m_replyMsg.readSecureObjectPart(input, false, true,
isLastChunkWithSecurity);
return;
}
if (m_resultKeys == NULLPTR) {
GF_NEW(m_resultKeys, VectorOfCacheableKey);
}
serializer::readObject(input, *m_resultKeys);
m_replyMsg.readSecureObjectPart(input, false, true, isLastChunkWithSecurity);
}
void ChunkedKeySetResponse::reset() {
if (m_resultKeys.size() > 0) {
m_resultKeys.clear();
}
}
void ChunkedKeySetResponse::handleChunk(const uint8_t* chunk, int32_t chunkLen,
uint8_t isLastChunkWithSecurity) {
DataInput input(chunk, chunkLen);
input.setPoolName(m_replyMsg.getPoolName());
uint32_t partLen;
if (TcrMessageHelper::readChunkPartHeader(
m_msg, input, 0, GeodeTypeIds::CacheableArrayList,
"ChunkedKeySetResponse", partLen,
isLastChunkWithSecurity) != TcrMessageHelper::OBJECT) {
// encountered an exception part, so return without reading more
m_replyMsg.readSecureObjectPart(input, false, true,
isLastChunkWithSecurity);
return;
}
serializer::readObject(input, m_resultKeys);
m_replyMsg.readSecureObjectPart(input, false, true, isLastChunkWithSecurity);
}
void ChunkedQueryResponse::reset() {
m_queryResults->clear();
m_structFieldNames.clear();
}
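// Read an ObjectPartList from a query response chunk; an element tagged 2 is
// an exception whose serialized form is skipped and whose message string is
// thrown as an IllegalStateException.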
void ChunkedQueryResponse::readObjectPartList(DataInput& input,
bool isResultSet) {
bool hasKeys;
input.readBoolean(&hasKeys);
if (hasKeys) {
LOGERROR("Query response has keys which is unexpected.");
throw IllegalStateException("Query response has keys which is unexpected.");
}
int32_t len;
input.readInt(&len);
for (int32_t index = 0; index < len; ++index) {
uint8_t byte = 0;
input.read(&byte);
if (byte == 2 /* for exception*/) {
int32_t skipLen;
input.readArrayLen(&skipLen);
input.advanceCursor(skipLen);
CacheableStringPtr exMsgPtr;
input.readNativeString(exMsgPtr);
throw IllegalStateException(exMsgPtr->asChar());
} else {
if (isResultSet) {
CacheablePtr value;
input.readObject(value);
m_queryResults->push_back(value);
} else {
int8_t arrayType;
input.read(&arrayType);
if (arrayType == GeodeTypeIdsImpl::FixedIDByte) {
input.read(&arrayType);
if (arrayType != GeodeTypeIdsImpl::CacheableObjectPartList) {
LOGERROR(
"Query response got unhandled message format %d while "
"expecting struct set object part list; possible serialization "
"mismatch",
arrayType);
throw MessageException(
"Query response got unhandled message format while expecting "
"struct set object part list; possible serialization mismatch");
}
readObjectPartList(input, true);
} else {
LOGERROR(
"Query response got unhandled message format %d while expecting "
"struct set object part list; possible serialization mismatch",
arrayType);
throw MessageException(
"Query response got unhandled message format while expecting "
"struct set object part list; possible serialization mismatch");
}
}
}
}
}
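// handleChunk() below walks one query-result chunk: it skips the
// CollectionTypeImpl class headers, extracts struct field names when the
// element type is org.apache.geode.cache.query.Struct, then decodes the
// results as either a CacheableObjectArray or an object part list. A hedged
// sketch of the client API that produces these chunks ("cachePtr" and the
// region name are illustrative assumptions):
//   QueryServicePtr qs = cachePtr->getQueryService();
//   QueryPtr qry = qs->newQuery("SELECT DISTINCT * FROM /exampleRegion");
//   SelectResultsPtr results = qry->execute();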
void ChunkedQueryResponse::handleChunk(const uint8_t* chunk, int32_t chunkLen,
uint8_t isLastChunkWithSecurity) {
LOGDEBUG("ChunkedQueryResponse::handleChunk..");
DataInput input(chunk, chunkLen);
input.setPoolName(m_msg.getPoolName());
uint32_t partLen;
int8_t isObj;
TcrMessageHelper::ChunkObjectType objType;
if ((objType = TcrMessageHelper::readChunkPartHeader(
m_msg, input, GeodeTypeIdsImpl::FixedIDByte,
static_cast<uint8_t>(GeodeTypeIdsImpl::CollectionTypeImpl),
"ChunkedQueryResponse", partLen, isLastChunkWithSecurity)) ==
TcrMessageHelper::EXCEPTION) {
// encountered an exception part, so return without reading more
m_msg.readSecureObjectPart(input, false, true, isLastChunkWithSecurity);
return;
} else if (objType == TcrMessageHelper::NULL_OBJECT) {
// special case for scalar result
input.readInt(&partLen);
input.read(&isObj);
CacheableInt32Ptr intVal;
input.readObject(intVal, true);
m_queryResults->push_back(intVal);
// TODO:
m_msg.readSecureObjectPart(input, false, true, isLastChunkWithSecurity);
return;
}
uint8_t classByte;
char* isStructTypeImpl = NULL;
uint16_t stiLen = 0;
// soubhik: ignoring parent classes for now;
// we will need to revisit this once CQ is implemented.
// skipping HashSet/StructSet
// qhe: It was agreed that we'll use a set for all kinds of results,
// to avoid dealing with a compare operator for user objects.
// If the results on the server are in a bag, or the user needs to
// manipulate the elements, then we will have to revisit this issue.
// For now, we'll live with duplicate records, hoping they do not cost much.
skipClass(input);
// skipping CollectionTypeImpl
// skipClass(input); // no longer, since GFE 5.7
int8_t structType;
input.read(&structType); // this is Fixed ID byte (1)
input.read(&structType); // this is DataSerializable (45)
input.read(&classByte);
uint8_t stringType;
input.read(&stringType); // ignore string header - assume a normal (under 64k) string
input.readUTF(&isStructTypeImpl, &stiLen);
DeleteArray<char> delSTI(isStructTypeImpl);
if (strcmp(isStructTypeImpl, "org.apache.geode.cache.query.Struct") == 0) {
int32_t numOfFldNames;
input.readArrayLen(&numOfFldNames);
bool skip = false;
if (m_structFieldNames.size() != 0) {
skip = true;
}
for (int i = 0; i < numOfFldNames; i++) {
CacheableStringPtr sptr;
// input.readObject(sptr);
input.readNativeString(sptr);
if (!skip) {
m_structFieldNames.push_back(sptr);
}
}
}
// skip the rest of this part: reset to the part's start, then advance past
// the whole part including its partLen and isObj header (4+1 bytes)
input.reset();
input.advanceCursor(partLen + 5);
input.readInt(&partLen);
input.read(&isObj);
if (!isObj) {
LOGERROR(
"Query response part is not an object; possible serialization "
"mismatch");
throw MessageException(
"Query response part is not an object; possible serialization "
"mismatch");
}
bool isResultSet = (m_structFieldNames.size() == 0);
int8_t arrayType;
input.read(&arrayType);
if (arrayType == GeodeTypeIds::CacheableObjectArray) {
int32_t arraySize;
input.readArrayLen(&arraySize);
skipClass(input);
for (int32_t arrayItem = 0; arrayItem < arraySize; ++arrayItem) {
SerializablePtr value;
if (isResultSet) {
input.readObject(value);
m_queryResults->push_back(value);
} else {
input.read(&isObj);
int32_t arraySize2;
input.readArrayLen(&arraySize2);
skipClass(input);
for (int32_t index = 0; index < arraySize2; ++index) {
input.readObject(value);
m_queryResults->push_back(value);
}
}
}
} else if (arrayType == GeodeTypeIdsImpl::FixedIDByte) {
input.read(&arrayType);
if (arrayType != GeodeTypeIdsImpl::CacheableObjectPartList) {
LOGERROR(
"Query response got unhandled message format %d while expecting "
"object part list; possible serialization mismatch",
arrayType);
throw MessageException(
"Query response got unhandled message format while expecting object "
"part list; possible serialization mismatch");
}
readObjectPartList(input, isResultSet);
} else {
LOGERROR(
"Query response got unhandled message format %d; possible "
"serialization mismatch",
arrayType);
throw MessageException(
"Query response got unhandled message format; possible serialization "
"mismatch");
}
m_msg.readSecureObjectPart(input, false, true, isLastChunkWithSecurity);
}
void ChunkedQueryResponse::skipClass(DataInput& input) {
uint8_t classByte;
input.read(&classByte);
if (classByte == GeodeTypeIdsImpl::Class) {
uint8_t stringType;
// ignore string type id - assuming it's a normal (under 64k) string.
input.read(&stringType);
uint16_t classlen;
input.readInt(&classlen);
input.advanceCursor(classlen);
} else {
throw IllegalStateException(
"ChunkedQueryResponse::skipClass: "
"Did not get expected class header byte");
}
}
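// ChunkedFunctionExecutionResponse feeds each decoded result (or a
// UserFunctionExecutionException for sendException parts) into the attached
// ResultCollector m_rc. A hedged sketch of the invoking API ("region" and
// "MyFunction" are illustrative assumptions):
//   ExecutionPtr exc = FunctionService::onRegion(region);
//   CacheableVectorPtr result = exc->execute("MyFunction")->getResult();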
void ChunkedFunctionExecutionResponse::reset() {
// m_functionExecutionResults->clear();
}
void ChunkedFunctionExecutionResponse::handleChunk(
const uint8_t* chunk, int32_t chunkLen, uint8_t isLastChunkWithSecurity) {
LOGDEBUG("ChunkedFunctionExecutionResponse::handleChunk");
DataInput input(chunk, chunkLen);
input.setPoolName(m_msg.getPoolName());
uint32_t partLen;
int8_t arrayType;
if ((arrayType = static_cast<TcrMessageHelper::ChunkObjectType>(
TcrMessageHelper::readChunkPartHeader(
m_msg, input, "ChunkedFunctionExecutionResponse", partLen,
isLastChunkWithSecurity))) == TcrMessageHelper::EXCEPTION) {
// encountered an exception part, so return without reading more
m_msg.readSecureObjectPart(input, false, true, isLastChunkWithSecurity);
return;
}
if (m_getResult == false) {
return;
}
if (static_cast<TcrMessageHelper::ChunkObjectType>(arrayType) ==
TcrMessageHelper::NULL_OBJECT) {
LOGDEBUG("ChunkedFunctionExecutionResponse::handleChunk NULL object");
// m_functionExecutionResults->push_back(NULLPTR);
m_msg.readSecureObjectPart(input, false, true, isLastChunkWithSecurity);
return;
}
int32_t len;
// the value part + memberId are measured from here; the -1 accounts for
// the array type byte already consumed
int startLen = input.getBytesRead() - 1;
input.readArrayLen(&len);
// read a byte to determine whether to read an exception part (from
// sendException) or result objects
uint8_t partType;
input.read(&partType);
bool isExceptionPart = false;
// See if partType is JavaSerializable
const int CHUNK_HDR_LEN = 5;
const int SECURE_PART_LEN = 5 + 8;
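// CHUNK_HDR_LEN covers the 4-byte part length plus the 1-byte isObject
// flag; SECURE_PART_LEN presumably adds an 8-byte security payload on top
// of its own 5-byte header (an assumption inferred from the arithmetic
// below).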
bool readPart = true;
LOGDEBUG(
"ChunkedFunctionExecutionResponse::handleChunk chunkLen = %d & partLen = "
"%d ",
chunkLen, partLen);
if (partType == GeodeTypeIdsImpl::JavaSerializable) {
isExceptionPart = true;
// reset the input.
input.reset();
if (((isLastChunkWithSecurity & 0x02) &&
(chunkLen - static_cast<int32_t>(partLen) <=
CHUNK_HDR_LEN + SECURE_PART_LEN)) ||
(((isLastChunkWithSecurity & 0x02) == 0) &&
(chunkLen - static_cast<int32_t>(partLen) <= CHUNK_HDR_LEN))) {
readPart = false;
input.readInt(&partLen);
input.advanceCursor(1); // skip isObject byte
input.advanceCursor(partLen);
} else {
// skip the first part, i.e. the JavaSerializable
TcrMessageHelper::skipParts(m_msg, input, 1);
// read the second part (a string) in the usual manner: first its length,
input.readInt(&partLen);
int8_t isObject;
// then its isObject byte
input.read(&isObject);
// reset the measurement point: the value part + memberId are counted from
// here, before the array type byte is read
startLen = input.getBytesRead();
// since the string is contained as a part of other results, read the
// arrayType, which is ArrayList (65)
input.read(&arrayType);
// then its length, which is 2
input.readArrayLen(&len);
}
} else {
// rewind the cursor by 1 to undo the byte we read above when determining
// whether this is an exception part or an object part
input.rewindCursor(1);
}
// Read either object or exception string from sendException.
SerializablePtr value;
// CacheablePtr memberId;
if (readPart) {
input.readObject(value);
// TODO: track this memberId for PrFxHa
// input.readObject(memberId);
int objectlen = input.getBytesRead() - startLen;
int memberIdLen = partLen - objectlen;
input.advanceCursor(memberIdLen);
LOGDEBUG("function partlen = %d , objectlen = %d, memberidlen = %d ",
partLen, objectlen, memberIdLen);
LOGDEBUG("function input.getBytesRemaining() = %d ",
input.getBytesRemaining());
// ideally we would assert that all bytes have been consumed here, but the
// security header still remains to be read
/*if(input.getBytesRemaining() != 0) {
LOGERROR("Function response not read all bytes");
throw IllegalStateException("Function Execution didn't read all bytes");
}*/
} else {
value = CacheableString::create("Function exception result.");
}
if (m_rc != NULLPTR) {
CacheablePtr result = NULLPTR;
if (isExceptionPart) {
UserFunctionExecutionExceptionPtr uFEPtr(
new UserFunctionExecutionException(value->toString()));
result = dynCast<CacheablePtr>(uFEPtr);
} else {
result = dynCast<CacheablePtr>(value);
}
if (m_resultCollectorLock != NULL) {
ACE_Guard<ACE_Recursive_Thread_Mutex> guard(*m_resultCollectorLock);
m_rc->addResult(result);
} else {
m_rc->addResult(result);
}
}
m_msg.readSecureObjectPart(input, false, true, isLastChunkWithSecurity);
// m_functionExecutionResults->push_back(value);
}
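// ChunkedGetAllResponse accumulates values, per-key exceptions and version
// tags for a getAll() call. A hedged usage sketch (assuming "region" and
// "keys" already exist; names are illustrative):
//   HashMapOfCacheablePtr values(new HashMapOfCacheable());
//   HashMapOfExceptionPtr exceptions(new HashMapOfException());
//   region->getAll(keys, values, exceptions, true /*addToLocalCache*/);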
void ChunkedGetAllResponse::reset() {
m_keysOffset = 0;
if (m_resultKeys != NULLPTR && m_resultKeys->size() > 0) {
m_resultKeys->clear();
}
}
// process a GET_ALL response chunk
void ChunkedGetAllResponse::handleChunk(const uint8_t* chunk, int32_t chunkLen,
uint8_t isLastChunkWithSecurity) {
DataInput input(chunk, chunkLen);
input.setPoolName(m_msg.getPoolName());
uint32_t partLen;
if (TcrMessageHelper::readChunkPartHeader(
m_msg, input, GeodeTypeIdsImpl::FixedIDByte,
GeodeTypeIdsImpl::VersionedObjectPartList, "ChunkedGetAllResponse",
partLen, isLastChunkWithSecurity) != TcrMessageHelper::OBJECT) {
// encountered an exception part, so return without reading more
m_msg.readSecureObjectPart(input, false, true, isLastChunkWithSecurity);
return;
}
VersionedCacheableObjectPartList objectList(
m_keys, &m_keysOffset, m_values, m_exceptions, m_resultKeys, m_region,
&m_trackerMap, m_destroyTracker, m_addToLocalCache, m_dsmemId,
m_responseLock);
objectList.fromData(input);
m_msg.readSecureObjectPart(input, false, true, isLastChunkWithSecurity);
}
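// add() merges another ChunkedGetAllResponse into this one (values,
// exceptions, update counters and result keys), e.g. when parallel
// single-hop getAll requests each produce their own response object.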
void ChunkedGetAllResponse::add(const ChunkedGetAllResponse* other) {
if (m_values != NULLPTR) {
for (HashMapOfCacheable::Iterator iter = other->m_values->begin();
iter != other->m_values->end(); iter++) {
m_values->insert(iter.first(), iter.second());
}
}
if (m_exceptions != NULLPTR) {
for (HashMapOfException::Iterator iter = other->m_exceptions->begin();
iter != other->m_exceptions->end(); iter++) {
m_exceptions->insert(iter.first(), iter.second());
}
}
for (MapOfUpdateCounters::iterator iter = other->m_trackerMap.begin();
iter != other->m_trackerMap.end(); iter++) {
m_trackerMap[iter->first] = iter->second;
}
if (m_resultKeys != NULLPTR) {
for (VectorOfCacheableKey::Iterator iter = other->m_resultKeys->begin();
iter != other->m_resultKeys->end(); iter++) {
m_resultKeys->push_back(*iter);
}
}
}
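// ChunkedPutAllResponse gathers the versioned object part lists returned by
// a putAll(); a BYTES part instead signals that single-hop metadata should
// be refreshed (handled below). A hedged usage sketch (names illustrative):
//   HashMapOfCacheable entries;
//   entries.insert(CacheableString::create("k1"),
//                  CacheableString::create("v1"));
//   region->putAll(entries);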
void ChunkedPutAllResponse::reset() {
if (m_list != NULLPTR && m_list->size() > 0) {
m_list->getVersionedTagptr().clear();
}
}
// process a PUT_ALL response chunk
void ChunkedPutAllResponse::handleChunk(const uint8_t* chunk, int32_t chunkLen,
uint8_t isLastChunkWithSecurity) {
DataInput input(chunk, chunkLen);
input.setPoolName(m_msg.getPoolName());
uint32_t partLen;
int8_t chunkType;
if ((chunkType = (TcrMessageHelper::ChunkObjectType)
TcrMessageHelper::readChunkPartHeader(
m_msg, input, GeodeTypeIdsImpl::FixedIDByte,
GeodeTypeIdsImpl::VersionedObjectPartList,
"ChunkedPutAllResponse", partLen, isLastChunkWithSecurity)) ==
TcrMessageHelper::NULL_OBJECT) {
LOGDEBUG("ChunkedPutAllResponse::handleChunk NULL object");
// No issue; the list will simply be empty when caching is disabled.
m_msg.readSecureObjectPart(input, false, true, isLastChunkWithSecurity);
return;
}
if (static_cast<TcrMessageHelper::ChunkObjectType>(chunkType) ==
TcrMessageHelper::OBJECT) {
LOGDEBUG("ChunkedPutAllResponse::handleChunk object");
ACE_Recursive_Thread_Mutex responseLock;
VersionedCacheableObjectPartListPtr vcObjPart(
new VersionedCacheableObjectPartList(
m_msg.getChunkedResultHandler()->getEndpointMemId(), responseLock));
vcObjPart->fromData(input);
m_list->addAll(vcObjPart);
m_msg.readSecureObjectPart(input, false, true, isLastChunkWithSecurity);
} else {
LOGDEBUG("ChunkedPutAllResponse::handleChunk BYTES PART");
int8_t byte0;
input.read(&byte0);
LOGDEBUG("ChunkedPutAllResponse::handleChunk single-hop bytes byte0 = %d ",
byte0);
int8_t byte1;
input.read(&byte1);
m_msg.readSecureObjectPart(input, false, true, isLastChunkWithSecurity);
PoolPtr pool = PoolManager::find(m_msg.getPoolName());
if (pool != NULLPTR && !pool->isDestroyed() &&
pool->getPRSingleHopEnabled()) {
ThinClientPoolDM* poolDM = dynamic_cast<ThinClientPoolDM*>(pool.ptr());
if ((poolDM != NULL) && (poolDM->getClientMetaDataService() != NULL) &&
(byte0 != 0)) {
LOGFINE(
"enqueued region %s for metadata refresh for singlehop for PUTALL "
"operation.",
m_region->getFullPath());
poolDM->getClientMetaDataService()->enqueueForMetadataRefresh(
m_region->getFullPath(), byte1);
}
}
}
}
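// ChunkedRemoveAllResponse mirrors ChunkedPutAllResponse for removeAll():
// an OBJECT part carries versioned tags, while a BYTES part triggers the
// same single-hop metadata refresh. A hedged sketch (names illustrative):
//   VectorOfCacheableKey keysToRemove;
//   keysToRemove.push_back(CacheableString::create("k1"));
//   region->removeAll(keysToRemove);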
void ChunkedRemoveAllResponse::reset() {
if (m_list != NULLPTR && m_list->size() > 0) {
m_list->getVersionedTagptr().clear();
}
}
// process a REMOVE_ALL response chunk
void ChunkedRemoveAllResponse::handleChunk(const uint8_t* chunk,
int32_t chunkLen,
uint8_t isLastChunkWithSecurity) {
DataInput input(chunk, chunkLen);
input.setPoolName(m_msg.getPoolName());
uint32_t partLen;
int8_t chunkType;
if ((chunkType = (TcrMessageHelper::ChunkObjectType)
TcrMessageHelper::readChunkPartHeader(
m_msg, input, GeodeTypeIdsImpl::FixedIDByte,
GeodeTypeIdsImpl::VersionedObjectPartList,
"ChunkedRemoveAllResponse", partLen, isLastChunkWithSecurity)) ==
TcrMessageHelper::NULL_OBJECT) {
LOGDEBUG("ChunkedRemoveAllResponse::handleChunk NULL object");
// No issue; the list will simply be empty when caching is disabled.
m_msg.readSecureObjectPart(input, false, true, isLastChunkWithSecurity);
return;
}
if (static_cast<TcrMessageHelper::ChunkObjectType>(chunkType) ==
TcrMessageHelper::OBJECT) {
LOGDEBUG("ChunkedRemoveAllResponse::handleChunk object");
ACE_Recursive_Thread_Mutex responseLock;
VersionedCacheableObjectPartListPtr vcObjPart(
new VersionedCacheableObjectPartList(
m_msg.getChunkedResultHandler()->getEndpointMemId(), responseLock));
vcObjPart->fromData(input);
m_list->addAll(vcObjPart);
m_msg.readSecureObjectPart(input, false, true, isLastChunkWithSecurity);
} else {
LOGDEBUG("ChunkedRemoveAllResponse::handleChunk BYTES PART");
int8_t byte0;
input.read(&byte0);
LOGDEBUG(
"ChunkedRemoveAllResponse::handleChunk single-hop bytes byte0 = %d ",
byte0);
int8_t byte1;
input.read(&byte1);
m_msg.readSecureObjectPart(input, false, true, isLastChunkWithSecurity);
PoolPtr pool = PoolManager::find(m_msg.getPoolName());
if (pool != NULLPTR && !pool->isDestroyed() &&
pool->getPRSingleHopEnabled()) {
ThinClientPoolDM* poolDM = dynamic_cast<ThinClientPoolDM*>(pool.ptr());
if ((poolDM != NULL) && (poolDM->getClientMetaDataService() != NULL) &&
(byte0 != 0)) {
LOGFINE(
"enqueued region %s for metadata refresh for singlehop for "
"REMOVEALL operation.",
m_region->getFullPath());
poolDM->getClientMetaDataService()->enqueueForMetadataRefresh(
m_region->getFullPath(), byte1);
}
}
}
}
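// ChunkedDurableCQListResponse collects the names of this client's durable
// CQs as CacheableStrings. A hedged sketch of the call that produces this
// response (assuming "qs" is a QueryServicePtr obtained from the pool or
// cache):
//   CacheableArrayListPtr durableCqNames = qs->getAllDurableCqsFromServer();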
void ChunkedDurableCQListResponse::reset() {
if (m_resultList != NULLPTR && m_resultList->length() > 0) {
m_resultList->clear();
}
}
// handles the chunk response for GETDURABLECQS_MSG_TYPE
void ChunkedDurableCQListResponse::handleChunk(
const uint8_t* chunk, int32_t chunkLen, uint8_t isLastChunkWithSecurity) {
DataInput input(chunk, chunkLen);
input.setPoolName(m_msg.getPoolName());
// read part length
uint32_t partLen;
input.readInt(&partLen);
bool isObj;
input.readBoolean(&isObj);
if (!isObj) {
// we're currently always expecting an object
char exMsg[256];
ACE_OS::snprintf(
exMsg, 255,
"ChunkedDurableCQListResponse::handleChunk: part is not object");
throw MessageException(exMsg);
}
input.advanceCursor(1); // skip the CacheableArrayList type ID byte
int8_t stringParts;
input.read(&stringParts); // read the number of strings in the message (one byte)
CacheableStringPtr strTemp;
for (int i = 0; i < stringParts; i++) {
input.readObject(strTemp);
m_resultList->push_back(strTemp);
}
}