/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "ThinClientRegion.hpp"
#include <algorithm>
#include <limits>
#include <regex>
#include <geode/PoolManager.hpp>
#include <geode/Struct.hpp>
#include <geode/SystemProperties.hpp>
#include <geode/UserFunctionExecutionException.hpp>
#include "AutoDelete.hpp"
#include "CacheImpl.hpp"
#include "CacheRegionHelper.hpp"
#include "DataInputInternal.hpp"
#include "PutAllPartialResultServerException.hpp"
#include "ReadWriteLock.hpp"
#include "RegionGlobalLocks.hpp"
#include "RemoteQuery.hpp"
#include "TcrDistributionManager.hpp"
#include "TcrEndpoint.hpp"
#include "ThinClientBaseDM.hpp"
#include "ThinClientPoolDM.hpp"
#include "UserAttributes.hpp"
#include "Utils.hpp"
#include "VersionedCacheableObjectPartList.hpp"
#include "util/bounds.hpp"
#include "util/exception.hpp"
namespace apache {
namespace geode {
namespace client {
static const std::regex PREDICATE_IS_FULL_QUERY_REGEX(
"^\\s*(?:select|import)\\b", std::regex::icase);
void setThreadLocalExceptionMessage(const char* exMsg);
class PutAllWork : public PooledWork<GfErrType>,
private NonCopyable,
private NonAssignable {
ThinClientPoolDM* m_poolDM;
std::shared_ptr<BucketServerLocation> m_serverLocation;
TcrMessage* m_request;
TcrMessageReply* m_reply;
MapOfUpdateCounters m_mapOfUpdateCounters;
bool m_attemptFailover;
bool m_isBGThread;
std::shared_ptr<UserAttributes> m_userAttribute;
const std::shared_ptr<Region> m_region;
std::shared_ptr<std::vector<std::shared_ptr<CacheableKey>>> m_keys;
std::shared_ptr<HashMapOfCacheable> m_map;
std::shared_ptr<VersionedCacheableObjectPartList> m_verObjPartListPtr;
std::chrono::milliseconds m_timeout;
std::shared_ptr<PutAllPartialResultServerException> m_papException;
bool m_isPapeReceived;
ChunkedPutAllResponse* m_resultCollector;
std::recursive_mutex m_responseLock;
// UNUSED const std::shared_ptr<Serializable>& m_aCallbackArgument;
public:
PutAllWork(
ThinClientPoolDM* poolDM,
const std::shared_ptr<BucketServerLocation>& serverLocation,
const std::shared_ptr<Region>& region, bool attemptFailover,
bool isBGThread, const std::shared_ptr<HashMapOfCacheable> map,
const std::shared_ptr<std::vector<std::shared_ptr<CacheableKey>>> keys,
std::chrono::milliseconds timeout,
const std::shared_ptr<Serializable>& aCallbackArgument)
: m_poolDM(poolDM),
m_serverLocation(serverLocation),
m_attemptFailover(attemptFailover),
m_isBGThread(isBGThread),
m_userAttribute(nullptr),
m_region(region),
m_keys(keys),
m_map(map),
m_timeout(timeout),
m_papException(nullptr),
m_isPapeReceived(false)
// UNUSED , m_aCallbackArgument(aCallbackArgument)
{
m_request = new TcrMessagePutAll(
new DataOutput(m_region->getCache().createDataOutput()), m_region.get(),
*m_map, m_timeout, m_poolDM, aCallbackArgument);
m_reply = new TcrMessageReply(true, m_poolDM);
// create a new instance of VCOPL; the response lock is a member because the
// part list and the chunked response handler keep a reference to it beyond
// this constructor
m_verObjPartListPtr = std::make_shared<VersionedCacheableObjectPartList>(
keys.get(), m_responseLock);
if (m_poolDM->isMultiUserMode()) {
m_userAttribute = UserAttributes::threadLocalUserAttributes;
}
m_request->setTimeout(m_timeout);
m_reply->setTimeout(m_timeout);
m_resultCollector = new ChunkedPutAllResponse(
m_region, *m_reply, m_responseLock, m_verObjPartListPtr);
m_reply->setChunkedResultHandler(m_resultCollector);
}
~PutAllWork() {
delete m_request;
delete m_reply;
delete m_resultCollector;
}
TcrMessage* getReply() { return m_reply; }
std::shared_ptr<HashMapOfCacheable> getPutAllMap() { return m_map; }
std::shared_ptr<VersionedCacheableObjectPartList> getVerObjPartList() {
return m_verObjPartListPtr;
}
ChunkedPutAllResponse* getResultCollector() { return m_resultCollector; }
std::shared_ptr<BucketServerLocation> getServerLocation() {
return m_serverLocation;
}
std::shared_ptr<PutAllPartialResultServerException> getPaPResultException() {
return m_papException;
}
void init() {}
GfErrType execute(void) {
GuardUserAttributes gua;
if (m_userAttribute != nullptr) {
gua.setAuthenticatedView(m_userAttribute->getAuthenticatedView());
}
GfErrType err = GF_NOERR;
err = m_poolDM->sendSyncRequest(*m_request, *m_reply, m_attemptFailover,
m_isBGThread, m_serverLocation);
// Set Version Tags
LOGDEBUG(" m_verObjPartListPtr size = %d err = %d ",
m_resultCollector->getList()->size(), err);
m_verObjPartListPtr->setVersionedTagptr(
m_resultCollector->getList()->getVersionedTagptr());
if (err != GF_NOERR) {
// This can be GF_NOTCON, the counterpart to Java's
// ServerConnectivityException.
return err;
}
switch (m_reply->getMessageType()) {
case TcrMessage::REPLY:
break;
case TcrMessage::RESPONSE:
LOGDEBUG("PutAllwork execute err = %d ", err);
break;
case TcrMessage::EXCEPTION:
// TODO::Check for the PAPException, read the
// PutAllPartialResultServerException, and set m_papException and
// m_isPapeReceived for later use.
m_isPapeReceived = true;
if (m_poolDM->isNotAuthorizedException(m_reply->getException())) {
LOGDEBUG("received NotAuthorizedException");
err = GF_AUTHENTICATION_FAILED_EXCEPTION;
} else if (m_poolDM->isPutAllPartialResultException(
m_reply->getException())) {
LOGDEBUG("received PutAllPartialResultException");
err = GF_PUTALL_PARTIAL_RESULT_EXCEPTION;
} else {
LOGDEBUG("received unknown exception:%s", m_reply->getException());
err = GF_PUTALL_PARTIAL_RESULT_EXCEPTION;
// TODO should assign a new err code
}
break;
case TcrMessage::PUT_DATA_ERROR:
err = GF_CACHESERVER_EXCEPTION;
break;
default:
LOGERROR("Unknown message type %d during region put-all",
m_reply->getMessageType());
err = GF_NOTOBJ;
break;
}
return err;
}
};
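// Usage sketch (illustrative; mirrors singleHopPutAllNoThrow_remote below):
// each PutAllWork is handed to the cache's thread pool and joined through
// getResult(), which blocks until execute() has run on a pooled thread.
//
//   auto worker = new PutAllWork(poolDM, serverLocation, region,
//                                true /*attemptFailover*/,
//                                false /*isBGThread*/, filteredMap, keys,
//                                timeout, aCallbackArgument);
//   threadPool->perform(worker);          // schedules execute()
//   GfErrType err = worker->getResult();  // blocks until completion
//   delete worker;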
class RemoveAllWork : public PooledWork<GfErrType>,
private NonCopyable,
private NonAssignable {
ThinClientPoolDM* m_poolDM;
std::shared_ptr<BucketServerLocation> m_serverLocation;
TcrMessage* m_request;
TcrMessageReply* m_reply;
MapOfUpdateCounters m_mapOfUpdateCounters;
bool m_attemptFailover;
bool m_isBGThread;
std::shared_ptr<UserAttributes> m_userAttribute;
const std::shared_ptr<Region> m_region;
const std::shared_ptr<Serializable>& m_aCallbackArgument;
std::shared_ptr<std::vector<std::shared_ptr<CacheableKey>>> m_keys;
std::shared_ptr<VersionedCacheableObjectPartList> m_verObjPartListPtr;
std::shared_ptr<PutAllPartialResultServerException> m_papException;
bool m_isPapeReceived;
ChunkedRemoveAllResponse* m_resultCollector;
std::recursive_mutex m_responseLock;
public:
RemoveAllWork(
ThinClientPoolDM* poolDM,
const std::shared_ptr<BucketServerLocation>& serverLocation,
const std::shared_ptr<Region>& region, bool attemptFailover,
bool isBGThread,
const std::shared_ptr<std::vector<std::shared_ptr<CacheableKey>>> keys,
const std::shared_ptr<Serializable>& aCallbackArgument)
: m_poolDM(poolDM),
m_serverLocation(serverLocation),
m_attemptFailover(attemptFailover),
m_isBGThread(isBGThread),
m_userAttribute(nullptr),
m_region(region),
m_aCallbackArgument(aCallbackArgument),
m_keys(keys),
m_papException(nullptr),
m_isPapeReceived(false) {
m_request = new TcrMessageRemoveAll(
new DataOutput(m_region->getCache().createDataOutput()), m_region.get(),
*keys, m_aCallbackArgument, m_poolDM);
m_reply = new TcrMessageReply(true, m_poolDM);
// create a new instance of VCOPL; the response lock is a member because the
// part list and the chunked response handler keep a reference to it beyond
// this constructor
m_verObjPartListPtr = std::make_shared<VersionedCacheableObjectPartList>(
keys.get(), m_responseLock);
if (m_poolDM->isMultiUserMode()) {
m_userAttribute = UserAttributes::threadLocalUserAttributes;
}
m_resultCollector = new ChunkedRemoveAllResponse(
m_region, *m_reply, m_responseLock, m_verObjPartListPtr);
m_reply->setChunkedResultHandler(m_resultCollector);
}
~RemoveAllWork() {
delete m_request;
delete m_reply;
delete m_resultCollector;
}
TcrMessage* getReply() { return m_reply; }
std::shared_ptr<VersionedCacheableObjectPartList> getVerObjPartList() {
return m_verObjPartListPtr;
}
ChunkedRemoveAllResponse* getResultCollector() { return m_resultCollector; }
std::shared_ptr<BucketServerLocation> getServerLocation() {
return m_serverLocation;
}
std::shared_ptr<PutAllPartialResultServerException> getPaPResultException() {
return m_papException;
}
void init() {}
GfErrType execute(void) {
GuardUserAttributes gua;
if (m_userAttribute != nullptr) {
gua.setAuthenticatedView(m_userAttribute->getAuthenticatedView());
}
GfErrType err = GF_NOERR;
err = m_poolDM->sendSyncRequest(*m_request, *m_reply, m_attemptFailover,
m_isBGThread, m_serverLocation);
// Set Version Tags
LOGDEBUG(" m_verObjPartListPtr size = %d err = %d ",
m_resultCollector->getList()->size(), err);
m_verObjPartListPtr->setVersionedTagptr(
m_resultCollector->getList()->getVersionedTagptr());
if (err != GF_NOERR) {
// This can be GF_NOTCON, the counterpart to Java's
// ServerConnectivityException.
return err;
}
switch (m_reply->getMessageType()) {
case TcrMessage::REPLY:
break;
case TcrMessage::RESPONSE:
LOGDEBUG("RemoveAllWork execute err = %d ", err);
break;
case TcrMessage::EXCEPTION:
// TODO::Check for the PAPException, read the
// PutAllPartialResultServerException, and set m_papException and
// m_isPapeReceived for later use.
m_isPapeReceived = true;
if (m_poolDM->isNotAuthorizedException(m_reply->getException())) {
LOGDEBUG("received NotAuthorizedException");
err = GF_AUTHENTICATION_FAILED_EXCEPTION;
} else if (m_poolDM->isPutAllPartialResultException(
m_reply->getException())) {
LOGDEBUG("received PutAllPartialResultException");
err = GF_PUTALL_PARTIAL_RESULT_EXCEPTION;
} else {
LOGDEBUG("received unknown exception:%s", m_reply->getException());
err = GF_PUTALL_PARTIAL_RESULT_EXCEPTION;
// TODO should assign a new err code
}
break;
case TcrMessage::PUT_DATA_ERROR:
err = GF_CACHESERVER_EXCEPTION;
break;
default:
LOGERROR("Unknown message type %d during region remove-all",
m_reply->getMessageType());
err = GF_NOTOBJ;
break;
}
return err;
}
};
ThinClientRegion::ThinClientRegion(
const std::string& name, CacheImpl* cacheImpl,
const std::shared_ptr<RegionInternal>& rPtr, RegionAttributes attributes,
const std::shared_ptr<CacheStatistics>& stats, bool shared)
: LocalRegion(name, cacheImpl, rPtr, attributes, stats, shared),
m_tcrdm(nullptr),
m_notifyRelease(false),
m_notificationSema(1),
m_isMetaDataRefreshed(false) {
m_transactionEnabled = true;
m_isDurableClnt = !cacheImpl->getDistributedSystem()
.getSystemProperties()
.durableClientId()
.empty();
}
void ThinClientRegion::initTCR() {
try {
m_tcrdm =
new TcrDistributionManager(this, m_cacheImpl->tcrConnectionManager());
m_tcrdm->init();
} catch (const Exception& ex) {
_GEODE_SAFE_DELETE(m_tcrdm);
LOGERROR("Exception while initializing region: %s: %s",
ex.getName().c_str(), ex.what());
throw;
}
}
void ThinClientRegion::registerKeys(
const std::vector<std::shared_ptr<CacheableKey>>& keys, bool isDurable,
bool getInitialValues, bool receiveValues) {
auto pool = m_cacheImpl->getPoolManager().find(getAttributes().getPoolName());
if (pool != nullptr) {
if (!pool->getSubscriptionEnabled()) {
LOGERROR(
"Registering keys is supported "
"only if subscription-enabled attribute is true for pool " +
pool->getName());
throw UnsupportedOperationException(
"Registering keys is supported "
"only if pool subscription-enabled attribute is true.");
}
}
if (keys.empty()) {
LOGERROR("Register keys list is empty");
throw IllegalArgumentException(
"Register keys "
"keys vector is empty");
}
if (isDurable && !isDurableClient()) {
LOGERROR(
"Register keys durable flag is only applicable for durable clients");
throw IllegalStateException(
"Durable flag only applicable for "
"durable clients");
}
if (getInitialValues && !m_regionAttributes.getCachingEnabled()) {
LOGERROR(
"Register keys getInitialValues flag is only applicable for caching"
"clients");
throw IllegalStateException(
"getInitialValues flag only applicable for caching clients");
}
InterestResultPolicy interestPolicy = InterestResultPolicy::NONE;
if (getInitialValues) {
interestPolicy = InterestResultPolicy::KEYS_VALUES;
}
LOGDEBUG("ThinClientRegion::registerKeys : interestpolicy is %d",
interestPolicy.ordinal);
GfErrType err = registerKeysNoThrow(keys, true, nullptr, isDurable,
interestPolicy, receiveValues);
if (m_tcrdm->isFatalError(err)) {
GfErrTypeToException("Region::registerKeys", err);
}
GfErrTypeToException("Region::registerKeys", err);
}
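// Example (illustrative client usage; assumes a pool created with
// subscription-enabled set to true):
//
//   std::vector<std::shared_ptr<CacheableKey>> keys;
//   keys.push_back(CacheableKey::create("key1"));
//   keys.push_back(CacheableKey::create("key2"));
//   region->registerKeys(keys, false /*isDurable*/,
//                        true /*getInitialValues*/, true /*receiveValues*/);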
void ThinClientRegion::unregisterKeys(
const std::vector<std::shared_ptr<CacheableKey>>& keys) {
auto pool = m_cacheImpl->getPoolManager().find(getAttributes().getPoolName());
if (pool != nullptr) {
if (!pool->getSubscriptionEnabled()) {
LOGERROR(
"Unregister keys is supported "
"only if subscription-enabled attribute is true for pool " +
pool->getName());
throw UnsupportedOperationException(
"Unregister keys is supported "
"only if pool subscription-enabled attribute is true.");
}
} else {
if (!getAttributes().getClientNotificationEnabled()) {
LOGERROR(
"Unregister keys is supported "
"only if region client-notification attribute is true.");
throw UnsupportedOperationException(
"Unregister keys is supported "
"only if region client-notification attribute is true.");
}
}
if (keys.empty()) {
LOGERROR("Unregister keys list is empty");
throw IllegalArgumentException(
"Unregister keys "
"keys vector is empty");
}
GfErrType err = unregisterKeysNoThrow(keys);
GfErrTypeToException("Region::unregisterKeys", err);
}
void ThinClientRegion::registerAllKeys(bool isDurable, bool getInitialValues,
bool receiveValues) {
auto pool = m_cacheImpl->getPoolManager().find(getAttributes().getPoolName());
if (pool != nullptr) {
if (!pool->getSubscriptionEnabled()) {
LOGERROR(
"Register all keys is supported only "
"if subscription-enabled attribute is true for pool " +
pool->getName());
throw UnsupportedOperationException(
"Register all keys is supported only "
"if pool subscription-enabled attribute is true.");
}
}
if (isDurable && !isDurableClient()) {
LOGERROR(
"Register all keys durable flag is only applicable for durable "
"clients");
throw IllegalStateException(
"Durable flag only applicable for durable clients");
}
if (getInitialValues && !m_regionAttributes.getCachingEnabled()) {
LOGERROR(
"Register all keys getInitialValues flag is only applicable for caching"
"clients");
throw IllegalStateException(
"getInitialValues flag only applicable for caching clients");
}
InterestResultPolicy interestPolicy = InterestResultPolicy::NONE;
if (getInitialValues) {
interestPolicy = InterestResultPolicy::KEYS_VALUES;
} else {
interestPolicy = InterestResultPolicy::KEYS;
}
LOGDEBUG("ThinClientRegion::registerAllKeys : interestpolicy is %d",
interestPolicy.ordinal);
std::shared_ptr<std::vector<std::shared_ptr<CacheableKey>>> resultKeys;
// if we need to fetch initial data, then we get the keys in
// that call itself using the special GET_ALL message and do not need
// to get the keys in the initial register interest call
GfErrType err =
registerRegexNoThrow(".*", true, nullptr, isDurable, resultKeys,
interestPolicy, receiveValues);
if (m_tcrdm->isFatalError(err)) {
GfErrTypeToException("Region::registerAllKeys", err);
}
// Get the entries from the server using a special GET_ALL message
GfErrTypeToException("Region::registerAllKeys", err);
}
void ThinClientRegion::registerRegex(const std::string& regex, bool isDurable,
bool getInitialValues,
bool receiveValues) {
auto pool = m_cacheImpl->getPoolManager().find(getAttributes().getPoolName());
if (pool != nullptr) {
if (!pool->getSubscriptionEnabled()) {
LOGERROR(
"Register regex is supported only if "
"subscription-enabled attribute is true for pool " +
pool->getName());
throw UnsupportedOperationException(
"Register regex is supported only if "
"pool subscription-enabled attribute is true.");
}
}
if (isDurable && !isDurableClient()) {
LOGERROR("Register regex durable flag only applicable for durable clients");
throw IllegalStateException(
"Durable flag only applicable for durable clients");
}
if (regex.empty()) {
throw IllegalArgumentException(
"Region::registerRegex: Regex string is empty");
}
auto interestPolicy = InterestResultPolicy::NONE;
if (getInitialValues) {
interestPolicy = InterestResultPolicy::KEYS_VALUES;
} else {
interestPolicy = InterestResultPolicy::KEYS;
}
LOGDEBUG("ThinClientRegion::registerRegex : interestpolicy is %d",
interestPolicy.ordinal);
auto resultKeys2 =
std::make_shared<std::vector<std::shared_ptr<CacheableKey>>>();
// if we need to fetch initial data for "allKeys" case, then we
// get the keys in that call itself using the special GET_ALL message and
// do not need to get the keys in the initial register interest call
GfErrType err =
registerRegexNoThrow(regex, true, nullptr, isDurable, resultKeys2,
interestPolicy, receiveValues);
if (m_tcrdm->isFatalError(err)) {
GfErrTypeToException("Region::registerRegex", err);
}
GfErrTypeToException("Region::registerRegex", err);
}
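// Example (illustrative): register durable interest in every key starting
// with "Order-", fetching initial values; this assumes a durable client id
// is configured and the pool has subscriptions enabled:
//
//   region->registerRegex("Order-.*", true /*isDurable*/,
//                         true /*getInitialValues*/, true /*receiveValues*/);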
void ThinClientRegion::unregisterRegex(const std::string& regex) {
auto pool = m_cacheImpl->getPoolManager().find(getAttributes().getPoolName());
if (pool != nullptr) {
if (!pool->getSubscriptionEnabled()) {
LOGERROR(
"Unregister regex is supported only if "
"subscription-enabled attribute is true for pool " +
pool->getName());
throw UnsupportedOperationException(
"Unregister regex is supported only if "
"pool subscription-enabled attribute is true.");
}
}
if (regex.empty()) {
LOGERROR("Unregister regex string is empty");
throw IllegalArgumentException("Unregister regex string is empty");
}
GfErrType err = unregisterRegexNoThrow(regex);
GfErrTypeToException("Region::unregisterRegex", err);
}
void ThinClientRegion::unregisterAllKeys() {
auto pool = m_cacheImpl->getPoolManager().find(getAttributes().getPoolName());
if (pool != nullptr) {
if (!pool->getSubscriptionEnabled()) {
LOGERROR(
"Unregister all keys is supported only if "
"subscription-enabled attribute is true for pool " +
pool->getName());
throw UnsupportedOperationException(
"Unregister all keys is supported only if "
"pool subscription-enabled attribute is true.");
}
}
GfErrType err = unregisterRegexNoThrow(".*");
GfErrTypeToException("Region::unregisterAllKeys", err);
}
std::shared_ptr<SelectResults> ThinClientRegion::query(
const std::string& predicate, std::chrono::milliseconds timeout) {
util::PROTOCOL_OPERATION_TIMEOUT_BOUNDS(timeout);
CHECK_DESTROY_PENDING(TryReadGuard, Region::query);
if (predicate.empty()) {
LOGERROR("Region query predicate string is empty");
throw IllegalArgumentException("Region query predicate string is empty");
}
std::string squery;
if (std::regex_search(predicate, PREDICATE_IS_FULL_QUERY_REGEX)) {
squery = predicate;
} else {
squery = "select distinct * from ";
squery += getFullPath();
squery += " this where ";
squery += predicate;
}
std::shared_ptr<RemoteQuery> queryPtr;
// TODO:
ThinClientPoolDM* poolDM = dynamic_cast<ThinClientPoolDM*>(m_tcrdm);
if (poolDM) {
queryPtr = std::dynamic_pointer_cast<RemoteQuery>(
poolDM->getQueryServiceWithoutCheck()->newQuery(squery.c_str()));
} else {
queryPtr = std::dynamic_pointer_cast<RemoteQuery>(
m_cacheImpl->getQueryService()->newQuery(squery.c_str()));
}
return queryPtr->execute(timeout, "Region::query", m_tcrdm, nullptr);
}
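// Example (illustrative): both calls below reach the same server-side query
// engine; the first is a bare predicate that the code above expands to
// "select distinct * from /<fullPath> this where price > 100":
//
//   auto rows = region->query("price > 100");
//   auto sameRows =
//       region->query("select distinct * from /myRegion this where price > 100");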
bool ThinClientRegion::existsValue(const std::string& predicate,
std::chrono::milliseconds timeout) {
util::PROTOCOL_OPERATION_TIMEOUT_BOUNDS(timeout);
auto results = query(predicate, timeout);
if (results == nullptr) {
return false;
}
return results->size() > 0;
}
GfErrType ThinClientRegion::unregisterKeysBeforeDestroyRegion() {
auto pool = m_cacheImpl->getPoolManager().find(getAttributes().getPoolName());
if (pool != nullptr) {
if (!pool->getSubscriptionEnabled()) {
LOGDEBUG(
"pool subscription-enabled attribute is false, No need to Unregister "
"keys");
return GF_NOERR;
}
}
GfErrType err = GF_NOERR;
GfErrType opErr = GF_NOERR;
opErr = unregisterStoredRegexLocalDestroy(m_interestListRegex);
err = opErr != GF_NOERR ? opErr : err;
opErr = unregisterStoredRegexLocalDestroy(m_durableInterestListRegex);
err = opErr != GF_NOERR ? opErr : err;
opErr = unregisterStoredRegexLocalDestroy(
m_interestListRegexForUpdatesAsInvalidates);
err = opErr != GF_NOERR ? opErr : err;
opErr = unregisterStoredRegexLocalDestroy(
m_durableInterestListRegexForUpdatesAsInvalidates);
err = opErr != GF_NOERR ? opErr : err;
std::vector<std::shared_ptr<CacheableKey>> keysVec;
copyInterestList(keysVec, m_interestList);
opErr = unregisterKeysNoThrowLocalDestroy(keysVec, false);
err = opErr != GF_NOERR ? opErr : err;
std::vector<std::shared_ptr<CacheableKey>> keysVecDurable;
copyInterestList(keysVecDurable, m_durableInterestList);
opErr = unregisterKeysNoThrowLocalDestroy(keysVecDurable, false);
err = opErr != GF_NOERR ? opErr : err;
std::vector<std::shared_ptr<CacheableKey>> keysVecForUpdatesAsInvalidates;
copyInterestList(keysVecForUpdatesAsInvalidates,
m_interestListForUpdatesAsInvalidates);
opErr =
unregisterKeysNoThrowLocalDestroy(keysVecForUpdatesAsInvalidates, false);
err = opErr != GF_NOERR ? opErr : err;
std::vector<std::shared_ptr<CacheableKey>>
keysVecDurableForUpdatesAsInvalidates;
copyInterestList(keysVecDurableForUpdatesAsInvalidates,
m_durableInterestListForUpdatesAsInvalidates);
opErr = unregisterKeysNoThrowLocalDestroy(
keysVecDurableForUpdatesAsInvalidates, false);
err = opErr != GF_NOERR ? opErr : err;
return err;
}
std::shared_ptr<Serializable> ThinClientRegion::selectValue(
const std::string& predicate, std::chrono::milliseconds timeout) {
auto results = query(predicate, timeout);
if (results == nullptr || results->size() == 0) {
return nullptr;
}
if (results->size() > 1) {
throw QueryException("selectValue has more than one result");
}
return results->operator[](0);
}
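// Example (illustrative): selectValue expects the predicate to match at most
// one entry; it returns nullptr when nothing matches and throws
// QueryException when more than one result comes back:
//
//   auto value = region->selectValue("id = 42");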
std::vector<std::shared_ptr<CacheableKey>> ThinClientRegion::serverKeys() {
CHECK_DESTROY_PENDING(TryReadGuard, Region::serverKeys);
TcrMessageReply reply(true, m_tcrdm);
TcrMessageKeySet request(new DataOutput(m_cacheImpl->createDataOutput()),
m_fullPath, m_tcrdm);
reply.setMessageTypeRequest(TcrMessage::KEY_SET);
std::vector<std::shared_ptr<CacheableKey>> serverKeys;
ChunkedKeySetResponse resultCollector(request, serverKeys, reply);
reply.setChunkedResultHandler(&resultCollector);
GfErrType err = GF_NOERR;
err = m_tcrdm->sendSyncRequest(request, reply);
GfErrTypeToException("Region::serverKeys", err);
switch (reply.getMessageType()) {
case TcrMessage::RESPONSE: {
// keyset result is handled by ChunkedKeySetResponse
break;
}
case TcrMessage::EXCEPTION: {
err = handleServerException("Region:serverKeys", reply.getException());
break;
}
case TcrMessage::KEY_SET_DATA_ERROR: {
LOGERROR("Region serverKeys: an error occurred on the server");
err = GF_CACHESERVER_EXCEPTION;
break;
}
default: {
LOGERROR("Unknown message type %d during region serverKeys",
reply.getMessageType());
err = GF_MSG;
break;
}
}
GfErrTypeToException("Region::serverKeys", err);
return serverKeys;
}
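// Example (illustrative): fetch the complete key set held by the servers,
// which can differ from the locally cached key set:
//
//   auto remoteKeys = region->serverKeys();
//   auto localKeys = region->keys();  // local cache only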
bool ThinClientRegion::containsKeyOnServer(
const std::shared_ptr<CacheableKey>& keyPtr) const {
GfErrType err = GF_NOERR;
bool ret = false;
// Create the message and send it to the bridge server.
TcrMessageContainsKey request(
new DataOutput(m_cacheImpl->createDataOutput()), this, keyPtr,
static_cast<std::shared_ptr<Serializable>>(nullptr), true, m_tcrdm);
TcrMessageReply reply(true, m_tcrdm);
reply.setMessageTypeRequest(TcrMessage::CONTAINS_KEY);
err = m_tcrdm->sendSyncRequest(request, reply);
// if ( err != GF_NOERR ) return ret;
switch (reply.getMessageType()) {
case TcrMessage::RESPONSE:
ret = reply.getBoolValue();
break;
case TcrMessage::EXCEPTION:
err = handleServerException("Region::containsKeyOnServer:",
reply.getException());
break;
case TcrMessage::REQUEST_DATA_ERROR:
LOGERROR(
"Region::containsKeyOnServer: read error occurred on the endpoint %s",
m_tcrdm->getActiveEndpoint()->name().c_str());
err = GF_CACHESERVER_EXCEPTION;
break;
default:
LOGERROR("Unknown message type in Region::containsKeyOnServer %d",
reply.getMessageType());
err = GF_MSG;
break;
}
auto rptr = CacheableBoolean::create(ret);
rptr = std::dynamic_pointer_cast<CacheableBoolean>(handleReplay(err, rptr));
GfErrTypeToException("Region::containsKeyOnServer ", err);
return rptr->value();
}
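// Example (illustrative): unlike Region::containsKey, which consults only
// the local cache, this call round-trips to the server:
//
//   auto key = CacheableKey::create("key1");
//   bool cachedLocally = region->containsKey(key);
//   bool existsOnServer = region->containsKeyOnServer(key);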
bool ThinClientRegion::containsValueForKey_remote(
const std::shared_ptr<CacheableKey>& keyPtr) const {
GfErrType err = GF_NOERR;
bool ret = false;
// Create the message and send it to the bridge server.
TcrMessageContainsKey request(
new DataOutput(m_cacheImpl->createDataOutput()), this, keyPtr,
static_cast<std::shared_ptr<Serializable>>(nullptr), false, m_tcrdm);
TcrMessageReply reply(true, m_tcrdm);
reply.setMessageTypeRequest(TcrMessage::CONTAINS_KEY);
err = m_tcrdm->sendSyncRequest(request, reply);
// if ( err != GF_NOERR ) return ret;
switch (reply.getMessageType()) {
case TcrMessage::RESPONSE:
ret = reply.getBoolValue();
break;
case TcrMessage::EXCEPTION:
err = handleServerException("Region::containsValueForKey:",
reply.getException());
break;
case TcrMessage::REQUEST_DATA_ERROR:
LOGERROR(
"Region::containsValueForKey: read error occurred on the endpoint %s",
m_tcrdm->getActiveEndpoint()->name().c_str());
err = GF_CACHESERVER_EXCEPTION;
break;
default:
LOGERROR("Unknown message type in Region::containsValueForKey %d",
reply.getMessageType());
err = GF_MSG;
break;
}
auto rptr = CacheableBoolean::create(ret);
rptr = std::dynamic_pointer_cast<CacheableBoolean>(handleReplay(err, rptr));
GfErrTypeToException("Region::containsValueForKey ", err);
return rptr->value();
}
void ThinClientRegion::clear(
const std::shared_ptr<Serializable>& aCallbackArgument) {
GfErrType err = GF_NOERR;
err = localClearNoThrow(aCallbackArgument, CacheEventFlags::NORMAL);
if (err != GF_NOERR) GfErrTypeToException("Region::clear", err);
// Create the message and send it to the bridge server.
TcrMessageClearRegion request(new DataOutput(m_cacheImpl->createDataOutput()),
this, aCallbackArgument,
std::chrono::milliseconds(-1), m_tcrdm);
TcrMessageReply reply(true, m_tcrdm);
err = m_tcrdm->sendSyncRequest(request, reply);
if (err != GF_NOERR) GfErrTypeToException("Region::clear", err);
switch (reply.getMessageType()) {
case TcrMessage::REPLY:
LOGFINE("Region %s clear message sent to server successfully",
m_fullPath.c_str());
break;
case TcrMessage::EXCEPTION:
err = handleServerException("Region::clear:", reply.getException());
break;
case TcrMessage::CLEAR_REGION_DATA_ERROR:
LOGERROR("Region clear read error occurred on the endpoint %s",
m_tcrdm->getActiveEndpoint()->name().c_str());
err = GF_CACHESERVER_EXCEPTION;
break;
default:
LOGERROR("Unknown message type %d during region clear",
reply.getMessageType());
err = GF_MSG;
break;
}
if (err == GF_NOERR) {
err = invokeCacheListenerForRegionEvent(
aCallbackArgument, CacheEventFlags::NORMAL, AFTER_REGION_CLEAR);
}
GfErrTypeToException("Region::clear", err);
}
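// Example (illustrative): clear() first clears the local cache, then sends
// CLEAR_REGION to the server, and finally fires AFTER_REGION_CLEAR on the
// cache listener:
//
//   region->clear();  // or region->clear(aCallbackArgument)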
GfErrType ThinClientRegion::getNoThrow_remote(
const std::shared_ptr<CacheableKey>& keyPtr,
std::shared_ptr<Cacheable>& valPtr,
const std::shared_ptr<Serializable>& aCallbackArgument,
std::shared_ptr<VersionTag>& versionTag) {
GfErrType err = GF_NOERR;
// Create the message and send it to the bridge server.
TcrMessageRequest request(new DataOutput(m_cacheImpl->createDataOutput()),
this, keyPtr, aCallbackArgument, m_tcrdm);
TcrMessageReply reply(true, m_tcrdm);
err = m_tcrdm->sendSyncRequest(request, reply);
if (err != GF_NOERR) return err;
// process the server's reply
switch (reply.getMessageType()) {
case TcrMessage::RESPONSE: {
valPtr = reply.getValue();
versionTag = reply.getVersionTag();
break;
}
case TcrMessage::EXCEPTION: {
err = handleServerException("Region::get", reply.getException());
break;
}
case TcrMessage::REQUEST_DATA_ERROR: {
// LOGERROR("A read error occurred on the endpoint %s",
// m_tcrdm->getActiveEndpoint()->name().c_str());
err = GF_CACHESERVER_EXCEPTION;
break;
}
default: {
LOGERROR("Unknown message type %d while getting entry from region",
reply.getMessageType());
err = GF_MSG;
break;
}
}
return err;
}
GfErrType ThinClientRegion::invalidateNoThrow_remote(
const std::shared_ptr<CacheableKey>& keyPtr,
const std::shared_ptr<Serializable>& aCallbackArgument,
std::shared_ptr<VersionTag>& versionTag) {
GfErrType err = GF_NOERR;
// Create the message and send it to the bridge server.
TcrMessageInvalidate request(new DataOutput(m_cacheImpl->createDataOutput()),
this, keyPtr, aCallbackArgument, m_tcrdm);
TcrMessageReply reply(true, m_tcrdm);
err = m_tcrdm->sendSyncRequest(request, reply);
if (err != GF_NOERR) return err;
// process the server's reply
switch (reply.getMessageType()) {
case TcrMessage::REPLY: {
versionTag = reply.getVersionTag();
break;
}
case TcrMessage::EXCEPTION: {
err = handleServerException("Region::get", reply.getException());
break;
}
case TcrMessage::INVALIDATE_ERROR: {
// LOGERROR("A read error occurred on the endpoint %s",
// m_tcrdm->getActiveEndpoint()->name().c_str());
err = GF_CACHESERVER_EXCEPTION;
break;
}
default: {
LOGERROR("Unknown message type %d while getting entry from region",
reply.getMessageType());
err = GF_MSG;
break;
}
}
return err;
}
GfErrType ThinClientRegion::putNoThrow_remote(
const std::shared_ptr<CacheableKey>& keyPtr,
const std::shared_ptr<Cacheable>& valuePtr,
const std::shared_ptr<Serializable>& aCallbackArgument,
std::shared_ptr<VersionTag>& versionTag, bool checkDelta) {
GfErrType err = GF_NOERR;
// do TCR put
// bool delta = valuePtr->hasDelta();
bool delta = false;
auto&& conFlationValue = getCacheImpl()
->getDistributedSystem()
.getSystemProperties()
.conflateEvents();
if (checkDelta && valuePtr && conFlationValue != "true" &&
ThinClientBaseDM::isDeltaEnabledOnServer()) {
auto&& temp = std::dynamic_pointer_cast<Delta>(valuePtr);
delta = temp && temp->hasDelta();
}
TcrMessagePut request(new DataOutput(m_cacheImpl->createDataOutput()), this,
keyPtr, valuePtr, aCallbackArgument, delta, m_tcrdm);
TcrMessageReply* reply = new TcrMessageReply(true, m_tcrdm);
err = m_tcrdm->sendSyncRequest(request, *reply);
if (delta) {
m_cacheImpl->getCachePerfStats()
.incDeltaPut(); // does not check whether the put succeeded or failed
if (reply->getMessageType() ==
TcrMessage::PUT_DELTA_ERROR) { // Try without delta
TcrMessagePut request(new DataOutput(m_cacheImpl->createDataOutput()),
this, keyPtr, valuePtr, aCallbackArgument, false,
m_tcrdm, false, true);
delete reply;
reply = new TcrMessageReply(true, m_tcrdm);
err = m_tcrdm->sendSyncRequest(request, *reply);
}
}
if (err != GF_NOERR) return err;
// process the server's reply
switch (reply->getMessageType()) {
case TcrMessage::REPLY: {
versionTag = reply->getVersionTag();
break;
}
case TcrMessage::EXCEPTION: {
err = handleServerException("Region::put", reply->getException());
break;
}
case TcrMessage::PUT_DATA_ERROR: {
// LOGERROR("A write error occurred on the endpoint %s",
// m_tcrdm->getActiveEndpoint()->name().c_str());
err = GF_CACHESERVER_EXCEPTION;
break;
}
default: {
LOGERROR("Unknown message type %d during region put reply",
reply->getMessageType());
err = GF_MSG;
}
}
delete reply;
reply = nullptr;
return err;
}
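// Delta sketch (illustrative; the class and member names are hypothetical
// and signatures are abbreviated, see Delta.hpp): a value type opts into
// delta propagation by also deriving from Delta. When hasDelta() reports
// true and conflation is off, the put above ships only the delta and retries
// with the full value if the server answers PUT_DELTA_ERROR.
//
//   class PortfolioValue : public Cacheable, public Delta {
//     bool m_priceChanged{false};
//    public:
//     bool hasDelta() override { return m_priceChanged; }
//     void toDelta(DataOutput& out) override { /* write changed fields */ }
//     void fromDelta(DataInput& in) override { /* apply changed fields */ }
//   };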
GfErrType ThinClientRegion::createNoThrow_remote(
const std::shared_ptr<CacheableKey>& keyPtr,
const std::shared_ptr<Cacheable>& valuePtr,
const std::shared_ptr<Serializable>& aCallbackArgument,
std::shared_ptr<VersionTag>& versionTag) {
return putNoThrow_remote(keyPtr, valuePtr, aCallbackArgument, versionTag,
false);
}
GfErrType ThinClientRegion::destroyNoThrow_remote(
const std::shared_ptr<CacheableKey>& keyPtr,
const std::shared_ptr<Serializable>& aCallbackArgument,
std::shared_ptr<VersionTag>& versionTag) {
GfErrType err = GF_NOERR;
// do TCR destroy
TcrMessageDestroy request(new DataOutput(m_cacheImpl->createDataOutput()),
this, keyPtr, nullptr, aCallbackArgument, m_tcrdm);
TcrMessageReply reply(true, m_tcrdm);
err = m_tcrdm->sendSyncRequest(request, reply);
if (err != GF_NOERR) return err;
switch (reply.getMessageType()) {
case TcrMessage::REPLY: {
if (reply.getEntryNotFound() == 1) {
err = GF_CACHE_ENTRY_NOT_FOUND;
} else {
LOGDEBUG("Remote key [%s] is destroyed successfully in region %s",
Utils::nullSafeToString(keyPtr).c_str(), m_fullPath.c_str());
}
versionTag = reply.getVersionTag();
break;
}
case TcrMessage::EXCEPTION: {
err = handleServerException("Region::destroy", reply.getException());
break;
}
case TcrMessage::DESTROY_DATA_ERROR: {
err = GF_CACHE_ENTRY_DESTROYED_EXCEPTION;
break;
}
default: {
LOGERROR("Unknown message type %d while destroying region entry",
reply.getMessageType());
err = GF_MSG;
}
}
return err;
} // destroyNoThrow_remote()
GfErrType ThinClientRegion::removeNoThrow_remote(
const std::shared_ptr<CacheableKey>& keyPtr,
const std::shared_ptr<Cacheable>& cvalue,
const std::shared_ptr<Serializable>& aCallbackArgument,
std::shared_ptr<VersionTag>& versionTag) {
GfErrType err = GF_NOERR;
// do TCR remove
TcrMessageDestroy request(new DataOutput(m_cacheImpl->createDataOutput()),
this, keyPtr, cvalue, aCallbackArgument, m_tcrdm);
TcrMessageReply reply(true, m_tcrdm);
err = m_tcrdm->sendSyncRequest(request, reply);
if (err != GF_NOERR) {
return err;
}
switch (reply.getMessageType()) {
case TcrMessage::REPLY: {
if (reply.getEntryNotFound() == 1) {
err = GF_ENOENT;
} else {
LOGDEBUG("Remote key [%s] is removed successfully in region %s",
Utils::nullSafeToString(keyPtr).c_str(), m_fullPath.c_str());
}
versionTag = reply.getVersionTag();
break;
}
case TcrMessage::EXCEPTION: {
err = handleServerException("Region::remove", reply.getException());
break;
}
case TcrMessage::DESTROY_DATA_ERROR: {
err = GF_CACHE_ENTRY_DESTROYED_EXCEPTION;
break;
}
default: {
LOGERROR("Unknown message type %d while removing region entry",
reply.getMessageType());
err = GF_MSG;
}
}
return err;
}
GfErrType ThinClientRegion::removeNoThrowEX_remote(
const std::shared_ptr<CacheableKey>& keyPtr,
const std::shared_ptr<Serializable>& aCallbackArgument,
std::shared_ptr<VersionTag>& versionTag) {
GfErrType err = GF_NOERR;
// do TCR remove
TcrMessageDestroy request(new DataOutput(m_cacheImpl->createDataOutput()),
this, keyPtr, nullptr, aCallbackArgument, m_tcrdm);
TcrMessageReply reply(true, m_tcrdm);
err = m_tcrdm->sendSyncRequest(request, reply);
if (err != GF_NOERR) {
return err;
}
switch (reply.getMessageType()) {
case TcrMessage::REPLY: {
versionTag = reply.getVersionTag();
if (reply.getEntryNotFound() == 1) {
err = GF_ENOENT;
} else {
LOGDEBUG("Remote key [%s] is removed successfully in region %s",
Utils::nullSafeToString(keyPtr).c_str(), m_fullPath.c_str());
}
break;
}
case TcrMessage::EXCEPTION: {
err = handleServerException("Region::removeEx", reply.getException());
break;
}
case TcrMessage::DESTROY_DATA_ERROR: {
err = GF_CACHE_ENTRY_DESTROYED_EXCEPTION;
break;
}
default: {
LOGERROR("Unknown message type %d while removing region entry",
reply.getMessageType());
err = GF_MSG;
}
}
return err;
}
GfErrType ThinClientRegion::getAllNoThrow_remote(
const std::vector<std::shared_ptr<CacheableKey>>* keys,
const std::shared_ptr<HashMapOfCacheable>& values,
const std::shared_ptr<HashMapOfException>& exceptions,
const std::shared_ptr<std::vector<std::shared_ptr<CacheableKey>>>&
resultKeys,
bool addToLocalCache,
const std::shared_ptr<Serializable>& aCallbackArgument) {
GfErrType err = GF_NOERR;
MapOfUpdateCounters updateCountMap;
int32_t destroyTracker = 0;
addToLocalCache = addToLocalCache && m_regionAttributes.getCachingEnabled();
if (addToLocalCache && !m_regionAttributes.getConcurrencyChecksEnabled()) {
// start tracking the entries
if (keys == nullptr) {
// track all entries with destroy tracking for non-existent entries
destroyTracker = m_entries->addTrackerForAllEntries(updateCountMap, true);
} else {
for (const auto& key : *keys) {
std::shared_ptr<Cacheable> oldValue;
int updateCount =
m_entries->addTrackerForEntry(key, oldValue, true, false, false);
updateCountMap.emplace(key, updateCount);
}
}
}
// create the GET_ALL request
TcrMessageGetAll request(
new DataOutput(m_cacheImpl->createDataOutput()), this, keys, m_tcrdm,
aCallbackArgument); // now we need to initialize later
TcrMessageReply reply(true, m_tcrdm);
std::recursive_mutex responseLock;
// need to check
TcrChunkedResult* resultCollector(new ChunkedGetAllResponse(
reply, this, keys, values, exceptions, resultKeys, updateCountMap,
destroyTracker, addToLocalCache, responseLock));
reply.setChunkedResultHandler(resultCollector);
err = m_tcrdm->sendSyncRequest(request, reply);
if (addToLocalCache && !m_regionAttributes.getConcurrencyChecksEnabled()) {
// remove the tracking for remaining keys in case some keys do not have
// values from server in GII
for (MapOfUpdateCounters::const_iterator iter = updateCountMap.begin();
iter != updateCountMap.end(); ++iter) {
if (iter->second >= 0) {
m_entries->removeTrackerForEntry(iter->first);
}
}
// remove tracking for destroys
if (destroyTracker > 0) {
m_entries->removeDestroyTracking();
}
}
delete resultCollector;
if (err != GF_NOERR) {
return err;
}
switch (reply.getMessageType()) {
case TcrMessage::RESPONSE: {
// nothing to be done; put in local region, if required,
// is handled by ChunkedGetAllResponse
break;
}
case TcrMessage::EXCEPTION: {
err = handleServerException("Region:getAll", reply.getException());
break;
}
case TcrMessage::GET_ALL_DATA_ERROR: {
LOGERROR("Region get-all: a read error occurred on the endpoint %s",
m_tcrdm->getActiveEndpoint()->name().c_str());
err = GF_CACHESERVER_EXCEPTION;
break;
}
default: {
LOGERROR("Unknown message type %d during region get-all",
reply.getMessageType());
err = GF_MSG;
break;
}
}
return err;
}
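// Example (illustrative): the public entry point that funnels into this
// method is Region::getAll:
//
//   std::vector<std::shared_ptr<CacheableKey>> keys = {
//       CacheableKey::create("k1"), CacheableKey::create("k2")};
//   auto values = region->getAll(keys);  // HashMapOfCacheable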
GfErrType ThinClientRegion::singleHopPutAllNoThrow_remote(
ThinClientPoolDM* tcrdm, const HashMapOfCacheable& map,
std::shared_ptr<VersionedCacheableObjectPartList>& versionedObjPartList,
std::chrono::milliseconds timeout,
const std::shared_ptr<Serializable>& aCallbackArgument) {
LOGDEBUG(" ThinClientRegion::singleHopPutAllNoThrow_remote map size = %d",
map.size());
auto region = shared_from_this();
auto error = GF_NOERR;
/*Step-1::
* Populate the keys vector from the user map and pass it to
* getServerToFilterMap() to generate the locationMap.
* If the locationMap is nullptr, fall back to the old putAll implementation,
* which may take multiple network hops.
*/
auto userKeys = std::vector<std::shared_ptr<CacheableKey>>();
for (const auto& iter : map) {
userKeys.push_back(iter.first);
}
// last param in getServerToFilterMap() is true for putAll
// LOGDEBUG("ThinClientRegion::singleHopPutAllNoThrow_remote keys.size() = %d
// ", userKeys->size());
auto locationMap = tcrdm->getClientMetaDataService()->getServerToFilterMap(
userKeys, region, true);
if (!locationMap) {
// putAll with multiple hop implementation
LOGDEBUG("locationMap is Null or Empty");
return multiHopPutAllNoThrow_remote(map, versionedObjPartList, timeout,
aCallbackArgument);
}
// set the flag indicating that putAll on a partitioned region was invoked
// with single-hop enabled
m_isPRSingleHopEnabled = true;
// LOGDEBUG("locationMap.size() = %d ", locationMap->size());
/*Step-2
* a. Create a vector of PutAllWork.
* b. locationMap<std::shared_ptr<BucketServerLocation>,
* std::shared_ptr<std::vector<std::shared_ptr<CacheableKey>>>>: create a
* server-specific filteredMap/subMap by populating all keys
* (locationIter.second()) and their corresponding values from the user map.
* c. Create a new instance of PutAllWork, i.e. a worker with the required
* params.
* //TODO:: Add details of each parameter later
* d. Enqueue the worker so a thread from the threadPool runs its execute
* method.
* e. Insert the worker into the vector.
*/
std::vector<PutAllWork*> putAllWorkers;
auto threadPool =
CacheRegionHelper::getCacheImpl(&getCache())->getThreadPool();
int locationMapIndex = 0;
for (const auto& locationIter : *locationMap) {
const auto& serverLocation = locationIter.first;
if (serverLocation == nullptr) {
LOGDEBUG("serverLocation is nullptr");
}
const auto& keys = locationIter.second;
// Create server specific Sub-Map by iterating over keys.
auto filteredMap = std::make_shared<HashMapOfCacheable>();
if (keys != nullptr && keys->size() > 0) {
for (const auto& key : *keys) {
const auto& iter = map.find(key);
if (iter != map.end()) {
filteredMap->emplace(iter->first, iter->second);
}
}
}
auto worker = new PutAllWork(tcrdm, serverLocation, region,
true /*attemptFailover*/, false /*isBGThread*/,
filteredMap, keys, timeout, aCallbackArgument);
threadPool->perform(worker);
putAllWorkers.push_back(worker);
locationMapIndex++;
}
// TODO::CHECK, do we need to set following ..??
// reply.setMessageType(TcrMessage::RESPONSE);
int cnt = 1;
/**
* Step::3
* a. Iterate over the vector of putAllWorkers and populate worker-specific
* information into the HashMaps
* resultMap<std::shared_ptr<BucketServerLocation>,
* std::shared_ptr<Serializable>>, whose value can be a
* std::shared_ptr<VersionedCacheableObjectPartList> or a
* std::shared_ptr<PutAllPartialResultServerException>, and
* failedServers<std::shared_ptr<BucketServerLocation>,
* std::shared_ptr<CacheableInt32>>, whose value is an error code.
* b. Delete the worker.
*/
auto resultMap = ResultMap();
auto failedServers = FailedServersMap();
for (const auto& worker : putAllWorkers) {
auto err =
worker->getResult(); // wait() or blocking call for worker thread.
LOGDEBUG("Error code :: %s:%d err = %d ", __FILE__, __LINE__, err);
if (GF_NOERR == err) {
// No Exception from server
resultMap.emplace(worker->getServerLocation(),
worker->getResultCollector()->getList());
} else {
error = err;
if (error == GF_PUTALL_PARTIAL_RESULT_EXCEPTION) {
resultMap.emplace(worker->getServerLocation(),
worker->getPaPResultException());
} else if (error == GF_NOTCON) {
// Refresh the metadata in case of GF_NOTCON.
tcrdm->getClientMetaDataService()->enqueueForMetadataRefresh(
region->getFullPath(), 0);
}
failedServers.emplace(worker->getServerLocation(),
CacheableInt32::create(error));
}
LOGDEBUG("worker->getPutAllMap()->size() = %d ",
worker->getPutAllMap()->size());
LOGDEBUG(
"worker->getResultCollector()->getList()->getVersionedTagsize() = %d ",
worker->getResultCollector()->getList()->getVersionedTagsize());
// TODO::CHECK, why do we need following code... ??
// TcrMessage* currentReply = worker->getReply();
/*
if(currentReply->getMessageType() != TcrMessage::REPLY)
{
reply.setMessageType(currentReply->getMessageType());
}
*/
delete worker;
cnt++;
}
/**
* Step:4
* a. Create an instance of std::shared_ptr<PutAllPartialResult> with total
* size = map.size().
* b. Iterate over the resultMap; when the value for a particular server
* location is of type VersionedCacheableObjectPartList, add its keys and
* versions.
* c. TODO:: what if the value in the resultMap is of type
* PutAllPartialResultServerException
*/
std::recursive_mutex responseLock;
auto result = std::make_shared<PutAllPartialResult>(
static_cast<int>(map.size()), responseLock);
LOGDEBUG(
" TCRegion:: %s:%d "
"result->getSucceededKeysAndVersions()->getVersionedTagsize() = %d ",
__FILE__, __LINE__,
result->getSucceededKeysAndVersions()->getVersionedTagsize());
LOGDEBUG(" TCRegion:: %s:%d resultMap->size() ", __FILE__, __LINE__,
resultMap.size());
for (const auto& resultMapIter : resultMap) {
const auto& value = resultMapIter.second;
if (const auto papException =
std::dynamic_pointer_cast<PutAllPartialResultServerException>(
value)) {
// PutAllPartialResultServerException CASE:: value in resultMap is of type
// PutAllPartialResultServerException.
// TODO:: Add failedservers.keySet = all failed servers, i.e. list all
// keys contained in the failedServers map (the set view of its keys).
// TODO:: need to read papException and populate PutAllPartialResult.
result->consolidate(papException->getResult());
} else if (const auto list =
std::dynamic_pointer_cast<VersionedCacheableObjectPartList>(
value)) {
// value in resultMap is of type VCOPL.
result->addKeysAndVersions(list);
} else {
// ERROR CASE
if (value) {
LOGERROR(
"ERROR:: ThinClientRegion::singleHopPutAllNoThrow_remote value "
"could not Cast to either VCOPL or "
"PutAllPartialResultServerException:%s",
value->toString().c_str());
} else {
LOGERROR(
"ERROR:: ThinClientRegion::singleHopPutAllNoThrow_remote value is "
"nullptr");
}
}
}
/**
* a. If the PutAllPartialResult does not contain any entry, iterate over the
* locationMap.
* b. Create std::vector<std::shared_ptr<CacheableKey>> succeedKeySet and
* keep adding the sets of keys (locationIter.second()) in locationMap for
* which failedServers->contains(locationIter.first()) is false.
*/
LOGDEBUG("ThinClientRegion:: %s:%d failedServers->size() = %d", __FILE__,
__LINE__, failedServers.size());
// if the partial result set doesn't already have keys (for tracking version
// tags)
// then we need to gather up the keys that we know have succeeded so far and
// add them to the partial result set (See bug Id #955)
if (!failedServers.empty()) {
auto succeedKeySet =
std::make_shared<std::vector<std::shared_ptr<CacheableKey>>>();
if (result->getSucceededKeysAndVersions()->size() == 0) {
for (const auto& locationIter : *locationMap) {
// only servers that did not fail contribute succeeded keys
if (failedServers.find(locationIter.first) == failedServers.end()) {
for (const auto& i : *(locationIter.second)) {
succeedKeySet->push_back(i);
}
}
}
result->addKeys(succeedKeySet);
}
}
/**
* a. Iterate over the failedServers map.
* b. If the failedServers entry is GF_PUTALL_PARTIAL_RESULT_EXCEPTION,
* continue; do not retry putAll for the corresponding keys.
* c. Retry for every other failed server:
* generate a newSubMap by finding the keys specific to that failed server in
* locationMap and their respective values in the user map.
*/
error = GF_NOERR;
bool oneSubMapRetryFailed = false;
for (const auto& failedServerIter : failedServers) {
if (failedServerIter.second->value() ==
GF_PUTALL_PARTIAL_RESULT_EXCEPTION) { // serverLocation
// will not retry for PutAllPartialResultException
// but it means at least one sub map ever failed
oneSubMapRetryFailed = true;
error = GF_PUTALL_PARTIAL_RESULT_EXCEPTION;
continue;
}
std::shared_ptr<std::vector<std::shared_ptr<CacheableKey>>> failedKeys =
nullptr;
const auto& failedSerInLocMapIter =
locationMap->find(failedServerIter.first);
if (failedSerInLocMapIter != locationMap->end()) {
failedKeys = failedSerInLocMapIter->second;
}
if (failedKeys == nullptr) {
LOGERROR(
"TCRegion::singleHopPutAllNoThrow_remote :: failedKeys are nullptr "
"that is not valid");
}
auto newSubMap = std::make_shared<HashMapOfCacheable>();
if (failedKeys && !failedKeys->empty()) {
for (const auto& key : *failedKeys) {
const auto& iter = map.find(key);
if (iter != map.end()) {
newSubMap->emplace(iter->first, iter->second);
} else {
LOGERROR(
"DEBUG:: TCRegion.cpp singleHopPutAllNoThrow_remote KEY not "
"found in user failedSubMap");
}
}
}
std::shared_ptr<VersionedCacheableObjectPartList> vcopListPtr;
GfErrType errCode = multiHopPutAllNoThrow_remote(
*newSubMap, vcopListPtr, timeout, aCallbackArgument);
if (errCode == GF_NOERR) {
result->addKeysAndVersions(vcopListPtr);
} else if (errCode == GF_PUTALL_PARTIAL_RESULT_EXCEPTION) {
oneSubMapRetryFailed = true;
// TODO:: commented out because papResultServerExc is nullptr at this point;
// uncomment once papResultServerExc is read.
// result->consolidate(papResultServerExc->getResult());
error = errCode;
} else /*if(errCode != GF_NOERR)*/ {
oneSubMapRetryFailed = true;
const auto& firstKey = newSubMap->begin()->first;
std::shared_ptr<Exception> excptPtr = nullptr;
// TODO:: formulate excptPtr from the errCode
result->saveFailedKey(firstKey, excptPtr);
error = errCode;
}
}
if (!oneSubMapRetryFailed) {
error = GF_NOERR;
}
versionedObjPartList = result->getSucceededKeysAndVersions();
LOGDEBUG("singlehop versionedObjPartList = %d error=%d",
versionedObjPartList->size(), error);
return error;
}
GfErrType ThinClientRegion::multiHopPutAllNoThrow_remote(
const HashMapOfCacheable& map,
std::shared_ptr<VersionedCacheableObjectPartList>& versionedObjPartList,
std::chrono::milliseconds timeout,
const std::shared_ptr<Serializable>& aCallbackArgument) {
// Multiple hop implementation
LOGDEBUG("ThinClientRegion::multiHopPutAllNoThrow_remote ");
auto err = GF_NOERR;
// Construct request/reply for putAll
TcrMessagePutAll request(new DataOutput(m_cacheImpl->createDataOutput()),
this, map, timeout, m_tcrdm, aCallbackArgument);
TcrMessageReply reply(true, m_tcrdm);
request.setTimeout(timeout);
reply.setTimeout(timeout);
std::recursive_mutex responseLock;
versionedObjPartList =
std::make_shared<VersionedCacheableObjectPartList>(this, responseLock);
// need to check
ChunkedPutAllResponse* resultCollector(new ChunkedPutAllResponse(
shared_from_this(), reply, responseLock, versionedObjPartList));
reply.setChunkedResultHandler(resultCollector);
err = m_tcrdm->sendSyncRequest(request, reply);
versionedObjPartList = resultCollector->getList();
LOGDEBUG("multiple hop versionedObjPartList size = %d , err = %d ",
versionedObjPartList->size(), err);
delete resultCollector;
if (err != GF_NOERR) return err;
LOGDEBUG(
"ThinClientRegion::multiHopPutAllNoThrow_remote reply.getMessageType() = "
"%d ",
reply.getMessageType());
switch (reply.getMessageType()) {
case TcrMessage::REPLY:
// LOGDEBUG("Map is written into remote server at region %s",
// m_fullPath.c_str());
break;
case TcrMessage::RESPONSE:
LOGDEBUG(
"multiHopPutAllNoThrow_remote TcrMessage::RESPONSE %s, err = %d ",
m_fullPath.c_str(), err);
break;
case TcrMessage::EXCEPTION:
err = handleServerException("ThinClientRegion::putAllNoThrow",
reply.getException());
// TODO:: Do we need to read PutAllPartialServerException for multiple
// hop.
break;
case TcrMessage::PUT_DATA_ERROR:
// LOGERROR( "A write error occurred on the endpoint %s",
// m_tcrdm->getActiveEndpoint( )->name( ).c_str( ) );
err = GF_CACHESERVER_EXCEPTION;
break;
default:
LOGERROR("Unknown message type %d during region put-all",
reply.getMessageType());
err = GF_NOTOBJ;
break;
}
return err;
}
GfErrType ThinClientRegion::putAllNoThrow_remote(
const HashMapOfCacheable& map,
std::shared_ptr<VersionedCacheableObjectPartList>& versionedObjPartList,
std::chrono::milliseconds timeout,
const std::shared_ptr<Serializable>& aCallbackArgument) {
LOGDEBUG("ThinClientRegion::putAllNoThrow_remote");
auto poolDM = dynamic_cast<ThinClientPoolDM*>(m_tcrdm);
auto txState = TSSTXStateWrapper::s_geodeTSSTXState->getTXState();
if (poolDM != nullptr) {
if (poolDM->getPRSingleHopEnabled() &&
poolDM->getClientMetaDataService() != nullptr &&
txState == nullptr /*For Tx use multi-hop*/) {
return singleHopPutAllNoThrow_remote(poolDM, map, versionedObjPartList,
timeout, aCallbackArgument);
} else {
return multiHopPutAllNoThrow_remote(map, versionedObjPartList, timeout,
aCallbackArgument);
}
} else {
LOGERROR("ThinClientRegion::putAllNoThrow_remote :: Pool Not Specified ");
return GF_NOTSUP;
}
}
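// Illustrative note: the single-hop path above is taken only when the pool
// was built with PR single-hop enabled and bucket metadata is available;
// transactions always use the multi-hop path. A hypothetical pool setup:
//
//   auto pool = cache.getPoolManager()
//                   .createFactory()
//                   .addLocator("localhost", 10334)
//                   .setPRSingleHopEnabled(true)
//                   .create("pool1");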
GfErrType ThinClientRegion::singleHopRemoveAllNoThrow_remote(
ThinClientPoolDM* tcrdm,
const std::vector<std::shared_ptr<CacheableKey>>& keys,
std::shared_ptr<VersionedCacheableObjectPartList>& versionedObjPartList,
const std::shared_ptr<Serializable>& aCallbackArgument) {
LOGDEBUG(" ThinClientRegion::singleHopRemoveAllNoThrow_remote keys size = %d",
keys.size());
auto region = shared_from_this();
GfErrType error = GF_NOERR;
auto locationMap = tcrdm->getClientMetaDataService()->getServerToFilterMap(
keys, region, true);
if (!locationMap) {
// removeAll with multiple hop implementation
LOGDEBUG("locationMap is Null or Empty");
return multiHopRemoveAllNoThrow_remote(keys, versionedObjPartList,
aCallbackArgument);
}
// set the flag indicating that removeAll on a partitioned region was
// invoked with single-hop enabled
m_isPRSingleHopEnabled = true;
LOGDEBUG("locationMap.size() = %d ", locationMap->size());
/*Step-2
* a. Create a vector of RemoveAllWork.
* b. locationMap<std::shared_ptr<BucketServerLocation>,
* std::shared_ptr<std::vector<std::shared_ptr<CacheableKey>>>>: each entry
* carries the server-specific key list (locationIter.second()).
* c. Create a new instance of RemoveAllWork, i.e. a worker with the required
* params.
* //TODO:: Add details of each parameter later
* d. Enqueue the worker so a thread from the threadPool runs its execute
* method.
* e. Insert the worker into the vector.
*/
std::vector<RemoveAllWork*> removeAllWorkers;
auto* threadPool =
CacheRegionHelper::getCacheImpl(&getCache())->getThreadPool();
int locationMapIndex = 0;
for (const auto& locationIter : *locationMap) {
const auto& serverLocation = locationIter.first;
if (serverLocation == nullptr) {
LOGDEBUG("serverLocation is nullptr");
}
const auto& mappedkeys = locationIter.second;
auto worker = new RemoveAllWork(
tcrdm, serverLocation, region, true /*attemptFailover*/,
false /*isBGThread*/, mappedkeys, aCallbackArgument);
threadPool->perform(worker);
removeAllWorkers.push_back(worker);
locationMapIndex++;
}
// TODO::CHECK, do we need to set following ..??
// reply.setMessageType(TcrMessage::RESPONSE);
int cnt = 1;
/**
* Step::3
* a. Iterate over the vector of removeAllWorkers and populate worker-specific
* information into the HashMaps
* resultMap<std::shared_ptr<BucketServerLocation>,
* std::shared_ptr<Serializable>>, whose value can be a
* std::shared_ptr<VersionedCacheableObjectPartList> or a
* std::shared_ptr<PutAllPartialResultServerException>, and
* failedServers<std::shared_ptr<BucketServerLocation>,
* std::shared_ptr<CacheableInt32>>, whose value is an error code.
* b. Delete the worker.
*/
auto resultMap = ResultMap();
auto failedServers = FailedServersMap();
for (const auto& worker : removeAllWorkers) {
auto err =
worker->getResult(); // wait() or blocking call for worker thread.
LOGDEBUG("Error code :: %s:%d err = %d ", __FILE__, __LINE__, err);
if (GF_NOERR == err) {
// No Exception from server
resultMap.emplace(worker->getServerLocation(),
worker->getResultCollector()->getList());
} else {
error = err;
if (error == GF_PUTALL_PARTIAL_RESULT_EXCEPTION) {
resultMap.emplace(worker->getServerLocation(),
worker->getPaPResultException());
} else if (error == GF_NOTCON) {
// Refresh the metadata in case of GF_NOTCON.
tcrdm->getClientMetaDataService()->enqueueForMetadataRefresh(
region->getFullPath(), 0);
}
failedServers.emplace(worker->getServerLocation(),
CacheableInt32::create(error));
}
LOGDEBUG(
"worker->getResultCollector()->getList()->getVersionedTagsize() = %d ",
worker->getResultCollector()->getList()->getVersionedTagsize());
delete worker;
cnt++;
}
/**
* Step:4
* a. Create an instance of std::shared_ptr<PutAllPartialResult> with total
* size = keys.size().
* b. Iterate over the resultMap; when the value for a particular server
* location is of type VersionedCacheableObjectPartList, add its keys and
* versions.
* c. TODO:: what if the value in the resultMap is of type
* PutAllPartialResultServerException
*/
std::recursive_mutex responseLock;
auto result = std::make_shared<PutAllPartialResult>(
static_cast<int>(keys.size()), responseLock);
LOGDEBUG(
" TCRegion:: %s:%d "
"result->getSucceededKeysAndVersions()->getVersionedTagsize() = %d ",
__FILE__, __LINE__,
result->getSucceededKeysAndVersions()->getVersionedTagsize());
LOGDEBUG(" TCRegion:: %s:%d resultMap->size() ", __FILE__, __LINE__,
resultMap.size());
for (const auto& resultMapIter : resultMap) {
const auto& value = resultMapIter.second;
if (const auto papException =
std::dynamic_pointer_cast<PutAllPartialResultServerException>(
value)) {
// PutAllPartialResultServerException CASE:: value in resultMap is of type
// PutAllPartialResultServerException.
// TODO:: Add failedservers.keySet = all failed servers, i.e. list all
// keys contained in the failedServers map (the set view of its keys).
// TODO:: need to read papException and populate PutAllPartialResult.
result->consolidate(papException->getResult());
} else if (const auto list =
std::dynamic_pointer_cast<VersionedCacheableObjectPartList>(
value)) {
// value in resultMap is of type VCOPL.
result->addKeysAndVersions(list);
} else {
// ERROR CASE
if (value) {
LOGERROR(
"ERROR:: ThinClientRegion::singleHopRemoveAllNoThrow_remote value "
"could not Cast to either VCOPL or "
"PutAllPartialResultServerException:%s",
value->toString().c_str());
} else {
LOGERROR(
"ERROR:: ThinClientRegion::singleHopRemoveAllNoThrow_remote value "
"is nullptr");
}
}
}
/**
* a. If the PutAllPartialResult does not contain any entry, iterate over the
* locationMap.
* b. Create std::vector<std::shared_ptr<CacheableKey>> succeedKeySet and
* keep adding the sets of keys (locationIter.second()) in locationMap for
* which failedServers->contains(locationIter.first()) is false.
*/
LOGDEBUG("ThinClientRegion:: %s:%d failedServers->size() = %d", __FILE__,
__LINE__, failedServers.size());
// if the partial result set doesn't already have keys (for tracking version
// tags)
// then we need to gather up the keys that we know have succeeded so far and
// add them to the partial result set (See bug Id #955)
if (!failedServers.empty()) {
auto succeedKeySet =
std::make_shared<std::vector<std::shared_ptr<CacheableKey>>>();
if (result->getSucceededKeysAndVersions()->size() == 0) {
for (const auto& locationIter : *locationMap) {
        // only servers absent from failedServers are known to have succeeded
        if (failedServers.find(locationIter.first) == failedServers.end()) {
for (const auto& i : *(locationIter.second)) {
succeedKeySet->push_back(i);
}
}
}
result->addKeys(succeedKeySet);
}
}
/**
 * a. Iterate over the failedServers map.
 * b. If the entry's error code is GF_PUTALL_PARTIAL_RESULT_EXCEPTION,
 * continue; do not retry the removeAll for the corresponding keys.
 * c. Retry for every other failed server, using the keys recorded for that
 * server in locationMap.
 */
error = GF_NOERR;
bool oneSubMapRetryFailed = false;
for (const auto& failedServerIter : failedServers) {
if (failedServerIter.second->value() ==
GF_PUTALL_PARTIAL_RESULT_EXCEPTION) { // serverLocation
// will not retry for PutAllPartialResultException
// but it means at least one sub map ever failed
oneSubMapRetryFailed = true;
error = GF_PUTALL_PARTIAL_RESULT_EXCEPTION;
continue;
}
std::shared_ptr<std::vector<std::shared_ptr<CacheableKey>>> failedKeys =
nullptr;
const auto& failedSerInLocMapIter =
locationMap->find(failedServerIter.first);
if (failedSerInLocMapIter != locationMap->end()) {
failedKeys = failedSerInLocMapIter->second;
}
    if (failedKeys == nullptr) {
      LOGERROR(
          "TCRegion::singleHopRemoveAllNoThrow_remote :: failedKeys are "
          "nullptr, which is not valid");
      // Skip the retry for this server; dereferencing a null failedKeys
      // below would crash the client.
      continue;
    }
std::shared_ptr<VersionedCacheableObjectPartList> vcopListPtr;
std::shared_ptr<PutAllPartialResultServerException> papResultServerExc =
nullptr;
GfErrType errCode = multiHopRemoveAllNoThrow_remote(
*failedKeys, vcopListPtr, aCallbackArgument);
if (errCode == GF_NOERR) {
result->addKeysAndVersions(vcopListPtr);
} else if (errCode == GF_PUTALL_PARTIAL_RESULT_EXCEPTION) {
oneSubMapRetryFailed = true;
error = errCode;
} else /*if(errCode != GF_NOERR)*/ {
oneSubMapRetryFailed = true;
std::shared_ptr<Exception> excptPtr = nullptr;
result->saveFailedKey(failedKeys->at(0), excptPtr);
error = errCode;
}
}
if (!oneSubMapRetryFailed) {
error = GF_NOERR;
}
versionedObjPartList = result->getSucceededKeysAndVersions();
LOGDEBUG("singlehop versionedObjPartList = %d error=%d",
versionedObjPartList->size(), error);
return error;
}
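/*
 * Illustrative usage (not invoked from here): callers never reach
 * singleHopRemoveAllNoThrow_remote directly; it is selected inside
 * removeAllNoThrow_remote below. A hedged sketch of the public entry
 * point, assuming a connected region (names are hypothetical):
 *
 *   std::vector<std::shared_ptr<CacheableKey>> keys;
 *   keys.push_back(CacheableKey::create("key1"));
 *   keys.push_back(CacheableKey::create("key2"));
 *   region->removeAll(keys);  // fans out per bucket when single hop applies
 */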
GfErrType ThinClientRegion::multiHopRemoveAllNoThrow_remote(
const std::vector<std::shared_ptr<CacheableKey>>& keys,
std::shared_ptr<VersionedCacheableObjectPartList>& versionedObjPartList,
const std::shared_ptr<Serializable>& aCallbackArgument) {
// Multiple hop implementation
LOGDEBUG("ThinClientRegion::multiHopRemoveAllNoThrow_remote ");
GfErrType err = GF_NOERR;
  // Construct request/reply for removeAll
TcrMessageRemoveAll request(new DataOutput(m_cacheImpl->createDataOutput()),
this, keys, aCallbackArgument, m_tcrdm);
TcrMessageReply reply(true, m_tcrdm);
std::recursive_mutex responseLock;
versionedObjPartList =
std::make_shared<VersionedCacheableObjectPartList>(this, responseLock);
// need to check
ChunkedRemoveAllResponse* resultCollector(new ChunkedRemoveAllResponse(
shared_from_this(), reply, responseLock, versionedObjPartList));
reply.setChunkedResultHandler(resultCollector);
err = m_tcrdm->sendSyncRequest(request, reply);
versionedObjPartList = resultCollector->getList();
LOGDEBUG("multiple hop versionedObjPartList size = %d , err = %d ",
versionedObjPartList->size(), err);
delete resultCollector;
if (err != GF_NOERR) return err;
LOGDEBUG(
"ThinClientRegion::multiHopRemoveAllNoThrow_remote "
"reply.getMessageType() = %d ",
reply.getMessageType());
switch (reply.getMessageType()) {
case TcrMessage::REPLY:
// LOGDEBUG("Map is written into remote server at region %s",
// m_fullPath.c_str());
break;
case TcrMessage::RESPONSE:
LOGDEBUG(
"multiHopRemoveAllNoThrow_remote TcrMessage::RESPONSE %s, err = %d ",
m_fullPath.c_str(), err);
break;
case TcrMessage::EXCEPTION:
err = handleServerException("ThinClientRegion::putAllNoThrow",
reply.getException());
break;
default:
LOGERROR("Unknown message type %d during region remove-all",
reply.getMessageType());
err = GF_NOTOBJ;
break;
}
return err;
}
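/*
 * Ownership note on the pattern above: the reply does not own its chunked
 * result handler, so the collector is created, attached, drained, and
 * deleted by this function. Condensed sketch of the same lifecycle:
 *
 *   auto* collector = new ChunkedRemoveAllResponse(
 *       shared_from_this(), reply, responseLock, versionedObjPartList);
 *   reply.setChunkedResultHandler(collector);
 *   err = m_tcrdm->sendSyncRequest(request, reply);
 *   versionedObjPartList = collector->getList();
 *   delete collector;  // caller-managed lifetime
 */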
GfErrType ThinClientRegion::removeAllNoThrow_remote(
const std::vector<std::shared_ptr<CacheableKey>>& keys,
std::shared_ptr<VersionedCacheableObjectPartList>& versionedObjPartList,
const std::shared_ptr<Serializable>& aCallbackArgument) {
LOGDEBUG("ThinClientRegion::removeAllNoThrow_remote");
ThinClientPoolDM* poolDM = dynamic_cast<ThinClientPoolDM*>(m_tcrdm);
TXState* txState = TSSTXStateWrapper::s_geodeTSSTXState->getTXState();
if (poolDM != nullptr) {
if (poolDM->getPRSingleHopEnabled() &&
poolDM->getClientMetaDataService() != nullptr &&
txState == nullptr /*For Tx use multi-hop*/) {
return singleHopRemoveAllNoThrow_remote(
poolDM, keys, versionedObjPartList, aCallbackArgument);
} else {
return multiHopRemoveAllNoThrow_remote(keys, versionedObjPartList,
aCallbackArgument);
}
} else {
LOGERROR(
"ThinClientRegion::removeAllNoThrow_remote :: Pool Not Specified ");
return GF_NOTSUP;
}
}
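/*
 * The single-hop branch above requires all three of: PR single hop enabled
 * on the pool, client metadata available, and no active transaction.
 * Hedged configuration sketch (pool name and locator address are
 * hypothetical; exact factory plumbing varies by client version):
 *
 *   auto poolFactory = cache.getPoolManager().createFactory();
 *   poolFactory->setPRSingleHopEnabled(true);
 *   poolFactory->addLocator("localhost", 10334);
 *   auto pool = poolFactory->create("examplePool");
 */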
uint32_t ThinClientRegion::size_remote() {
LOGDEBUG("ThinClientRegion::size_remote");
GfErrType err = GF_NOERR;
// do TCR size
TcrMessageSize request(new DataOutput(m_cacheImpl->createDataOutput()),
m_fullPath.c_str());
TcrMessageReply reply(true, m_tcrdm);
err = m_tcrdm->sendSyncRequest(request, reply);
if (err != GF_NOERR) {
GfErrTypeToException("Region::size", err);
}
switch (reply.getMessageType()) {
    case TcrMessage::RESPONSE: {
      if (auto size =
              std::dynamic_pointer_cast<CacheableInt32>(reply.getValue())) {
        return size->value();
      }
      // unexpected payload type; fall through to throw via
      // GfErrTypeToException below
      err = GF_NOTOBJ;
      break;
    }
case TcrMessage::EXCEPTION:
err =
handleServerException("ThinClientRegion::size", reply.getException());
break;
case TcrMessage::SIZE_ERROR:
err = GF_CACHESERVER_EXCEPTION;
break;
default:
LOGERROR("Unknown message type %d during region size",
reply.getMessageType());
err = GF_NOTOBJ;
}
GfErrTypeToException("Region::size", err);
return 0;
}
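/*
 * Usage sketch (hypothetical caller): Region::size() on a client region
 * resolves to this remote size request; failures surface as exceptions via
 * GfErrTypeToException rather than as error codes:
 *
 *   auto region = cache.getRegion("exampleRegion");
 *   auto serverEntryCount = region->size();
 */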
GfErrType ThinClientRegion::registerStoredRegex(
TcrEndpoint* endpoint,
std::unordered_map<std::string, InterestResultPolicy>& interestListRegex,
bool isDurable, bool receiveValues) {
GfErrType opErr = GF_NOERR;
GfErrType retVal = GF_NOERR;
  for (const auto& regexInterest : interestListRegex) {
    opErr = registerRegexNoThrow(regexInterest.first, false, endpoint,
                                 isDurable, nullptr, regexInterest.second,
                                 receiveValues);
    if (opErr != GF_NOERR) {
      retVal = opErr;
    }
  }
return retVal;
}
GfErrType ThinClientRegion::registerKeys(TcrEndpoint* endpoint,
const TcrMessage* request,
TcrMessageReply* reply) {
GfErrType err = GF_NOERR;
GfErrType opErr = GF_NOERR;
// called when failover to a different server
opErr = registerStoredRegex(endpoint, m_interestListRegex);
err = opErr != GF_NOERR ? opErr : err;
opErr = registerStoredRegex(
endpoint, m_interestListRegexForUpdatesAsInvalidates, false, false);
err = opErr != GF_NOERR ? opErr : err;
opErr = registerStoredRegex(endpoint, m_durableInterestListRegex, true);
err = opErr != GF_NOERR ? opErr : err;
opErr = registerStoredRegex(
endpoint, m_durableInterestListRegexForUpdatesAsInvalidates, true, false);
err = opErr != GF_NOERR ? opErr : err;
std::vector<std::shared_ptr<CacheableKey>> keysVec;
InterestResultPolicy interestPolicy =
copyInterestList(keysVec, m_interestList);
opErr = registerKeysNoThrow(keysVec, false, endpoint, false, interestPolicy);
err = opErr != GF_NOERR ? opErr : err;
std::vector<std::shared_ptr<CacheableKey>> keysVecForUpdatesAsInvalidates;
interestPolicy = copyInterestList(keysVecForUpdatesAsInvalidates,
m_interestListForUpdatesAsInvalidates);
opErr = registerKeysNoThrow(keysVecForUpdatesAsInvalidates, false, endpoint,
false, interestPolicy, false);
err = opErr != GF_NOERR ? opErr : err;
std::vector<std::shared_ptr<CacheableKey>> keysVecDurable;
interestPolicy = copyInterestList(keysVecDurable, m_durableInterestList);
opErr = registerKeysNoThrow(keysVecDurable, false, endpoint, true,
interestPolicy);
err = opErr != GF_NOERR ? opErr : err;
std::vector<std::shared_ptr<CacheableKey>>
keysVecDurableForUpdatesAsInvalidates;
interestPolicy =
copyInterestList(keysVecDurableForUpdatesAsInvalidates,
m_durableInterestListForUpdatesAsInvalidates);
opErr = registerKeysNoThrow(keysVecDurableForUpdatesAsInvalidates, false,
endpoint, true, interestPolicy, false);
err = opErr != GF_NOERR ? opErr : err;
if (request != nullptr && request->getRegionName() == m_fullPath &&
(request->getMessageType() == TcrMessage::REGISTER_INTEREST ||
request->getMessageType() == TcrMessage::REGISTER_INTEREST_LIST)) {
const std::vector<std::shared_ptr<CacheableKey>>* newKeysVec =
request->getKeys();
bool isDurable = request->isDurable();
bool receiveValues = request->receiveValues();
if (newKeysVec == nullptr || newKeysVec->empty()) {
const std::string& newRegex = request->getRegex();
if (!newRegex.empty()) {
if (request->getRegionName() != m_fullPath) {
reply = nullptr;
}
opErr = registerRegexNoThrow(
newRegex, false, endpoint, isDurable, nullptr,
request->getInterestResultPolicy(), receiveValues, reply);
err = opErr != GF_NOERR ? opErr : err;
}
} else {
opErr = registerKeysNoThrow(*newKeysVec, false, endpoint, isDurable,
request->getInterestResultPolicy(),
receiveValues, reply);
err = opErr != GF_NOERR ? opErr : err;
}
}
return err;
}
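/*
 * The registrations replayed above originate from the public interest API.
 * A hedged sketch of the calls that populate the stored lists consulted
 * here (names are hypothetical; exact signatures vary by client version):
 *
 *   std::vector<std::shared_ptr<CacheableKey>> keys = {
 *       CacheableKey::create("key1")};
 *   region->registerKeys(keys);          // -> m_interestList
 *   region->registerKeys(keys, true);    // durable -> m_durableInterestList
 *   region->registerRegex("prefix-.*");  // -> m_interestListRegex
 */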
GfErrType ThinClientRegion::unregisterStoredRegex(
std::unordered_map<std::string, InterestResultPolicy>& interestListRegex) {
GfErrType opErr = GF_NOERR;
GfErrType retVal = GF_NOERR;
  for (const auto& regexInterest : interestListRegex) {
    opErr = unregisterRegexNoThrow(regexInterest.first, false);
    if (opErr != GF_NOERR) {
      retVal = opErr;
    }
  }
return retVal;
}
GfErrType ThinClientRegion::unregisterStoredRegexLocalDestroy(
std::unordered_map<std::string, InterestResultPolicy>& interestListRegex) {
GfErrType opErr = GF_NOERR;
GfErrType retVal = GF_NOERR;
  for (const auto& regexInterest : interestListRegex) {
    opErr = unregisterRegexNoThrowLocalDestroy(regexInterest.first, false);
    if (opErr != GF_NOERR) {
      retVal = opErr;
    }
  }
return retVal;
}
GfErrType ThinClientRegion::unregisterKeys() {
GfErrType err = GF_NOERR;
GfErrType opErr = GF_NOERR;
// called when disconnect from a server
opErr = unregisterStoredRegex(m_interestListRegex);
err = opErr != GF_NOERR ? opErr : err;
opErr = unregisterStoredRegex(m_durableInterestListRegex);
err = opErr != GF_NOERR ? opErr : err;
opErr = unregisterStoredRegex(m_interestListRegexForUpdatesAsInvalidates);
err = opErr != GF_NOERR ? opErr : err;
opErr =
unregisterStoredRegex(m_durableInterestListRegexForUpdatesAsInvalidates);
err = opErr != GF_NOERR ? opErr : err;
std::vector<std::shared_ptr<CacheableKey>> keysVec;
copyInterestList(keysVec, m_interestList);
opErr = unregisterKeysNoThrow(keysVec, false);
err = opErr != GF_NOERR ? opErr : err;
std::vector<std::shared_ptr<CacheableKey>> keysVecDurable;
copyInterestList(keysVecDurable, m_durableInterestList);
opErr = unregisterKeysNoThrow(keysVecDurable, false);
err = opErr != GF_NOERR ? opErr : err;
std::vector<std::shared_ptr<CacheableKey>> keysVecForUpdatesAsInvalidates;
copyInterestList(keysVecForUpdatesAsInvalidates,
m_interestListForUpdatesAsInvalidates);
opErr = unregisterKeysNoThrow(keysVecForUpdatesAsInvalidates, false);
err = opErr != GF_NOERR ? opErr : err;
std::vector<std::shared_ptr<CacheableKey>>
keysVecDurableForUpdatesAsInvalidates;
copyInterestList(keysVecDurableForUpdatesAsInvalidates,
m_durableInterestListForUpdatesAsInvalidates);
opErr = unregisterKeysNoThrow(keysVecDurableForUpdatesAsInvalidates, false);
err = opErr != GF_NOERR ? opErr : err;
return err;
}
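/*
 * Mirror image of registerKeys above: a hedged sketch of the public calls
 * whose stored state this method clears on disconnect (signatures vary by
 * client version):
 *
 *   region->unregisterKeys(keys);
 *   region->unregisterRegex("prefix-.*");
 */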
GfErrType ThinClientRegion::destroyRegionNoThrow_remote(
const std::shared_ptr<Serializable>& aCallbackArgument) {
GfErrType err = GF_NOERR;
// do TCR destroyRegion
TcrMessageDestroyRegion request(
new DataOutput(m_cacheImpl->createDataOutput()), this, aCallbackArgument,
std::chrono::milliseconds(-1), m_tcrdm);
TcrMessageReply reply(true, m_tcrdm);
err = m_tcrdm->sendSyncRequest(request, reply);
if (err != GF_NOERR) return err;
switch (reply.getMessageType()) {
case TcrMessage::REPLY: {
// LOGINFO("Region %s at remote is destroyed successfully",
// m_fullPath.c_str());
break;
}
case TcrMessage::EXCEPTION: {
err =
handleServerException("Region::destroyRegion", reply.getException());
break;
}
case TcrMessage::DESTROY_REGION_DATA_ERROR: {
err = GF_CACHE_REGION_DESTROYED_EXCEPTION;
break;
}
default: {
LOGERROR("Unknown message type %d during destroy region",
reply.getMessageType());
err = GF_MSG;
}
}
return err;
}
GfErrType ThinClientRegion::registerKeysNoThrow(
const std::vector<std::shared_ptr<CacheableKey>>& keys,
bool attemptFailover, TcrEndpoint* endpoint, bool isDurable,
InterestResultPolicy interestPolicy, bool receiveValues,
TcrMessageReply* reply) {
RegionGlobalLocks acquireLocksRedundancy(this, false);
RegionGlobalLocks acquireLocksFailover(this);
CHECK_DESTROY_PENDING_NOTHROW(TryReadGuard);
GfErrType err = GF_NOERR;
std::lock_guard<decltype(m_keysLock)> keysGuard(m_keysLock);
if (keys.empty()) {
return err;
}
TcrMessageReply replyLocal(true, m_tcrdm);
bool needToCreateRC = true;
if (reply == nullptr) {
reply = &replyLocal;
} else {
needToCreateRC = false;
}
LOGDEBUG("ThinClientRegion::registerKeysNoThrow : interestpolicy is %d",
interestPolicy.ordinal);
TcrMessageRegisterInterestList request(
new DataOutput(m_cacheImpl->createDataOutput()), this, keys, isDurable,
getAttributes().getCachingEnabled(), receiveValues, interestPolicy,
m_tcrdm);
std::recursive_mutex responseLock;
TcrChunkedResult* resultCollector = nullptr;
if (interestPolicy.ordinal == InterestResultPolicy::KEYS_VALUES.ordinal) {
auto values = std::make_shared<HashMapOfCacheable>();
auto exceptions = std::make_shared<HashMapOfException>();
MapOfUpdateCounters trackers;
int32_t destroyTracker = 1;
if (needToCreateRC) {
resultCollector = (new ChunkedGetAllResponse(
request, this, &keys, values, exceptions, nullptr, trackers,
destroyTracker, true, responseLock));
reply->setChunkedResultHandler(resultCollector);
}
} else {
if (needToCreateRC) {
resultCollector = (new ChunkedInterestResponse(request, nullptr, *reply));
reply->setChunkedResultHandler(resultCollector);
}
}
err = m_tcrdm->sendSyncRequestRegisterInterest(
request, *reply, attemptFailover, this, endpoint);
if (err == GF_NOERR /*|| err == GF_CACHE_REDUNDANCY_FAILURE*/) {
if (reply->getMessageType() == TcrMessage::RESPONSE_FROM_SECONDARY &&
endpoint) {
LOGFINER(
"registerKeysNoThrow - got response from secondary for "
"endpoint %s, ignoring.",
endpoint->name().c_str());
} else if (attemptFailover) {
addKeys(keys, isDurable, receiveValues, interestPolicy);
      if (interestPolicy.ordinal !=
          InterestResultPolicy::KEYS_VALUES.ordinal) {
        localInvalidateForRegisterInterest(keys);
      }
}
}
if (needToCreateRC) {
delete resultCollector;
}
return err;
}
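/*
 * Behavioral note on the two collector paths above: with
 * InterestResultPolicy::KEYS_VALUES the server streams entries back, so a
 * ChunkedGetAllResponse caches the values immediately; with any other
 * policy only keys return, so the matching local entries are invalidated
 * and values are fetched lazily on the next get(). Hedged sketch, assuming
 * the getInitialValues flag selects KEYS_VALUES:
 *
 *   region->registerKeys(keys, false, true);   // values arrive up front
 *   region->registerKeys(keys, false, false);  // keys only; local entries
 *                                              // invalidated
 */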
GfErrType ThinClientRegion::unregisterKeysNoThrow(
const std::vector<std::shared_ptr<CacheableKey>>& keys,
bool attemptFailover) {
RegionGlobalLocks acquireLocksRedundancy(this, false);
RegionGlobalLocks acquireLocksFailover(this);
CHECK_DESTROY_PENDING_NOTHROW(TryReadGuard);
GfErrType err = GF_NOERR;
std::lock_guard<decltype(m_keysLock)> keysGuard(m_keysLock);
TcrMessageReply reply(true, m_tcrdm);
if (keys.empty()) {
return err;
}
if (m_interestList.empty() && m_durableInterestList.empty() &&
m_interestListForUpdatesAsInvalidates.empty() &&
m_durableInterestListForUpdatesAsInvalidates.empty()) {
// did not register any keys before.
return GF_CACHE_ILLEGAL_STATE_EXCEPTION;
}
TcrMessageUnregisterInterestList request(
new DataOutput(m_cacheImpl->createDataOutput()), this, keys, false, true,
InterestResultPolicy::NONE, m_tcrdm);
err = m_tcrdm->sendSyncRequestRegisterInterest(request, reply);
if (err == GF_NOERR /*|| err == GF_CACHE_REDUNDANCY_FAILURE*/) {
if (attemptFailover) {
for (const auto& key : keys) {
m_interestList.erase(key);
m_durableInterestList.erase(key);
m_interestListForUpdatesAsInvalidates.erase(key);
m_durableInterestListForUpdatesAsInvalidates.erase(key);
}
}
}
return err;
}
GfErrType ThinClientRegion::unregisterKeysNoThrowLocalDestroy(
const std::vector<std::shared_ptr<CacheableKey>>& keys,
bool attemptFailover) {
RegionGlobalLocks acquireLocksRedundancy(this, false);
RegionGlobalLocks acquireLocksFailover(this);
GfErrType err = GF_NOERR;
std::lock_guard<decltype(m_keysLock)> keysGuard(m_keysLock);
TcrMessageReply reply(true, m_tcrdm);
if (keys.empty()) {
return err;
}
if (m_interestList.empty() && m_durableInterestList.empty() &&
m_interestListForUpdatesAsInvalidates.empty() &&
m_durableInterestListForUpdatesAsInvalidates.empty()) {
// did not register any keys before.
return GF_CACHE_ILLEGAL_STATE_EXCEPTION;
}
TcrMessageUnregisterInterestList request(
new DataOutput(m_cacheImpl->createDataOutput()), this, keys, false, true,
InterestResultPolicy::NONE, m_tcrdm);
err = m_tcrdm->sendSyncRequestRegisterInterest(request, reply);
if (err == GF_NOERR) {
if (attemptFailover) {
for (const auto& key : keys) {
m_interestList.erase(key);
m_durableInterestList.erase(key);
m_interestListForUpdatesAsInvalidates.erase(key);
m_durableInterestListForUpdatesAsInvalidates.erase(key);
}
}
}
return err;
}
bool ThinClientRegion::isRegexRegistered(
std::unordered_map<std::string, InterestResultPolicy>& interestListRegex,
const std::string& regex, bool allKeys) {
if (interestListRegex.find(".*") != interestListRegex.end() ||
(!allKeys && interestListRegex.find(regex) != interestListRegex.end())) {
return true;
}
return false;
}
GfErrType ThinClientRegion::registerRegexNoThrow(
const std::string& regex, bool attemptFailover, TcrEndpoint* endpoint,
bool isDurable,
std::shared_ptr<std::vector<std::shared_ptr<CacheableKey>>> resultKeys,
InterestResultPolicy interestPolicy, bool receiveValues,
TcrMessageReply* reply) {
RegionGlobalLocks acquireLocksRedundancy(this, false);
RegionGlobalLocks acquireLocksFailover(this);
CHECK_DESTROY_PENDING_NOTHROW(TryReadGuard);
GfErrType err = GF_NOERR;
bool allKeys = (regex == ".*");
std::lock_guard<decltype(m_keysLock)> keysGuard(m_keysLock);
if (attemptFailover) {
if ((isDurable &&
(isRegexRegistered(m_durableInterestListRegex, regex, allKeys) ||
isRegexRegistered(m_durableInterestListRegexForUpdatesAsInvalidates,
regex, allKeys))) ||
(!isDurable &&
(isRegexRegistered(m_interestListRegex, regex, allKeys) ||
isRegexRegistered(m_interestListRegexForUpdatesAsInvalidates, regex,
allKeys)))) {
return err;
}
}
ChunkedInterestResponse* resultCollector = nullptr;
ChunkedGetAllResponse* getAllResultCollector = nullptr;
if (reply != nullptr) {
// need to check
resultCollector = dynamic_cast<ChunkedInterestResponse*>(
reply->getChunkedResultHandler());
if (resultCollector != nullptr) {
resultKeys = resultCollector->getResultKeys();
} else {
      getAllResultCollector = dynamic_cast<ChunkedGetAllResponse*>(
          reply->getChunkedResultHandler());
      if (getAllResultCollector != nullptr) {
        resultKeys = getAllResultCollector->getResultKeys();
      }
}
}
bool isRCCreatedLocally = false;
LOGDEBUG("ThinClientRegion::registerRegexNoThrow : interestpolicy is %d",
interestPolicy.ordinal);
// TODO:
TcrMessageRegisterInterest request(
new DataOutput(m_cacheImpl->createDataOutput()), m_fullPath,
regex.c_str(), interestPolicy, isDurable,
getAttributes().getCachingEnabled(), receiveValues, m_tcrdm);
std::recursive_mutex responseLock;
if (reply == nullptr) {
TcrMessageReply replyLocal(true, m_tcrdm);
auto values = std::make_shared<HashMapOfCacheable>();
auto exceptions = std::make_shared<HashMapOfException>();
reply = &replyLocal;
if (interestPolicy.ordinal == InterestResultPolicy::KEYS_VALUES.ordinal) {
MapOfUpdateCounters trackers;
int32_t destroyTracker = 1;
if (resultKeys == nullptr) {
resultKeys =
std::shared_ptr<std::vector<std::shared_ptr<CacheableKey>>>(
new std::vector<std::shared_ptr<CacheableKey>>());
}
// need to check
getAllResultCollector = (new ChunkedGetAllResponse(
request, this, nullptr, values, exceptions, resultKeys, trackers,
destroyTracker, true, responseLock));
reply->setChunkedResultHandler(getAllResultCollector);
isRCCreatedLocally = true;
} else {
isRCCreatedLocally = true;
// need to check
resultCollector =
new ChunkedInterestResponse(request, resultKeys, replyLocal);
reply->setChunkedResultHandler(resultCollector);
}
err = m_tcrdm->sendSyncRequestRegisterInterest(
request, replyLocal, attemptFailover, this, endpoint);
} else {
err = m_tcrdm->sendSyncRequestRegisterInterest(
request, *reply, attemptFailover, this, endpoint);
}
if (err == GF_NOERR /*|| err == GF_CACHE_REDUNDANCY_FAILURE*/) {
if (reply->getMessageType() == TcrMessage::RESPONSE_FROM_SECONDARY &&
endpoint) {
LOGFINER(
"registerRegexNoThrow - got response from secondary for "
"endpoint %s, ignoring.",
endpoint->name().c_str());
} else if (attemptFailover) {
addRegex(regex, isDurable, receiveValues, interestPolicy);
if (interestPolicy.ordinal != InterestResultPolicy::KEYS_VALUES.ordinal) {
if (allKeys) {
localInvalidateRegion_internal();
} else {
const std::shared_ptr<std::vector<std::shared_ptr<CacheableKey>>>&
keys = resultCollector != nullptr
? resultCollector->getResultKeys()
: getAllResultCollector->getResultKeys();
if (keys != nullptr) {
localInvalidateForRegisterInterest(*keys);
}
}
}
}
}
  if (isRCCreatedLocally) {
    // deleting a null pointer is a no-op, so unconditional delete is safe
    delete resultCollector;
    delete getAllResultCollector;
  }
return err;
}
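/*
 * Sketch: ".*" is treated as all-keys interest. Registering it supersedes
 * finer-grained interest (see addRegex below), and with a policy other
 * than KEYS_VALUES the whole region is locally invalidated instead of
 * individual keys:
 *
 *   region->registerRegex(".*");  // comparable in effect to the
 *   region->registerAllKeys();    // convenience API, if available
 */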
GfErrType ThinClientRegion::unregisterRegexNoThrow(const std::string& regex,
bool attemptFailover) {
RegionGlobalLocks acquireLocksRedundancy(this, false);
RegionGlobalLocks acquireLocksFailover(this);
CHECK_DESTROY_PENDING_NOTHROW(TryReadGuard);
GfErrType err = GF_NOERR;
err = findRegex(regex);
if (err == GF_NOERR) {
TcrMessageReply reply(false, m_tcrdm);
TcrMessageUnregisterInterest request(
new DataOutput(m_cacheImpl->createDataOutput()), m_fullPath, regex,
InterestResultPolicy::NONE, false, true, m_tcrdm);
err = m_tcrdm->sendSyncRequestRegisterInterest(request, reply);
if (err == GF_NOERR /*|| err == GF_CACHE_REDUNDANCY_FAILURE*/) {
if (attemptFailover) {
clearRegex(regex);
}
}
}
return err;
}
GfErrType ThinClientRegion::findRegex(const std::string& regex) {
GfErrType err = GF_NOERR;
std::lock_guard<decltype(m_keysLock)> keysGuard(m_keysLock);
if (m_interestListRegex.find(regex) == m_interestListRegex.end() &&
m_durableInterestListRegex.find(regex) ==
m_durableInterestListRegex.end() &&
m_interestListRegexForUpdatesAsInvalidates.find(regex) ==
m_interestListRegexForUpdatesAsInvalidates.end() &&
m_durableInterestListRegexForUpdatesAsInvalidates.find(regex) ==
m_durableInterestListRegexForUpdatesAsInvalidates.end()) {
return GF_CACHE_ILLEGAL_STATE_EXCEPTION;
} else {
return err;
}
}
void ThinClientRegion::clearRegex(const std::string& regex) {
std::lock_guard<decltype(m_keysLock)> keysGuard(m_keysLock);
m_interestListRegex.erase(regex);
m_durableInterestListRegex.erase(regex);
m_interestListRegexForUpdatesAsInvalidates.erase(regex);
m_durableInterestListRegexForUpdatesAsInvalidates.erase(regex);
}
GfErrType ThinClientRegion::unregisterRegexNoThrowLocalDestroy(
const std::string& regex, bool attemptFailover) {
GfErrType err = GF_NOERR;
err = findRegex(regex);
if (err == GF_NOERR) {
TcrMessageReply reply(false, m_tcrdm);
TcrMessageUnregisterInterest request(
new DataOutput(m_cacheImpl->createDataOutput()), m_fullPath, regex,
InterestResultPolicy::NONE, false, true, m_tcrdm);
err = m_tcrdm->sendSyncRequestRegisterInterest(request, reply);
if (err == GF_NOERR) {
if (attemptFailover) {
clearRegex(regex);
}
}
}
return err;
}
void ThinClientRegion::addKeys(
const std::vector<std::shared_ptr<CacheableKey>>& keys, bool isDurable,
bool receiveValues, InterestResultPolicy interestpolicy) {
std::unordered_map<std::shared_ptr<CacheableKey>, InterestResultPolicy>&
interestList =
isDurable
? (receiveValues ? m_durableInterestList
: m_durableInterestListForUpdatesAsInvalidates)
: (receiveValues ? m_interestList
: m_interestListForUpdatesAsInvalidates);
for (const auto& key : keys) {
interestList.insert(
std::pair<std::shared_ptr<CacheableKey>, InterestResultPolicy>(
key, interestpolicy));
}
}
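/*
 * Selection of the interest container by (isDurable, receiveValues), as
 * implemented above:
 *
 *   isDurable | receiveValues | container
 *   ----------+---------------+---------------------------------------------
 *   true      | true          | m_durableInterestList
 *   true      | false         | m_durableInterestListForUpdatesAsInvalidates
 *   false     | true          | m_interestList
 *   false     | false         | m_interestListForUpdatesAsInvalidates
 */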
void ThinClientRegion::addRegex(const std::string& regex, bool isDurable,
bool receiveValues,
InterestResultPolicy interestpolicy) {
std::unordered_map<std::shared_ptr<CacheableKey>, InterestResultPolicy>&
interestList =
isDurable
? (receiveValues ? m_durableInterestList
: m_durableInterestListForUpdatesAsInvalidates)
: (receiveValues ? m_interestList
: m_interestListForUpdatesAsInvalidates);
std::unordered_map<std::string, InterestResultPolicy>& interestListRegex =
isDurable
? (receiveValues ? m_durableInterestListRegex
: m_durableInterestListRegexForUpdatesAsInvalidates)
: (receiveValues ? m_interestListRegex
: m_interestListRegexForUpdatesAsInvalidates);
if (regex == ".*") {
interestListRegex.clear();
interestList.clear();
}
interestListRegex.insert(
std::pair<std::string, InterestResultPolicy>(regex, interestpolicy));
}
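/*
 * Consequence of the ".*" special case above (illustrative values):
 *
 *   addRegex("prefix-.*", false, true, policy);  // stored in
 *                                                // m_interestListRegex
 *   addRegex(".*", false, true, policy);         // clears the entry above
 *                                                // and the key list, then
 *                                                // stores ".*"
 */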
std::vector<std::shared_ptr<CacheableKey>> ThinClientRegion::getInterestList()
const {
auto nthis = const_cast<ThinClientRegion*>(this);
RegionGlobalLocks acquireLocksRedundancy(nthis, false);
RegionGlobalLocks acquireLocksFailover(nthis);
CHECK_DESTROY_PENDING(TryReadGuard, getInterestList);
std::lock_guard<decltype(m_keysLock)> keysGuard(nthis->m_keysLock);
std::vector<std::shared_ptr<CacheableKey>> vlist;
std::transform(std::begin(m_durableInterestList),
std::end(m_durableInterestList), std::back_inserter(vlist),
[](const decltype(m_durableInterestList)