| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #include "ThinClientRegion.hpp" |
| |
| #include <algorithm> |
| #include <limits> |
| #include <regex> |
| |
| #include <geode/PoolManager.hpp> |
| #include <geode/Struct.hpp> |
| #include <geode/SystemProperties.hpp> |
| #include <geode/UserFunctionExecutionException.hpp> |
| |
| #include "AutoDelete.hpp" |
| #include "CacheImpl.hpp" |
| #include "CacheRegionHelper.hpp" |
| #include "DataInputInternal.hpp" |
| #include "PutAllPartialResultServerException.hpp" |
| #include "ReadWriteLock.hpp" |
| #include "RegionGlobalLocks.hpp" |
| #include "RemoteQuery.hpp" |
| #include "TcrConnectionManager.hpp" |
| #include "TcrDistributionManager.hpp" |
| #include "TcrEndpoint.hpp" |
| #include "ThinClientBaseDM.hpp" |
| #include "ThinClientPoolDM.hpp" |
| #include "UserAttributes.hpp" |
| #include "Utils.hpp" |
| #include "VersionedCacheableObjectPartList.hpp" |
| #include "util/bounds.hpp" |
| #include "util/exception.hpp" |
| |
| namespace apache { |
| namespace geode { |
| namespace client { |
| |
| static const std::regex PREDICATE_IS_FULL_QUERY_REGEX( |
| "^\\s*(?:select|import)\\b", std::regex::icase); |
| |
| void setThreadLocalExceptionMessage(const char* exMsg); |
| |
| class PutAllWork : public PooledWork<GfErrType>, |
| private NonCopyable, |
| private NonAssignable { |
| ThinClientPoolDM* m_poolDM; |
| std::shared_ptr<BucketServerLocation> m_serverLocation; |
| TcrMessage* m_request; |
| TcrMessageReply* m_reply; |
| MapOfUpdateCounters m_mapOfUpdateCounters; |
| bool m_attemptFailover; |
| bool m_isBGThread; |
| std::shared_ptr<UserAttributes> m_userAttribute; |
| const std::shared_ptr<Region> m_region; |
| std::shared_ptr<std::vector<std::shared_ptr<CacheableKey>>> m_keys; |
| std::shared_ptr<HashMapOfCacheable> m_map; |
| std::shared_ptr<VersionedCacheableObjectPartList> m_verObjPartListPtr; |
| std::chrono::milliseconds m_timeout; |
| std::shared_ptr<PutAllPartialResultServerException> m_papException; |
| bool m_isPapeReceived; |
| ChunkedPutAllResponse* m_resultCollector; |
| // UNUSED const std::shared_ptr<Serializable>& m_aCallbackArgument; |
| |
| public: |
| PutAllWork( |
| ThinClientPoolDM* poolDM, |
| const std::shared_ptr<BucketServerLocation>& serverLocation, |
| const std::shared_ptr<Region>& region, bool attemptFailover, |
| bool isBGThread, const std::shared_ptr<HashMapOfCacheable> map, |
| const std::shared_ptr<std::vector<std::shared_ptr<CacheableKey>>> keys, |
| std::chrono::milliseconds timeout, |
| const std::shared_ptr<Serializable>& aCallbackArgument) |
| : m_poolDM(poolDM), |
| m_serverLocation(serverLocation), |
| m_attemptFailover(attemptFailover), |
| m_isBGThread(isBGThread), |
| m_userAttribute(nullptr), |
| m_region(region), |
| m_keys(keys), |
| m_map(map), |
| m_timeout(timeout), |
| m_papException(nullptr), |
| m_isPapeReceived(false) |
| // UNUSED , m_aCallbackArgument(aCallbackArgument) |
| { |
| m_request = new TcrMessagePutAll( |
| new DataOutput(m_region->getCache().createDataOutput()), m_region.get(), |
| *m_map, m_timeout, m_poolDM, aCallbackArgument); |
| m_reply = new TcrMessageReply(true, m_poolDM); |
| |
| // create new instanceof VCOPL |
| std::recursive_mutex responseLock; |
| m_verObjPartListPtr = |
| std::make_shared<VersionedCacheableObjectPartList>(keys, responseLock); |
| |
| if (m_poolDM->isMultiUserMode()) { |
| m_userAttribute = UserAttributes::threadLocalUserAttributes; |
| } |
| |
| m_request->setTimeout(m_timeout); |
| m_reply->setTimeout(m_timeout); |
| m_resultCollector = new ChunkedPutAllResponse( |
| m_region, *m_reply, responseLock, m_verObjPartListPtr); |
| m_reply->setChunkedResultHandler(m_resultCollector); |
| } |
| |
| ~PutAllWork() { |
| delete m_request; |
| delete m_reply; |
| delete m_resultCollector; |
| } |
| |
| TcrMessage* getReply() { return m_reply; } |
| |
| std::shared_ptr<HashMapOfCacheable> getPutAllMap() { return m_map; } |
| |
| std::shared_ptr<VersionedCacheableObjectPartList> getVerObjPartList() { |
| return m_verObjPartListPtr; |
| } |
| |
| ChunkedPutAllResponse* getResultCollector() { return m_resultCollector; } |
| |
| std::shared_ptr<BucketServerLocation> getServerLocation() { |
| return m_serverLocation; |
| } |
| |
| std::shared_ptr<PutAllPartialResultServerException> getPaPResultException() { |
| return m_papException; |
| } |
| |
| void init() {} |
| GfErrType execute(void) { |
| GuardUserAttributes gua; |
| |
| if (m_userAttribute != nullptr) { |
| gua.setAuthenticatedView(m_userAttribute->getAuthenticatedView()); |
| } |
| |
| GfErrType err = GF_NOERR; |
| err = m_poolDM->sendSyncRequest(*m_request, *m_reply, m_attemptFailover, |
| m_isBGThread, m_serverLocation); |
| |
| // Set Version Tags |
| LOGDEBUG(" m_verObjPartListPtr size = %d err = %d ", |
| m_resultCollector->getList()->size(), err); |
| m_verObjPartListPtr->setVersionedTagptr( |
| m_resultCollector->getList()->getVersionedTagptr()); |
| |
| if (err != GF_NOERR) { |
| return err; |
| } /*This can be GF_NOTCON, counterpart to java |
| ServerConnectivityException*/ |
| |
| switch (m_reply->getMessageType()) { |
| case TcrMessage::REPLY: |
| break; |
| case TcrMessage::RESPONSE: |
| LOGDEBUG("PutAllwork execute err = %d ", err); |
| break; |
| case TcrMessage::EXCEPTION: |
| // TODO::Check for the PAPException and READ |
| // PutAllPartialResultServerException and set its member for later use. |
| // set m_papException and m_isPapeReceived |
| m_isPapeReceived = true; |
| if (m_poolDM->isNotAuthorizedException(m_reply->getException())) { |
| LOGDEBUG("received NotAuthorizedException"); |
| err = GF_AUTHENTICATION_FAILED_EXCEPTION; |
| } else if (m_poolDM->isPutAllPartialResultException( |
| m_reply->getException())) { |
| LOGDEBUG("received PutAllPartialResultException"); |
| err = GF_PUTALL_PARTIAL_RESULT_EXCEPTION; |
| } else { |
| LOGDEBUG("received unknown exception:%s", m_reply->getException()); |
| err = GF_PUTALL_PARTIAL_RESULT_EXCEPTION; |
| // TODO should assign a new err code |
| } |
| |
| break; |
| case TcrMessage::PUT_DATA_ERROR: |
| err = GF_CACHESERVER_EXCEPTION; |
| break; |
| default: |
| LOGERROR("Unknown message type %d during region put-all", |
| m_reply->getMessageType()); |
| err = GF_NOTOBJ; |
| break; |
| } |
| return err; |
| } |
| }; |
| |
| class RemoveAllWork : public PooledWork<GfErrType>, |
| private NonCopyable, |
| private NonAssignable { |
| ThinClientPoolDM* m_poolDM; |
| std::shared_ptr<BucketServerLocation> m_serverLocation; |
| TcrMessage* m_request; |
| TcrMessageReply* m_reply; |
| MapOfUpdateCounters m_mapOfUpdateCounters; |
| bool m_attemptFailover; |
| bool m_isBGThread; |
| std::shared_ptr<UserAttributes> m_userAttribute; |
| const std::shared_ptr<Region> m_region; |
| const std::shared_ptr<Serializable>& m_aCallbackArgument; |
| std::shared_ptr<std::vector<std::shared_ptr<CacheableKey>>> m_keys; |
| std::shared_ptr<VersionedCacheableObjectPartList> m_verObjPartListPtr; |
| std::shared_ptr<PutAllPartialResultServerException> m_papException; |
| bool m_isPapeReceived; |
| ChunkedRemoveAllResponse* m_resultCollector; |
| |
| public: |
| RemoveAllWork( |
| ThinClientPoolDM* poolDM, |
| const std::shared_ptr<BucketServerLocation>& serverLocation, |
| const std::shared_ptr<Region>& region, bool attemptFailover, |
| bool isBGThread, |
| const std::shared_ptr<std::vector<std::shared_ptr<CacheableKey>>> keys, |
| const std::shared_ptr<Serializable>& aCallbackArgument) |
| : m_poolDM(poolDM), |
| m_serverLocation(serverLocation), |
| m_attemptFailover(attemptFailover), |
| m_isBGThread(isBGThread), |
| m_userAttribute(nullptr), |
| m_region(region), |
| m_aCallbackArgument(aCallbackArgument), |
| m_keys(keys), |
| m_papException(nullptr), |
| m_isPapeReceived(false) { |
| m_request = new TcrMessageRemoveAll( |
| new DataOutput(m_region->getCache().createDataOutput()), m_region.get(), |
| *keys, m_aCallbackArgument, m_poolDM); |
| m_reply = new TcrMessageReply(true, m_poolDM); |
| // create new instanceof VCOPL |
| std::recursive_mutex responseLock; |
| m_verObjPartListPtr = |
| std::make_shared<VersionedCacheableObjectPartList>(keys, responseLock); |
| |
| if (m_poolDM->isMultiUserMode()) { |
| m_userAttribute = UserAttributes::threadLocalUserAttributes; |
| } |
| |
| m_resultCollector = new ChunkedRemoveAllResponse( |
| m_region, *m_reply, responseLock, m_verObjPartListPtr); |
| m_reply->setChunkedResultHandler(m_resultCollector); |
| } |
| |
| ~RemoveAllWork() { |
| delete m_request; |
| delete m_reply; |
| delete m_resultCollector; |
| } |
| |
| TcrMessage* getReply() { return m_reply; } |
| |
| std::shared_ptr<VersionedCacheableObjectPartList> getVerObjPartList() { |
| return m_verObjPartListPtr; |
| } |
| |
| ChunkedRemoveAllResponse* getResultCollector() { return m_resultCollector; } |
| |
| std::shared_ptr<BucketServerLocation> getServerLocation() { |
| return m_serverLocation; |
| } |
| |
| std::shared_ptr<PutAllPartialResultServerException> getPaPResultException() { |
| return m_papException; |
| } |
| |
| void init() {} |
| GfErrType execute(void) { |
| GuardUserAttributes gua; |
| |
| if (m_userAttribute != nullptr) { |
| gua.setAuthenticatedView(m_userAttribute->getAuthenticatedView()); |
| } |
| |
| GfErrType err = GF_NOERR; |
| err = m_poolDM->sendSyncRequest(*m_request, *m_reply, m_attemptFailover, |
| m_isBGThread, m_serverLocation); |
| |
| // Set Version Tags |
| LOGDEBUG(" m_verObjPartListPtr size = %d err = %d ", |
| m_resultCollector->getList()->size(), err); |
| m_verObjPartListPtr->setVersionedTagptr( |
| m_resultCollector->getList()->getVersionedTagptr()); |
| |
| if (err != GF_NOERR) { |
| return err; |
| } /*This can be GF_NOTCON, counterpart to java |
| ServerConnectivityException*/ |
| |
| switch (m_reply->getMessageType()) { |
| case TcrMessage::REPLY: |
| break; |
| case TcrMessage::RESPONSE: |
| LOGDEBUG("RemoveAllWork execute err = %d ", err); |
| break; |
| case TcrMessage::EXCEPTION: |
| // TODO::Check for the PAPException and READ |
| // PutAllPartialResultServerException and set its member for later use. |
| // set m_papException and m_isPapeReceived |
| m_isPapeReceived = true; |
| if (m_poolDM->isNotAuthorizedException(m_reply->getException())) { |
| LOGDEBUG("received NotAuthorizedException"); |
| err = GF_AUTHENTICATION_FAILED_EXCEPTION; |
| } else if (m_poolDM->isPutAllPartialResultException( |
| m_reply->getException())) { |
| LOGDEBUG("received PutAllPartialResultException"); |
| err = GF_PUTALL_PARTIAL_RESULT_EXCEPTION; |
| } else { |
| LOGDEBUG("received unknown exception:%s", m_reply->getException()); |
| err = GF_PUTALL_PARTIAL_RESULT_EXCEPTION; |
| // TODO should assign a new err code |
| } |
| |
| break; |
| case TcrMessage::PUT_DATA_ERROR: |
| err = GF_CACHESERVER_EXCEPTION; |
| break; |
| default: |
| LOGERROR("Unknown message type %d during region remove-all", |
| m_reply->getMessageType()); |
| err = GF_NOTOBJ; |
| break; |
| } |
| return err; |
| } |
| }; |
| |
| ThinClientRegion::ThinClientRegion( |
| const std::string& name, CacheImpl* cacheImpl, |
| const std::shared_ptr<RegionInternal>& rPtr, RegionAttributes attributes, |
| const std::shared_ptr<CacheStatistics>& stats, bool shared) |
| : LocalRegion(name, cacheImpl, rPtr, attributes, stats, shared), |
| m_tcrdm(nullptr), |
| m_notifyRelease(false), |
| m_isMetaDataRefreshed(false) { |
| m_transactionEnabled = true; |
| m_isDurableClnt = !cacheImpl->getDistributedSystem() |
| .getSystemProperties() |
| .durableClientId() |
| .empty(); |
| } |
| |
| void ThinClientRegion::initTCR() { |
| try { |
| m_tcrdm = std::make_shared<TcrDistributionManager>( |
| this, m_cacheImpl->tcrConnectionManager()); |
| m_tcrdm->init(); |
| } catch (const Exception& ex) { |
| LOGERROR("Exception while initializing region: %s: %s", |
| ex.getName().c_str(), ex.what()); |
| throw; |
| } |
| } |
| |
| void ThinClientRegion::registerKeys( |
| const std::vector<std::shared_ptr<CacheableKey>>& keys, bool isDurable, |
| bool getInitialValues, bool receiveValues) { |
| auto pool = m_cacheImpl->getPoolManager().find(getAttributes().getPoolName()); |
| if (pool != nullptr) { |
| if (!pool->getSubscriptionEnabled()) { |
| LOGERROR( |
| "Registering keys is supported " |
| "only if subscription-enabled attribute is true for pool " + |
| pool->getName()); |
| throw UnsupportedOperationException( |
| "Registering keys is supported " |
| "only if pool subscription-enabled attribute is true."); |
| } |
| } |
| if (keys.empty()) { |
| LOGERROR("Register keys list is empty"); |
| throw IllegalArgumentException( |
| "Register keys " |
| "keys vector is empty"); |
| } |
| if (isDurable && !isDurableClient()) { |
| LOGERROR( |
| "Register keys durable flag is only applicable for durable clients"); |
| throw IllegalStateException( |
| "Durable flag only applicable for " |
| "durable clients"); |
| } |
| if (getInitialValues && !m_regionAttributes.getCachingEnabled()) { |
| LOGERROR( |
| "Register keys getInitialValues flag is only applicable for caching" |
| "clients"); |
| throw IllegalStateException( |
| "getInitialValues flag only applicable for caching clients"); |
| } |
| |
| InterestResultPolicy interestPolicy = InterestResultPolicy::NONE; |
| if (getInitialValues) { |
| interestPolicy = InterestResultPolicy::KEYS_VALUES; |
| } |
| |
| LOGDEBUG("ThinClientRegion::registerKeys : interestpolicy is %d", |
| interestPolicy.ordinal); |
| |
| GfErrType err = registerKeysNoThrow(keys, true, nullptr, isDurable, |
| interestPolicy, receiveValues); |
| |
| if (m_tcrdm->isFatalError(err)) { |
| throwExceptionIfError("Region::registerKeys", err); |
| } |
| |
| throwExceptionIfError("Region::registerKeys", err); |
| } |
| |
| void ThinClientRegion::unregisterKeys( |
| const std::vector<std::shared_ptr<CacheableKey>>& keys) { |
| auto pool = m_cacheImpl->getPoolManager().find(getAttributes().getPoolName()); |
| if (pool != nullptr) { |
| if (!pool->getSubscriptionEnabled()) { |
| LOGERROR( |
| "Unregister keys is supported " |
| "only if subscription-enabled attribute is true for pool " + |
| pool->getName()); |
| throw UnsupportedOperationException( |
| "Unregister keys is supported " |
| "only if pool subscription-enabled attribute is true."); |
| } |
| } else { |
| if (!getAttributes().getClientNotificationEnabled()) { |
| LOGERROR( |
| "Unregister keys is supported " |
| "only if region client-notification attribute is true."); |
| throw UnsupportedOperationException( |
| "Unregister keys is supported " |
| "only if region client-notification attribute is true."); |
| } |
| } |
| if (keys.empty()) { |
| LOGERROR("Unregister keys list is empty"); |
| throw IllegalArgumentException( |
| "Unregister keys " |
| "keys vector is empty"); |
| } |
| GfErrType err = unregisterKeysNoThrow(keys); |
| throwExceptionIfError("Region::unregisterKeys", err); |
| } |
| |
| void ThinClientRegion::registerAllKeys(bool isDurable, bool getInitialValues, |
| bool receiveValues) { |
| auto pool = m_cacheImpl->getPoolManager().find(getAttributes().getPoolName()); |
| if (pool != nullptr) { |
| if (!pool->getSubscriptionEnabled()) { |
| LOGERROR( |
| "Register all keys is supported only " |
| "if subscription-enabled attribute is true for pool " + |
| pool->getName()); |
| throw UnsupportedOperationException( |
| "Register all keys is supported only " |
| "if pool subscription-enabled attribute is true."); |
| } |
| } |
| if (isDurable && !isDurableClient()) { |
| LOGERROR( |
| "Register all keys durable flag is only applicable for durable " |
| "clients"); |
| throw IllegalStateException( |
| "Durable flag only applicable for durable clients"); |
| } |
| |
| if (getInitialValues && !m_regionAttributes.getCachingEnabled()) { |
| LOGERROR( |
| "Register all keys getInitialValues flag is only applicable for caching" |
| "clients"); |
| throw IllegalStateException( |
| "getInitialValues flag only applicable for caching clients"); |
| } |
| |
| InterestResultPolicy interestPolicy = InterestResultPolicy::NONE; |
| if (getInitialValues) { |
| interestPolicy = InterestResultPolicy::KEYS_VALUES; |
| } else { |
| interestPolicy = InterestResultPolicy::KEYS; |
| } |
| |
| LOGDEBUG("ThinClientRegion::registerAllKeys : interestpolicy is %d", |
| interestPolicy.ordinal); |
| |
| std::shared_ptr<std::vector<std::shared_ptr<CacheableKey>>> resultKeys; |
| // if we need to fetch initial data, then we get the keys in |
| // that call itself using the special GET_ALL message and do not need |
| // to get the keys in the initial register interest call |
| GfErrType err = |
| registerRegexNoThrow(".*", true, nullptr, isDurable, resultKeys, |
| interestPolicy, receiveValues); |
| |
| if (m_tcrdm->isFatalError(err)) { |
| throwExceptionIfError("Region::registerAllKeys", err); |
| } |
| |
| // Get the entries from the server using a special GET_ALL message |
| throwExceptionIfError("Region::registerAllKeys", err); |
| } |
| |
| void ThinClientRegion::registerRegex(const std::string& regex, bool isDurable, |
| bool getInitialValues, |
| bool receiveValues) { |
| auto pool = m_cacheImpl->getPoolManager().find(getAttributes().getPoolName()); |
| if (pool != nullptr) { |
| if (!pool->getSubscriptionEnabled()) { |
| LOGERROR( |
| "Register regex is supported only if " |
| "subscription-enabled attribute is true for pool " + |
| pool->getName()); |
| throw UnsupportedOperationException( |
| "Register regex is supported only if " |
| "pool subscription-enabled attribute is true."); |
| } |
| } |
| if (isDurable && !isDurableClient()) { |
| LOGERROR("Register regex durable flag only applicable for durable clients"); |
| throw IllegalStateException( |
| "Durable flag only applicable for durable clients"); |
| } |
| |
| if (regex.empty()) { |
| throw IllegalArgumentException( |
| "Region::registerRegex: Regex string is empty"); |
| } |
| |
| auto interestPolicy = InterestResultPolicy::NONE; |
| if (getInitialValues) { |
| interestPolicy = InterestResultPolicy::KEYS_VALUES; |
| } else { |
| interestPolicy = InterestResultPolicy::KEYS; |
| } |
| |
| LOGDEBUG("ThinClientRegion::registerRegex : interestpolicy is %d", |
| interestPolicy.ordinal); |
| |
| auto resultKeys2 = |
| std::make_shared<std::vector<std::shared_ptr<CacheableKey>>>(); |
| |
| // if we need to fetch initial data for "allKeys" case, then we |
| // get the keys in that call itself using the special GET_ALL message and |
| // do not need to get the keys in the initial register interest call |
| GfErrType err = |
| registerRegexNoThrow(regex, true, nullptr, isDurable, resultKeys2, |
| interestPolicy, receiveValues); |
| |
| if (m_tcrdm->isFatalError(err)) { |
| throwExceptionIfError("Region::registerRegex", err); |
| } |
| |
| throwExceptionIfError("Region::registerRegex", err); |
| } |
| |
| void ThinClientRegion::unregisterRegex(const std::string& regex) { |
| auto pool = m_cacheImpl->getPoolManager().find(getAttributes().getPoolName()); |
| if (pool != nullptr) { |
| if (!pool->getSubscriptionEnabled()) { |
| LOGERROR( |
| "Unregister regex is supported only if " |
| "subscription-enabled attribute is true for pool " + |
| pool->getName()); |
| throw UnsupportedOperationException( |
| "Unregister regex is supported only if " |
| "pool subscription-enabled attribute is true."); |
| } |
| } |
| |
| if (regex.empty()) { |
| LOGERROR("Unregister regex string is empty"); |
| throw IllegalArgumentException("Unregister regex string is empty"); |
| } |
| |
| GfErrType err = unregisterRegexNoThrow(regex); |
| throwExceptionIfError("Region::unregisterRegex", err); |
| } |
| |
| void ThinClientRegion::unregisterAllKeys() { |
| auto pool = m_cacheImpl->getPoolManager().find(getAttributes().getPoolName()); |
| if (pool != nullptr) { |
| if (!pool->getSubscriptionEnabled()) { |
| LOGERROR( |
| "Unregister all keys is supported only if " |
| "subscription-enabled attribute is true for pool " + |
| pool->getName()); |
| throw UnsupportedOperationException( |
| "Unregister all keys is supported only if " |
| "pool subscription-enabled attribute is true."); |
| } |
| } |
| GfErrType err = unregisterRegexNoThrow(".*"); |
| throwExceptionIfError("Region::unregisterAllKeys", err); |
| } |
| |
| std::shared_ptr<SelectResults> ThinClientRegion::query( |
| const std::string& predicate, std::chrono::milliseconds timeout) { |
| util::PROTOCOL_OPERATION_TIMEOUT_BOUNDS(timeout); |
| |
| CHECK_DESTROY_PENDING(TryReadGuard, Region::query); |
| |
| if (predicate.empty()) { |
| LOGERROR("Region query predicate string is empty"); |
| throw IllegalArgumentException("Region query predicate string is empty"); |
| } |
| |
| std::string squery; |
| if (std::regex_search(predicate, PREDICATE_IS_FULL_QUERY_REGEX)) { |
| squery = predicate; |
| } else { |
| squery = "select distinct * from "; |
| squery += getFullPath(); |
| squery += " this where "; |
| squery += predicate; |
| } |
| |
| std::shared_ptr<RemoteQuery> queryPtr; |
| |
| if (auto poolDM = std::dynamic_pointer_cast<ThinClientPoolDM>(m_tcrdm)) { |
| queryPtr = std::dynamic_pointer_cast<RemoteQuery>( |
| poolDM->getQueryServiceWithoutCheck()->newQuery(squery.c_str())); |
| } else { |
| queryPtr = std::dynamic_pointer_cast<RemoteQuery>( |
| m_cacheImpl->getQueryService()->newQuery(squery.c_str())); |
| } |
| |
| return queryPtr->execute(timeout, "Region::query", m_tcrdm.get(), nullptr); |
| } |
| |
| bool ThinClientRegion::existsValue(const std::string& predicate, |
| std::chrono::milliseconds timeout) { |
| util::PROTOCOL_OPERATION_TIMEOUT_BOUNDS(timeout); |
| |
| auto results = query(predicate, timeout); |
| |
| if (results == nullptr) { |
| return false; |
| } |
| |
| return results->size() > 0; |
| } |
| |
| GfErrType ThinClientRegion::unregisterKeysBeforeDestroyRegion() { |
| auto pool = m_cacheImpl->getPoolManager().find(getAttributes().getPoolName()); |
| if (pool != nullptr) { |
| if (!pool->getSubscriptionEnabled()) { |
| LOGDEBUG( |
| "pool subscription-enabled attribute is false, No need to Unregister " |
| "keys"); |
| return GF_NOERR; |
| } |
| } |
| GfErrType err = GF_NOERR; |
| GfErrType opErr = GF_NOERR; |
| |
| opErr = unregisterStoredRegexLocalDestroy(m_interestListRegex); |
| err = opErr != GF_NOERR ? opErr : err; |
| opErr = unregisterStoredRegexLocalDestroy(m_durableInterestListRegex); |
| err = opErr != GF_NOERR ? opErr : err; |
| opErr = unregisterStoredRegexLocalDestroy( |
| m_interestListRegexForUpdatesAsInvalidates); |
| err = opErr != GF_NOERR ? opErr : err; |
| opErr = unregisterStoredRegexLocalDestroy( |
| m_durableInterestListRegexForUpdatesAsInvalidates); |
| err = opErr != GF_NOERR ? opErr : err; |
| |
| std::vector<std::shared_ptr<CacheableKey>> keysVec; |
| copyInterestList(keysVec, m_interestList); |
| opErr = unregisterKeysNoThrowLocalDestroy(keysVec, false); |
| err = opErr != GF_NOERR ? opErr : err; |
| |
| std::vector<std::shared_ptr<CacheableKey>> keysVecDurable; |
| copyInterestList(keysVecDurable, m_durableInterestList); |
| opErr = unregisterKeysNoThrowLocalDestroy(keysVecDurable, false); |
| err = opErr != GF_NOERR ? opErr : err; |
| |
| std::vector<std::shared_ptr<CacheableKey>> keysVecForUpdatesAsInvalidates; |
| copyInterestList(keysVecForUpdatesAsInvalidates, |
| m_interestListForUpdatesAsInvalidates); |
| opErr = |
| unregisterKeysNoThrowLocalDestroy(keysVecForUpdatesAsInvalidates, false); |
| err = opErr != GF_NOERR ? opErr : err; |
| |
| std::vector<std::shared_ptr<CacheableKey>> |
| keysVecDurableForUpdatesAsInvalidates; |
| copyInterestList(keysVecDurableForUpdatesAsInvalidates, |
| m_durableInterestListForUpdatesAsInvalidates); |
| opErr = unregisterKeysNoThrowLocalDestroy( |
| keysVecDurableForUpdatesAsInvalidates, false); |
| err = opErr != GF_NOERR ? opErr : err; |
| return err; |
| } |
| std::shared_ptr<Serializable> ThinClientRegion::selectValue( |
| const std::string& predicate, std::chrono::milliseconds timeout) { |
| auto results = query(predicate, timeout); |
| |
| if (results == nullptr || results->size() == 0) { |
| return nullptr; |
| } |
| |
| if (results->size() > 1) { |
| throw QueryException("selectValue has more than one result"); |
| } |
| |
| return results->operator[](0); |
| } |
| |
| std::vector<std::shared_ptr<CacheableKey>> ThinClientRegion::serverKeys() { |
| CHECK_DESTROY_PENDING(TryReadGuard, Region::serverKeys); |
| |
| TcrMessageReply reply(true, m_tcrdm.get()); |
| TcrMessageKeySet request(new DataOutput(m_cacheImpl->createDataOutput()), |
| m_fullPath, m_tcrdm.get()); |
| reply.setMessageTypeRequest(TcrMessage::KEY_SET); |
| std::vector<std::shared_ptr<CacheableKey>> serverKeys; |
| ChunkedKeySetResponse resultCollector(request, serverKeys, reply); |
| reply.setChunkedResultHandler(&resultCollector); |
| |
| GfErrType err = GF_NOERR; |
| |
| err = m_tcrdm->sendSyncRequest(request, reply); |
| |
| throwExceptionIfError("Region::serverKeys", err); |
| |
| switch (reply.getMessageType()) { |
| case TcrMessage::RESPONSE: { |
| // keyset result is handled by ChunkedKeySetResponse |
| break; |
| } |
| case TcrMessage::EXCEPTION: { |
| err = handleServerException("Region:serverKeys", reply.getException()); |
| break; |
| } |
| case TcrMessage::KEY_SET_DATA_ERROR: { |
| LOGERROR("Region serverKeys: an error occurred on the server"); |
| err = GF_CACHESERVER_EXCEPTION; |
| break; |
| } |
| default: { |
| LOGERROR("Unknown message type %d during region serverKeys", |
| reply.getMessageType()); |
| err = GF_MSG; |
| break; |
| } |
| } |
| throwExceptionIfError("Region::serverKeys", err); |
| |
| return serverKeys; |
| } |
| |
| bool ThinClientRegion::containsKeyOnServer( |
| const std::shared_ptr<CacheableKey>& keyPtr) const { |
| GfErrType err = GF_NOERR; |
| bool ret = false; |
| |
| /** @brief Create message and send to bridge server */ |
| |
| TcrMessageContainsKey request( |
| new DataOutput(m_cacheImpl->createDataOutput()), this, keyPtr, |
| static_cast<std::shared_ptr<Serializable>>(nullptr), true, m_tcrdm.get()); |
| TcrMessageReply reply(true, m_tcrdm.get()); |
| reply.setMessageTypeRequest(TcrMessage::CONTAINS_KEY); |
| err = m_tcrdm->sendSyncRequest(request, reply); |
| |
| switch (reply.getMessageType()) { |
| case TcrMessage::RESPONSE: |
| ret = reply.getBoolValue(); |
| break; |
| |
| case TcrMessage::EXCEPTION: |
| err = handleServerException("Region::containsKeyOnServer:", |
| reply.getException()); |
| break; |
| |
| case TcrMessage::REQUEST_DATA_ERROR: |
| LOGERROR( |
| "Region::containsKeyOnServer: read error occurred on the endpoint %s", |
| m_tcrdm->getActiveEndpoint()->name().c_str()); |
| err = GF_CACHESERVER_EXCEPTION; |
| break; |
| |
| default: |
| LOGERROR("Unknown message type in Region::containsKeyOnServer %d", |
| reply.getMessageType()); |
| err = GF_MSG; |
| break; |
| } |
| |
| auto rptr = CacheableBoolean::create(ret); |
| |
| rptr = std::dynamic_pointer_cast<CacheableBoolean>(handleReplay(err, rptr)); |
| throwExceptionIfError("Region::containsKeyOnServer ", err); |
| return rptr->value(); |
| } |
| |
| bool ThinClientRegion::containsValueForKey_remote( |
| const std::shared_ptr<CacheableKey>& keyPtr) const { |
| GfErrType err = GF_NOERR; |
| bool ret = false; |
| |
| /** @brief Create message and send to bridge server */ |
| |
| TcrMessageContainsKey request( |
| new DataOutput(m_cacheImpl->createDataOutput()), this, keyPtr, |
| static_cast<std::shared_ptr<Serializable>>(nullptr), false, |
| m_tcrdm.get()); |
| TcrMessageReply reply(true, m_tcrdm.get()); |
| reply.setMessageTypeRequest(TcrMessage::CONTAINS_KEY); |
| err = m_tcrdm->sendSyncRequest(request, reply); |
| // if ( err != GF_NOERR ) return ret; |
| |
| switch (reply.getMessageType()) { |
| case TcrMessage::RESPONSE: |
| ret = reply.getBoolValue(); |
| break; |
| |
| case TcrMessage::EXCEPTION: |
| err = handleServerException("Region::containsValueForKey:", |
| reply.getException()); |
| break; |
| |
| case TcrMessage::REQUEST_DATA_ERROR: |
| LOGERROR( |
| "Region::containsValueForKey: read error occurred on the endpoint %s", |
| m_tcrdm->getActiveEndpoint()->name().c_str()); |
| err = GF_CACHESERVER_EXCEPTION; |
| break; |
| |
| default: |
| LOGERROR("Unknown message type in Region::containsValueForKey %d", |
| reply.getMessageType()); |
| err = GF_MSG; |
| break; |
| } |
| |
| auto rptr = CacheableBoolean::create(ret); |
| |
| rptr = std::dynamic_pointer_cast<CacheableBoolean>(handleReplay(err, rptr)); |
| |
| throwExceptionIfError("Region::containsValueForKey ", err); |
| return rptr->value(); |
| } |
| |
| void ThinClientRegion::clear( |
| const std::shared_ptr<Serializable>& aCallbackArgument) { |
| GfErrType err = GF_NOERR; |
| err = localClearNoThrow(aCallbackArgument, CacheEventFlags::NORMAL); |
| if (err != GF_NOERR) throwExceptionIfError("Region::clear", err); |
| |
| /** @brief Create message and send to bridge server */ |
| |
| TcrMessageClearRegion request(new DataOutput(m_cacheImpl->createDataOutput()), |
| this, aCallbackArgument, |
| std::chrono::milliseconds(-1), m_tcrdm.get()); |
| TcrMessageReply reply(true, m_tcrdm.get()); |
| err = m_tcrdm->sendSyncRequest(request, reply); |
| if (err != GF_NOERR) throwExceptionIfError("Region::clear", err); |
| |
| switch (reply.getMessageType()) { |
| case TcrMessage::REPLY: |
| LOGFINE("Region %s clear message sent to server successfully", |
| m_fullPath.c_str()); |
| break; |
| case TcrMessage::EXCEPTION: |
| err = handleServerException("Region::clear:", reply.getException()); |
| break; |
| |
| case TcrMessage::CLEAR_REGION_DATA_ERROR: |
| LOGERROR("Region clear read error occurred on the endpoint %s", |
| m_tcrdm->getActiveEndpoint()->name().c_str()); |
| err = GF_CACHESERVER_EXCEPTION; |
| break; |
| |
| default: |
| LOGERROR("Unknown message type %d during region clear", |
| reply.getMessageType()); |
| err = GF_MSG; |
| break; |
| } |
| if (err == GF_NOERR) { |
| err = invokeCacheListenerForRegionEvent( |
| aCallbackArgument, CacheEventFlags::NORMAL, AFTER_REGION_CLEAR); |
| } |
| throwExceptionIfError("Region::clear", err); |
| } |
| |
| GfErrType ThinClientRegion::getNoThrow_remote( |
| const std::shared_ptr<CacheableKey>& keyPtr, |
| std::shared_ptr<Cacheable>& valPtr, |
| const std::shared_ptr<Serializable>& aCallbackArgument, |
| std::shared_ptr<VersionTag>& versionTag) { |
| GfErrType err = GF_NOERR; |
| |
| /** @brief Create message and send to bridge server */ |
| |
| TcrMessageRequest request(new DataOutput(m_cacheImpl->createDataOutput()), |
| this, keyPtr, aCallbackArgument, m_tcrdm.get()); |
| TcrMessageReply reply(true, m_tcrdm.get()); |
| err = m_tcrdm->sendSyncRequest(request, reply); |
| if (err != GF_NOERR) return err; |
| |
| // put the object into local region |
| switch (reply.getMessageType()) { |
| case TcrMessage::RESPONSE: { |
| valPtr = reply.getValue(); |
| versionTag = reply.getVersionTag(); |
| break; |
| } |
| case TcrMessage::EXCEPTION: { |
| err = handleServerException("Region::get", reply.getException()); |
| break; |
| } |
| case TcrMessage::REQUEST_DATA_ERROR: { |
| // LOGERROR("A read error occurred on the endpoint %s", |
| // m_tcrdm->getActiveEndpoint()->name().c_str()); |
| err = GF_CACHESERVER_EXCEPTION; |
| break; |
| } |
| default: { |
| LOGERROR("Unknown message type %d while getting entry from region", |
| reply.getMessageType()); |
| err = GF_MSG; |
| break; |
| } |
| } |
| return err; |
| } |
| |
| GfErrType ThinClientRegion::invalidateNoThrow_remote( |
| const std::shared_ptr<CacheableKey>& keyPtr, |
| const std::shared_ptr<Serializable>& aCallbackArgument, |
| std::shared_ptr<VersionTag>& versionTag) { |
| GfErrType err = GF_NOERR; |
| |
| TcrMessageInvalidate request(new DataOutput(m_cacheImpl->createDataOutput()), |
| this, keyPtr, aCallbackArgument, m_tcrdm.get()); |
| TcrMessageReply reply(true, m_tcrdm.get()); |
| err = m_tcrdm->sendSyncRequest(request, reply); |
| if (err != GF_NOERR) return err; |
| |
| // put the object into local region |
| switch (reply.getMessageType()) { |
| case TcrMessage::REPLY: { |
| versionTag = reply.getVersionTag(); |
| break; |
| } |
| case TcrMessage::EXCEPTION: { |
| err = handleServerException("Region::get", reply.getException()); |
| break; |
| } |
| case TcrMessage::INVALIDATE_ERROR: { |
| err = GF_CACHESERVER_EXCEPTION; |
| break; |
| } |
| default: { |
| LOGERROR("Unknown message type %d while getting entry from region", |
| reply.getMessageType()); |
| err = GF_MSG; |
| break; |
| } |
| } |
| return err; |
| } |
| |
| GfErrType ThinClientRegion::putNoThrow_remote( |
| const std::shared_ptr<CacheableKey>& keyPtr, |
| const std::shared_ptr<Cacheable>& valuePtr, |
| const std::shared_ptr<Serializable>& aCallbackArgument, |
| std::shared_ptr<VersionTag>& versionTag, bool checkDelta) { |
| GfErrType err = GF_NOERR; |
| // do TCR put |
| // bool delta = valuePtr->hasDelta(); |
| bool delta = false; |
| auto&& conFlationValue = getCacheImpl() |
| ->getDistributedSystem() |
| .getSystemProperties() |
| .conflateEvents(); |
| if (checkDelta && valuePtr && conFlationValue != "true" && |
| ThinClientBaseDM::isDeltaEnabledOnServer()) { |
| auto&& temp = std::dynamic_pointer_cast<Delta>(valuePtr); |
| delta = temp && temp->hasDelta(); |
| } |
| TcrMessagePut request(new DataOutput(m_cacheImpl->createDataOutput()), this, |
| keyPtr, valuePtr, aCallbackArgument, delta, |
| m_tcrdm.get()); |
| TcrMessageReply* reply = new TcrMessageReply(true, m_tcrdm.get()); |
| err = m_tcrdm->sendSyncRequest(request, *reply); |
| if (delta) { |
| // Does not check whether success of failure.. |
| m_cacheImpl->getCachePerfStats().incDeltaPut(); |
| if (reply->getMessageType() == TcrMessage::PUT_DELTA_ERROR) { |
| // Try without delta |
| TcrMessagePut request(new DataOutput(m_cacheImpl->createDataOutput()), |
| this, keyPtr, valuePtr, aCallbackArgument, false, |
| m_tcrdm.get(), false, true); |
| delete reply; |
| reply = new TcrMessageReply(true, m_tcrdm.get()); |
| err = m_tcrdm->sendSyncRequest(request, *reply); |
| } |
| } |
| if (err != GF_NOERR) return err; |
| |
| // put the object into local region |
| switch (reply->getMessageType()) { |
| case TcrMessage::REPLY: { |
| versionTag = reply->getVersionTag(); |
| break; |
| } |
| case TcrMessage::EXCEPTION: { |
| err = handleServerException("Region::put", reply->getException()); |
| break; |
| } |
| case TcrMessage::PUT_DATA_ERROR: { |
| err = GF_CACHESERVER_EXCEPTION; |
| break; |
| } |
| default: { |
| LOGERROR("Unknown message type %d during region put reply", |
| reply->getMessageType()); |
| err = GF_MSG; |
| } |
| } |
| delete reply; |
| reply = nullptr; |
| return err; |
| } |
| |
| GfErrType ThinClientRegion::createNoThrow_remote( |
| const std::shared_ptr<CacheableKey>& keyPtr, |
| const std::shared_ptr<Cacheable>& valuePtr, |
| const std::shared_ptr<Serializable>& aCallbackArgument, |
| std::shared_ptr<VersionTag>& versionTag) { |
| return putNoThrow_remote(keyPtr, valuePtr, aCallbackArgument, versionTag, |
| false); |
| } |
| |
| GfErrType ThinClientRegion::destroyNoThrow_remote( |
| const std::shared_ptr<CacheableKey>& keyPtr, |
| const std::shared_ptr<Serializable>& aCallbackArgument, |
| std::shared_ptr<VersionTag>& versionTag) { |
| GfErrType err = GF_NOERR; |
| |
| // do TCR destroy |
| TcrMessageDestroy request(new DataOutput(m_cacheImpl->createDataOutput()), |
| this, keyPtr, nullptr, aCallbackArgument, |
| m_tcrdm.get()); |
| TcrMessageReply reply(true, m_tcrdm.get()); |
| err = m_tcrdm->sendSyncRequest(request, reply); |
| if (err != GF_NOERR) return err; |
| |
| switch (reply.getMessageType()) { |
| case TcrMessage::REPLY: { |
| if (reply.getEntryNotFound() == 1) { |
| err = GF_CACHE_ENTRY_NOT_FOUND; |
| } else { |
| LOGDEBUG("Remote key [%s] is destroyed successfully in region %s", |
| Utils::nullSafeToString(keyPtr).c_str(), m_fullPath.c_str()); |
| } |
| versionTag = reply.getVersionTag(); |
| break; |
| } |
| case TcrMessage::EXCEPTION: { |
| err = handleServerException("Region::destroy", reply.getException()); |
| break; |
| } |
| case TcrMessage::DESTROY_DATA_ERROR: { |
| err = GF_CACHE_ENTRY_DESTROYED_EXCEPTION; |
| break; |
| } |
| default: { |
| LOGERROR("Unknown message type %d while destroying region entry", |
| reply.getMessageType()); |
| err = GF_MSG; |
| } |
| } |
| |
| return err; |
| } // destroyNoThrow_remote() |
| |
| GfErrType ThinClientRegion::removeNoThrow_remote( |
| const std::shared_ptr<CacheableKey>& keyPtr, |
| const std::shared_ptr<Cacheable>& cvalue, |
| const std::shared_ptr<Serializable>& aCallbackArgument, |
| std::shared_ptr<VersionTag>& versionTag) { |
| GfErrType err = GF_NOERR; |
| |
| // do TCR remove |
| TcrMessageDestroy request(new DataOutput(m_cacheImpl->createDataOutput()), |
| this, keyPtr, cvalue, aCallbackArgument, |
| m_tcrdm.get()); |
| TcrMessageReply reply(true, m_tcrdm.get()); |
| err = m_tcrdm->sendSyncRequest(request, reply); |
| if (err != GF_NOERR) { |
| return err; |
| } |
| switch (reply.getMessageType()) { |
| case TcrMessage::REPLY: { |
| if (reply.getEntryNotFound() == 1) { |
| err = GF_ENOENT; |
| } else { |
| LOGDEBUG("Remote key [%s] is removed successfully in region %s", |
| Utils::nullSafeToString(keyPtr).c_str(), m_fullPath.c_str()); |
| } |
| versionTag = reply.getVersionTag(); |
| break; |
| } |
| case TcrMessage::EXCEPTION: { |
| err = handleServerException("Region::remove", reply.getException()); |
| break; |
| } |
| case TcrMessage::DESTROY_DATA_ERROR: { |
| err = GF_CACHE_ENTRY_DESTROYED_EXCEPTION; |
| break; |
| } |
| default: { |
| LOGERROR("Unknown message type %d while removing region entry", |
| reply.getMessageType()); |
| err = GF_MSG; |
| } |
| } |
| return err; |
| } |
| |
| GfErrType ThinClientRegion::removeNoThrowEX_remote( |
| const std::shared_ptr<CacheableKey>& keyPtr, |
| const std::shared_ptr<Serializable>& aCallbackArgument, |
| std::shared_ptr<VersionTag>& versionTag) { |
| GfErrType err = GF_NOERR; |
| |
| // do TCR remove |
| TcrMessageDestroy request(new DataOutput(m_cacheImpl->createDataOutput()), |
| this, keyPtr, nullptr, aCallbackArgument, |
| m_tcrdm.get()); |
| TcrMessageReply reply(true, m_tcrdm.get()); |
| err = m_tcrdm->sendSyncRequest(request, reply); |
| if (err != GF_NOERR) { |
| return err; |
| } |
| switch (reply.getMessageType()) { |
| case TcrMessage::REPLY: { |
| versionTag = reply.getVersionTag(); |
| if (reply.getEntryNotFound() == 1) { |
| err = GF_ENOENT; |
| } else { |
| LOGDEBUG("Remote key [%s] is removed successfully in region %s", |
| Utils::nullSafeToString(keyPtr).c_str(), m_fullPath.c_str()); |
| } |
| break; |
| } |
| case TcrMessage::EXCEPTION: { |
| err = handleServerException("Region::removeEx", reply.getException()); |
| break; |
| } |
| case TcrMessage::DESTROY_DATA_ERROR: { |
| err = GF_CACHE_ENTRY_DESTROYED_EXCEPTION; |
| break; |
| } |
| default: { |
| LOGERROR("Unknown message type %d while removing region entry", |
| reply.getMessageType()); |
| err = GF_MSG; |
| } |
| } |
| return err; |
| } |
| |
| GfErrType ThinClientRegion::getAllNoThrow_remote( |
| const std::vector<std::shared_ptr<CacheableKey>>* keys, |
| const std::shared_ptr<HashMapOfCacheable>& values, |
| const std::shared_ptr<HashMapOfException>& exceptions, |
| const std::shared_ptr<std::vector<std::shared_ptr<CacheableKey>>>& |
| resultKeys, |
| bool addToLocalCache, |
| const std::shared_ptr<Serializable>& aCallbackArgument) { |
| GfErrType err = GF_NOERR; |
| MapOfUpdateCounters updateCountMap; |
| int32_t destroyTracker = 0; |
| addToLocalCache = addToLocalCache && m_regionAttributes.getCachingEnabled(); |
| if (addToLocalCache && !m_regionAttributes.getConcurrencyChecksEnabled()) { |
| // start tracking the entries |
| if (keys == nullptr) { |
| // track all entries with destroy tracking for non-existent entries |
| destroyTracker = m_entries->addTrackerForAllEntries(updateCountMap, true); |
| } else { |
| for (const auto& key : *keys) { |
| std::shared_ptr<Cacheable> oldValue; |
| int updateCount = |
| m_entries->addTrackerForEntry(key, oldValue, true, false, false); |
| updateCountMap.emplace(key, updateCount); |
| } |
| } |
| } |
| // create the GET_ALL request |
| TcrMessageGetAll request(new DataOutput(m_cacheImpl->createDataOutput()), |
| this, keys, m_tcrdm.get(), aCallbackArgument); |
| |
| TcrMessageReply reply(true, m_tcrdm.get()); |
| std::recursive_mutex responseLock; |
| // need to check |
| TcrChunkedResult* resultCollector(new ChunkedGetAllResponse( |
| reply, this, keys, values, exceptions, resultKeys, updateCountMap, |
| destroyTracker, addToLocalCache, responseLock)); |
| |
| reply.setChunkedResultHandler(resultCollector); |
| err = m_tcrdm->sendSyncRequest(request, reply); |
| |
| if (addToLocalCache && !m_regionAttributes.getConcurrencyChecksEnabled()) { |
| // remove the tracking for remaining keys in case some keys do not have |
| // values from server in GII |
| for (MapOfUpdateCounters::const_iterator iter = updateCountMap.begin(); |
| iter != updateCountMap.end(); ++iter) { |
| if (iter->second >= 0) { |
| m_entries->removeTrackerForEntry(iter->first); |
| } |
| } |
| // remove tracking for destroys |
| if (destroyTracker > 0) { |
| m_entries->removeDestroyTracking(); |
| } |
| } |
| delete resultCollector; |
| if (err != GF_NOERR) { |
| return err; |
| } |
| |
| switch (reply.getMessageType()) { |
| case TcrMessage::RESPONSE: { |
| // nothing to be done; put in local region, if required, |
| // is handled by ChunkedGetAllResponse |
| break; |
| } |
| case TcrMessage::EXCEPTION: { |
| err = handleServerException("Region:getAll", reply.getException()); |
| break; |
| } |
| case TcrMessage::GET_ALL_DATA_ERROR: { |
| LOGERROR("Region get-all: a read error occurred on the endpoint %s", |
| m_tcrdm->getActiveEndpoint()->name().c_str()); |
| err = GF_CACHESERVER_EXCEPTION; |
| break; |
| } |
| default: { |
| LOGERROR("Unknown message type %d during region get-all", |
| reply.getMessageType()); |
| err = GF_MSG; |
| break; |
| } |
| } |
| return err; |
| } |
| |
| GfErrType ThinClientRegion::singleHopPutAllNoThrow_remote( |
| ThinClientPoolDM* tcrdm, const HashMapOfCacheable& map, |
| std::shared_ptr<VersionedCacheableObjectPartList>& versionedObjPartList, |
| std::chrono::milliseconds timeout, |
| const std::shared_ptr<Serializable>& aCallbackArgument) { |
| LOGDEBUG(" ThinClientRegion::singleHopPutAllNoThrow_remote map size = %zu", |
| map.size()); |
| auto region = shared_from_this(); |
| |
| auto error = GF_NOERR; |
| /*Step-1:: |
| * populate the keys vector from the user Map and pass it to the |
| * getServerToFilterMap to generate locationMap |
| * If locationMap is nullptr try the old, existing putAll impl that may take |
| * multiple n/w hops |
| */ |
| auto userKeys = std::vector<std::shared_ptr<CacheableKey>>(); |
| for (const auto& iter : map) { |
| userKeys.push_back(iter.first); |
| } |
| // last param in getServerToFilterMap() is false for putAll |
| |
| // LOGDEBUG("ThinClientRegion::singleHopPutAllNoThrow_remote keys.size() = %d |
| // ", userKeys->size()); |
| auto locationMap = tcrdm->getClientMetaDataService()->getServerToFilterMap( |
| userKeys, region, true); |
| if (!locationMap) { |
| // putAll with multiple hop implementation |
| LOGDEBUG("locationMap is Null or Empty"); |
| |
| return multiHopPutAllNoThrow_remote(map, versionedObjPartList, timeout, |
| aCallbackArgument); |
| } |
| |
| // set this flag that indicates putAll on PR is invoked with singlehop |
| // enabled. |
| m_isPRSingleHopEnabled = true; |
| // LOGDEBUG("locationMap.size() = %d ", locationMap->size()); |
| |
| /*Step-2 |
| * a. create vector of PutAllWork |
| * b. locationMap<std::shared_ptr<BucketServerLocation>, |
| * std::shared_ptr<std::vector<std::shared_ptr<CacheableKey>> >>. Create |
| * server specific filteredMap/subMap by populating all keys |
| * (locationIter.second()) and its corr. values from the user Map. |
| * c. create new instance of PutAllWork, i.e worker with required params. |
| * //TODO:: Add details of each parameter later |
| * d. enqueue the worker for thread from threadPool to perform/run execute |
| * method. |
| * e. insert the worker into the vector. |
| */ |
| std::vector<std::shared_ptr<PutAllWork>> putAllWorkers; |
| auto& threadPool = m_cacheImpl->getThreadPool(); |
| int locationMapIndex = 0; |
| for (const auto& locationIter : *locationMap) { |
| const auto& serverLocation = locationIter.first; |
| if (serverLocation == nullptr) { |
| LOGDEBUG("serverLocation is nullptr"); |
| } |
| const auto& keys = locationIter.second; |
| |
| // Create server specific Sub-Map by iterating over keys. |
| auto filteredMap = std::make_shared<HashMapOfCacheable>(); |
| if (keys != nullptr && keys->size() > 0) { |
| for (const auto& key : *keys) { |
| const auto& iter = map.find(key); |
| if (iter != map.end()) { |
| filteredMap->emplace(iter->first, iter->second); |
| } |
| } |
| } |
| |
| auto worker = std::make_shared<PutAllWork>( |
| tcrdm, serverLocation, region, true /*attemptFailover*/, |
| false /*isBGThread*/, filteredMap, keys, timeout, aCallbackArgument); |
| threadPool.perform(worker); |
| putAllWorkers.push_back(worker); |
| locationMapIndex++; |
| } |
| |
| // TODO::CHECK, do we need to set following ..?? |
| // reply.setMessageType(TcrMessage::RESPONSE); |
| |
| int cnt = 1; |
| |
| /** |
| * Step::3 |
| * a. Iterate over all vector of putAllWorkers and populate worker specific |
| * information into the HashMap |
| * resultMap<std::shared_ptr<BucketServerLocation>, |
| * std::shared_ptr<Serializable>>, 2nd part, Value can be a |
| * std::shared_ptr<VersionedCacheableObjectPartList> or |
| * std::shared_ptr<PutAllPartialResultServerException>. |
| * failedServers<std::shared_ptr<BucketServerLocation>, |
| * std::shared_ptr<CacheableInt32>>, 2nd part, Value is a ErrorCode. b. delete |
| * the worker |
| */ |
| auto resultMap = ResultMap(); |
| auto failedServers = FailedServersMap(); |
| |
| for (const auto& worker : putAllWorkers) { |
| auto err = |
| worker->getResult(); // wait() or blocking call for worker thread. |
| LOGDEBUG("Error code :: %s:%d err = %d ", __FILE__, __LINE__, err); |
| |
| if (GF_NOERR == err) { |
| // No Exception from server |
| resultMap.emplace(worker->getServerLocation(), |
| worker->getResultCollector()->getList()); |
| } else { |
| error = err; |
| |
| if (error == GF_PUTALL_PARTIAL_RESULT_EXCEPTION) { |
| resultMap.emplace(worker->getServerLocation(), |
| worker->getPaPResultException()); |
| } else if (error == GF_NOTCON) { |
| // Refresh the metadata in case of GF_NOTCON. |
| tcrdm->getClientMetaDataService()->enqueueForMetadataRefresh( |
| region->getFullPath(), 0); |
| } |
| failedServers.emplace(worker->getServerLocation(), |
| CacheableInt32::create(error)); |
| } |
| |
| LOGDEBUG("worker->getPutAllMap()->size() = %zu ", |
| worker->getPutAllMap()->size()); |
| LOGDEBUG( |
| "worker->getResultCollector()->getList()->getVersionedTagsize() = %d ", |
| worker->getResultCollector()->getList()->getVersionedTagsize()); |
| |
| // TODO::CHECK, why do we need following code... ?? |
| // TcrMessage* currentReply = worker->getReply(); |
| /* |
| if(currentReply->getMessageType() != TcrMessage::REPLY) |
| { |
| reply.setMessageType(currentReply->getMessageType()); |
| } |
| */ |
| |
| cnt++; |
| } |
| /** |
| * Step:4 |
| * a. create instance of std::shared_ptr<PutAllPartialResult> with total size= |
| * map.size() b. Iterate over the resultMap and value for the particular |
| * serverlocation is of type VersionedCacheableObjectPartList add keys and |
| * versions. C. ToDO:: what if the value in the resultMap is of type |
| * PutAllPartialResultServerException |
| */ |
| std::recursive_mutex responseLock; |
| auto result = std::make_shared<PutAllPartialResult>( |
| static_cast<int>(map.size()), responseLock); |
| LOGDEBUG( |
| " TCRegion:: %s:%d " |
| "result->getSucceededKeysAndVersions()->getVersionedTagsize() = %d ", |
| __FILE__, __LINE__, |
| result->getSucceededKeysAndVersions()->getVersionedTagsize()); |
| LOGDEBUG(" TCRegion:: %s:%d resultMap->size() ", __FILE__, __LINE__, |
| resultMap.size()); |
| for (const auto& resultMapIter : resultMap) { |
| const auto& value = resultMapIter.second; |
| |
| if (const auto papException = |
| std::dynamic_pointer_cast<PutAllPartialResultServerException>( |
| value)) { |
| // PutAllPartialResultServerException CASE:: value in resultMap is of type |
| // PutAllPartialResultServerException. |
| // TODO:: Add failedservers.keySet= all fialed servers, i.e list out all |
| // keys in map failedServers, |
| // that is set view of the keys contained in failedservers map. |
| // TODO:: need to read papException and populate PutAllPartialResult. |
| result->consolidate(papException->getResult()); |
| } else if (const auto list = |
| std::dynamic_pointer_cast<VersionedCacheableObjectPartList>( |
| value)) { |
| // value in resultMap is of type VCOPL. |
| result->addKeysAndVersions(list); |
| } else { |
| // ERROR CASE |
| if (value) { |
| LOGERROR( |
| "ERROR:: ThinClientRegion::singleHopPutAllNoThrow_remote value " |
| "could not Cast to either VCOPL or " |
| "PutAllPartialResultServerException:%s", |
| value->toString().c_str()); |
| } else { |
| LOGERROR( |
| "ERROR:: ThinClientRegion::singleHopPutAllNoThrow_remote value is " |
| "nullptr"); |
| } |
| } |
| } |
| |
| /** |
| * a. if PutAllPartialResult result does not contains any entry, Iterate over |
| * locationMap. |
| * b. Create std::vector<std::shared_ptr<CacheableKey>> succeedKeySet, and |
| * keep adding set of keys (locationIter.second()) in locationMap for which |
| * failedServers->contains(locationIter.first()is false. |
| */ |
| |
| LOGDEBUG("ThinClientRegion:: %s:%d failedServers->size() = %zu", __FILE__, |
| __LINE__, failedServers.size()); |
| |
| // if the partial result set doesn't already have keys (for tracking version |
| // tags) |
| // then we need to gather up the keys that we know have succeeded so far and |
| // add them to the partial result set (See bug Id #955) |
| if (!failedServers.empty()) { |
| auto succeedKeySet = |
| std::make_shared<std::vector<std::shared_ptr<CacheableKey>>>(); |
| if (result->getSucceededKeysAndVersions()->size() == 0) { |
| for (const auto& locationIter : *locationMap) { |
| if (failedServers.find(locationIter.first) != failedServers.end()) { |
| for (const auto& i : *(locationIter.second)) { |
| succeedKeySet->push_back(i); |
| } |
| } |
| } |
| result->addKeys(succeedKeySet); |
| } |
| } |
| |
| /** |
| * a. Iterate over the failedServers map |
| * c. if failedServer map contains "GF_PUTALL_PARTIAL_RESULT_EXCEPTION" then |
| * continue, Do not retry putAll for corr. keys. |
| * b. Retry for all the failed server. |
| * Generate a newSubMap by finding Keys specific to failedServers from |
| * locationMap and finding their respective values from the usermap. |
| */ |
| error = GF_NOERR; |
| bool oneSubMapRetryFailed = false; |
| for (const auto& failedServerIter : failedServers) { |
| if (failedServerIter.second->value() == |
| GF_PUTALL_PARTIAL_RESULT_EXCEPTION) { // serverLocation |
| // will not retry for PutAllPartialResultException |
| // but it means at least one sub map ever failed |
| oneSubMapRetryFailed = true; |
| error = GF_PUTALL_PARTIAL_RESULT_EXCEPTION; |
| continue; |
| } |
| |
| std::shared_ptr<std::vector<std::shared_ptr<CacheableKey>>> failedKeys = |
| nullptr; |
| const auto& failedSerInLocMapIter = |
| locationMap->find(failedServerIter.first); |
| if (failedSerInLocMapIter != locationMap->end()) { |
| failedKeys = failedSerInLocMapIter->second; |
| } |
| |
| if (failedKeys == nullptr) { |
| LOGERROR( |
| "TCRegion::singleHopPutAllNoThrow_remote :: failedKeys are nullptr " |
| "that is not valid"); |
| } |
| |
| auto newSubMap = std::make_shared<HashMapOfCacheable>(); |
| if (failedKeys && !failedKeys->empty()) { |
| for (const auto& key : *failedKeys) { |
| const auto& iter = map.find(key); |
| if (iter != map.end()) { |
| newSubMap->emplace(iter->first, iter->second); |
| } else { |
| LOGERROR( |
| "DEBUG:: TCRegion.cpp singleHopPutAllNoThrow_remote KEY not " |
| "found in user failedSubMap"); |
| } |
| } |
| } |
| |
| std::shared_ptr<VersionedCacheableObjectPartList> vcopListPtr; |
| GfErrType errCode = multiHopPutAllNoThrow_remote( |
| *newSubMap, vcopListPtr, timeout, aCallbackArgument); |
| if (errCode == GF_NOERR) { |
| result->addKeysAndVersions(vcopListPtr); |
| } else if (errCode == GF_PUTALL_PARTIAL_RESULT_EXCEPTION) { |
| oneSubMapRetryFailed = true; |
| // TODO:: Commented it as papResultServerExc is nullptr this time |
| // UnComment it once you read papResultServerExc. |
| // result->consolidate(papResultServerExc->getResult()); |
| error = errCode; |
| } else /*if(errCode != GF_NOERR)*/ { |
| oneSubMapRetryFailed = true; |
| const auto& firstKey = newSubMap->begin()->first; |
| std::shared_ptr<Exception> excptPtr = nullptr; |
| // TODO:: formulat excptPtr from the errCode |
| result->saveFailedKey(firstKey, excptPtr); |
| error = errCode; |
| } |
| } |
| |
| if (!oneSubMapRetryFailed) { |
| error = GF_NOERR; |
| } |
| versionedObjPartList = result->getSucceededKeysAndVersions(); |
| LOGDEBUG("singlehop versionedObjPartList = %d error=%d", |
| versionedObjPartList->size(), error); |
| |
| return error; |
| } |
| |
| GfErrType ThinClientRegion::multiHopPutAllNoThrow_remote( |
| const HashMapOfCacheable& map, |
| std::shared_ptr<VersionedCacheableObjectPartList>& versionedObjPartList, |
| std::chrono::milliseconds timeout, |
| const std::shared_ptr<Serializable>& aCallbackArgument) { |
| // Multiple hop implementation |
| LOGDEBUG("ThinClientRegion::multiHopPutAllNoThrow_remote "); |
| auto err = GF_NOERR; |
| |
| // Construct request/reply for putAll |
| TcrMessagePutAll request(new DataOutput(m_cacheImpl->createDataOutput()), |
| this, map, timeout, m_tcrdm.get(), |
| aCallbackArgument); |
| TcrMessageReply reply(true, m_tcrdm.get()); |
| request.setTimeout(timeout); |
| reply.setTimeout(timeout); |
| |
| std::recursive_mutex responseLock; |
| versionedObjPartList = |
| std::make_shared<VersionedCacheableObjectPartList>(this, responseLock); |
| // need to check |
| ChunkedPutAllResponse* resultCollector(new ChunkedPutAllResponse( |
| shared_from_this(), reply, responseLock, versionedObjPartList)); |
| reply.setChunkedResultHandler(resultCollector); |
| |
| err = m_tcrdm->sendSyncRequest(request, reply); |
| |
| versionedObjPartList = resultCollector->getList(); |
| LOGDEBUG("multiple hop versionedObjPartList size = %d , err = %d ", |
| versionedObjPartList->size(), err); |
| delete resultCollector; |
| if (err != GF_NOERR) return err; |
| LOGDEBUG( |
| "ThinClientRegion::multiHopPutAllNoThrow_remote reply.getMessageType() = " |
| "%d ", |
| reply.getMessageType()); |
| switch (reply.getMessageType()) { |
| case TcrMessage::REPLY: |
| // LOGDEBUG("Map is written into remote server at region %s", |
| // m_fullPath.c_str()); |
| break; |
| case TcrMessage::RESPONSE: |
| LOGDEBUG( |
| "multiHopPutAllNoThrow_remote TcrMessage::RESPONSE %s, err = %d ", |
| m_fullPath.c_str(), err); |
| break; |
| case TcrMessage::EXCEPTION: |
| err = handleServerException("ThinClientRegion::putAllNoThrow", |
| reply.getException()); |
| // TODO:: Do we need to read PutAllPartialServerException for multiple |
| // hop. |
| break; |
| case TcrMessage::PUT_DATA_ERROR: |
| // LOGERROR( "A write error occurred on the endpoint %s", |
| // m_tcrdm->getActiveEndpoint( )->name( ).c_str( ) ); |
| err = GF_CACHESERVER_EXCEPTION; |
| break; |
| default: |
| LOGERROR("Unknown message type %d during region put-all", |
| reply.getMessageType()); |
| err = GF_NOTOBJ; |
| break; |
| } |
| return err; |
| } |
| |
| GfErrType ThinClientRegion::putAllNoThrow_remote( |
| const HashMapOfCacheable& map, |
| std::shared_ptr<VersionedCacheableObjectPartList>& versionedObjPartList, |
| std::chrono::milliseconds timeout, |
| const std::shared_ptr<Serializable>& aCallbackArgument) { |
| LOGDEBUG("ThinClientRegion::putAllNoThrow_remote"); |
| |
| if (auto poolDM = std::dynamic_pointer_cast<ThinClientPoolDM>(m_tcrdm)) { |
| if (poolDM->getPRSingleHopEnabled() && poolDM->getClientMetaDataService() && |
| !TSSTXStateWrapper::get().getTXState()) { |
| return singleHopPutAllNoThrow_remote( |
| poolDM.get(), map, versionedObjPartList, timeout, aCallbackArgument); |
| } else { |
| return multiHopPutAllNoThrow_remote(map, versionedObjPartList, timeout, |
| aCallbackArgument); |
| } |
| } else { |
| LOGERROR("ThinClientRegion::putAllNoThrow_remote :: Pool Not Specified "); |
| return GF_NOTSUP; |
| } |
| } |
| |
| GfErrType ThinClientRegion::singleHopRemoveAllNoThrow_remote( |
| ThinClientPoolDM* tcrdm, |
| const std::vector<std::shared_ptr<CacheableKey>>& keys, |
| std::shared_ptr<VersionedCacheableObjectPartList>& versionedObjPartList, |
| const std::shared_ptr<Serializable>& aCallbackArgument) { |
| LOGDEBUG( |
| " ThinClientRegion::singleHopRemoveAllNoThrow_remote keys size = %zu", |
| keys.size()); |
| auto region = shared_from_this(); |
| GfErrType error = GF_NOERR; |
| |
| auto locationMap = tcrdm->getClientMetaDataService()->getServerToFilterMap( |
| keys, region, true); |
| if (!locationMap) { |
| // removeAll with multiple hop implementation |
| LOGDEBUG("locationMap is Null or Empty"); |
| return multiHopRemoveAllNoThrow_remote(keys, versionedObjPartList, |
| aCallbackArgument); |
| } |
| |
| // set this flag that indicates putAll on PR is invoked with singlehop |
| // enabled. |
| m_isPRSingleHopEnabled = true; |
| LOGDEBUG("locationMap.size() = %d ", locationMap->size()); |
| |
| /*Step-2 |
| * a. create vector of RemoveAllWork |
| * b. locationMap<std::shared_ptr<BucketServerLocation>, |
| * std::shared_ptr<std::vector<std::shared_ptr<CacheableKey>> >>. Create |
| * server specific filteredMap/subMap by populating all keys |
| * (locationIter.second()) and its corr. values from the user Map. |
| * c. create new instance of RemoveAllWork, i.e worker with required params. |
| * //TODO:: Add details of each parameter later |
| * d. enqueue the worker for thread from threadPool to perform/run execute |
| * method. |
| * e. insert the worker into the vector. |
| */ |
| std::vector<std::shared_ptr<RemoveAllWork>> removeAllWorkers; |
| auto& threadPool = m_cacheImpl->getThreadPool(); |
| int locationMapIndex = 0; |
| for (const auto& locationIter : *locationMap) { |
| const auto& serverLocation = locationIter.first; |
| if (serverLocation == nullptr) { |
| LOGDEBUG("serverLocation is nullptr"); |
| } |
| const auto& mappedkeys = locationIter.second; |
| auto worker = std::make_shared<RemoveAllWork>( |
| tcrdm, serverLocation, region, true /*attemptFailover*/, |
| false /*isBGThread*/, mappedkeys, aCallbackArgument); |
| threadPool.perform(worker); |
| removeAllWorkers.push_back(worker); |
| locationMapIndex++; |
| } |
| // TODO::CHECK, do we need to set following ..?? |
| // reply.setMessageType(TcrMessage::RESPONSE); |
| |
| int cnt = 1; |
| |
| /** |
| * Step::3 |
| * a. Iterate over all vector of putAllWorkers and populate worker specific |
| * information into the HashMap |
| * resultMap<std::shared_ptr<BucketServerLocation>, |
| * std::shared_ptr<Serializable>>, 2nd part, Value can be a |
| * std::shared_ptr<VersionedCacheableObjectPartList> or |
| * std::shared_ptr<PutAllPartialResultServerException>. |
| * failedServers<std::shared_ptr<BucketServerLocation>, |
| * std::shared_ptr<CacheableInt32>>, 2nd part, Value is a ErrorCode. b. delete |
| * the worker |
| */ |
| auto resultMap = ResultMap(); |
| auto failedServers = FailedServersMap(); |
| for (const auto& worker : removeAllWorkers) { |
| auto err = |
| worker->getResult(); // wait() or blocking call for worker thread. |
| LOGDEBUG("Error code :: %s:%d err = %d ", __FILE__, __LINE__, err); |
| |
| if (GF_NOERR == err) { |
| // No Exception from server |
| resultMap.emplace(worker->getServerLocation(), |
| worker->getResultCollector()->getList()); |
| } else { |
| error = err; |
| |
| if (error == GF_PUTALL_PARTIAL_RESULT_EXCEPTION) { |
| resultMap.emplace(worker->getServerLocation(), |
| worker->getPaPResultException()); |
| } else if (error == GF_NOTCON) { |
| // Refresh the metadata in case of GF_NOTCON. |
| tcrdm->getClientMetaDataService()->enqueueForMetadataRefresh( |
| region->getFullPath(), 0); |
| } |
| failedServers.emplace(worker->getServerLocation(), |
| CacheableInt32::create(error)); |
| } |
| |
| LOGDEBUG( |
| "worker->getResultCollector()->getList()->getVersionedTagsize() = %d ", |
| worker->getResultCollector()->getList()->getVersionedTagsize()); |
| |
| cnt++; |
| } |
| /** |
| * Step:4 |
| * a. create instance of std::shared_ptr<PutAllPartialResult> with total size= |
| * map.size() b. Iterate over the resultMap and value for the particular |
| * serverlocation is of type VersionedCacheableObjectPartList add keys and |
| * versions. C. ToDO:: what if the value in the resultMap is of type |
| * PutAllPartialResultServerException |
| */ |
| std::recursive_mutex responseLock; |
| auto result = std::make_shared<PutAllPartialResult>( |
| static_cast<int>(keys.size()), responseLock); |
| LOGDEBUG( |
| " TCRegion:: %s:%d " |
| "result->getSucceededKeysAndVersions()->getVersionedTagsize() = %d ", |
| __FILE__, __LINE__, |
| result->getSucceededKeysAndVersions()->getVersionedTagsize()); |
| LOGDEBUG(" TCRegion:: %s:%d resultMap->size() ", __FILE__, __LINE__, |
| resultMap.size()); |
| for (const auto& resultMapIter : resultMap) { |
| const auto& value = resultMapIter.second; |
| |
| if (const auto papException = |
| std::dynamic_pointer_cast<PutAllPartialResultServerException>( |
| value)) { |
| // PutAllPartialResultServerException CASE:: value in resultMap is of type |
| // PutAllPartialResultServerException. |
| // TODO:: Add failedservers.keySet= all fialed servers, i.e list out all |
| // keys in map failedServers, |
| // that is set view of the keys contained in failedservers map. |
| // TODO:: need to read papException and populate PutAllPartialResult. |
| result->consolidate(papException->getResult()); |
| } else if (const auto list = |
| std::dynamic_pointer_cast<VersionedCacheableObjectPartList>( |
| value)) { |
| // value in resultMap is of type VCOPL. |
| result->addKeysAndVersions(list); |
| } else { |
| // ERROR CASE |
| if (value) { |
| LOGERROR( |
| "ERROR:: ThinClientRegion::singleHopRemoveAllNoThrow_remote value " |
| "could not Cast to either VCOPL or " |
| "PutAllPartialResultServerException:%s", |
| value->toString().c_str()); |
| } else { |
| LOGERROR( |
| "ERROR:: ThinClientRegion::singleHopRemoveAllNoThrow_remote value " |
| "is nullptr"); |
| } |
| } |
| } |
| |
| /** |
| * a. if PutAllPartialResult result does not contains any entry, Iterate over |
| * locationMap. |
| * b. Create std::vector<std::shared_ptr<CacheableKey>> succeedKeySet, and |
| * keep adding set of keys (locationIter.second()) in locationMap for which |
| * failedServers->contains(locationIter.first()is false. |
| */ |
| |
| LOGDEBUG("ThinClientRegion:: %s:%d failedServers->size() = %zu", __FILE__, |
| __LINE__, failedServers.size()); |
| |
| // if the partial result set doesn't already have keys (for tracking version |
| // tags) |
| // then we need to gather up the keys that we know have succeeded so far and |
| // add them to the partial result set (See bug Id #955) |
| if (!failedServers.empty()) { |
| auto succeedKeySet = |
| std::make_shared<std::vector<std::shared_ptr<CacheableKey>>>(); |
| if (result->getSucceededKeysAndVersions()->size() == 0) { |
| for (const auto& locationIter : *locationMap) { |
| if (failedServers.find(locationIter.first) != failedServers.end()) { |
| for (const auto& i : *(locationIter.second)) { |
| succeedKeySet->push_back(i); |
| } |
| } |
| } |
| result->addKeys(succeedKeySet); |
| } |
| } |
| |
| /** |
| * a. Iterate over the failedServers map |
| * c. if failedServer map contains "GF_PUTALL_PARTIAL_RESULT_EXCEPTION" then |
| * continue, Do not retry putAll for corr. keys. |
| * b. Retry for all the failed server. |
| * Generate a newSubMap by finding Keys specific to failedServers from |
| * locationMap and finding their respective values from the usermap. |
| */ |
| error = GF_NOERR; |
| bool oneSubMapRetryFailed = false; |
| for (const auto& failedServerIter : failedServers) { |
| if (failedServerIter.second->value() == |
| GF_PUTALL_PARTIAL_RESULT_EXCEPTION) { // serverLocation |
| // will not retry for PutAllPartialResultException |
| // but it means at least one sub map ever failed |
| oneSubMapRetryFailed = true; |
| error = GF_PUTALL_PARTIAL_RESULT_EXCEPTION; |
| continue; |
| } |
| |
| std::shared_ptr<std::vector<std::shared_ptr<CacheableKey>>> failedKeys = |
| nullptr; |
| const auto& failedSerInLocMapIter = |
| locationMap->find(failedServerIter.first); |
| if (failedSerInLocMapIter != locationMap->end()) { |
| failedKeys = failedSerInLocMapIter->second; |
| } |
| |
| if (failedKeys == nullptr) { |
| LOGERROR( |
| "TCRegion::singleHopRemoveAllNoThrow_remote :: failedKeys are " |
| "nullptr " |
| "that is not valid"); |
| } |
| |
| std::shared_ptr<VersionedCacheableObjectPartList> vcopListPtr; |
| std::shared_ptr<PutAllPartialResultServerException> papResultServerExc = |
| nullptr; |
| GfErrType errCode = multiHopRemoveAllNoThrow_remote( |
| *failedKeys, vcopListPtr, aCallbackArgument); |
| if (errCode == GF_NOERR) { |
| result->addKeysAndVersions(vcopListPtr); |
| } else if (errCode == GF_PUTALL_PARTIAL_RESULT_EXCEPTION) { |
| oneSubMapRetryFailed = true; |
| error = errCode; |
| } else /*if(errCode != GF_NOERR)*/ { |
| oneSubMapRetryFailed = true; |
| std::shared_ptr<Exception> excptPtr = nullptr; |
| result->saveFailedKey(failedKeys->at(0), excptPtr); |
| error = errCode; |
| } |
| } |
| |
| if (!oneSubMapRetryFailed) { |
| error = GF_NOERR; |
| } |
| versionedObjPartList = result->getSucceededKeysAndVersions(); |
| LOGDEBUG("singlehop versionedObjPartList = %d error=%d", |
| versionedObjPartList->size(), error); |
| return error; |
| } |
| |
| GfErrType ThinClientRegion::multiHopRemoveAllNoThrow_remote( |
| const std::vector<std::shared_ptr<CacheableKey>>& keys, |
| std::shared_ptr<VersionedCacheableObjectPartList>& versionedObjPartList, |
| const std::shared_ptr<Serializable>& aCallbackArgument) { |
| // Multiple hop implementation |
| LOGDEBUG("ThinClientRegion::multiHopRemoveAllNoThrow_remote "); |
| GfErrType err = GF_NOERR; |
| |
| // Construct request/reply for putAll |
| TcrMessageRemoveAll request(new DataOutput(m_cacheImpl->createDataOutput()), |
| this, keys, aCallbackArgument, m_tcrdm.get()); |
| TcrMessageReply reply(true, m_tcrdm.get()); |
| |
| std::recursive_mutex responseLock; |
| versionedObjPartList = |
| std::make_shared<VersionedCacheableObjectPartList>(this, responseLock); |
| // need to check |
| ChunkedRemoveAllResponse* resultCollector(new ChunkedRemoveAllResponse( |
| shared_from_this(), reply, responseLock, versionedObjPartList)); |
| reply.setChunkedResultHandler(resultCollector); |
| |
| err = m_tcrdm->sendSyncRequest(request, reply); |
| |
| versionedObjPartList = resultCollector->getList(); |
| LOGDEBUG("multiple hop versionedObjPartList size = %d , err = %d ", |
| versionedObjPartList->size(), err); |
| delete resultCollector; |
| if (err != GF_NOERR) return err; |
| LOGDEBUG( |
| "ThinClientRegion::multiHopRemoveAllNoThrow_remote " |
| "reply.getMessageType() = %d ", |
| reply.getMessageType()); |
| switch (reply.getMessageType()) { |
| case TcrMessage::REPLY: |
| // LOGDEBUG("Map is written into remote server at region %s", |
| // m_fullPath.c_str()); |
| break; |
| case TcrMessage::RESPONSE: |
| LOGDEBUG( |
| "multiHopRemoveAllNoThrow_remote TcrMessage::RESPONSE %s, err = %d ", |
| m_fullPath.c_str(), err); |
| break; |
| case TcrMessage::EXCEPTION: |
| err = handleServerException("ThinClientRegion::putAllNoThrow", |
| reply.getException()); |
| break; |
| default: |
| LOGERROR("Unknown message type %d during region remove-all", |
| reply.getMessageType()); |
| err = GF_NOTOBJ; |
| break; |
| } |
| return err; |
| } |
| |
| GfErrType ThinClientRegion::removeAllNoThrow_remote( |
| const std::vector<std::shared_ptr<CacheableKey>>& keys, |
| std::shared_ptr<VersionedCacheableObjectPartList>& versionedObjPartList, |
| const std::shared_ptr<Serializable>& aCallbackArgument) { |
| LOGDEBUG("ThinClientRegion::removeAllNoThrow_remote"); |
| |
| if (auto poolDM = std::dynamic_pointer_cast<ThinClientPoolDM>(m_tcrdm)) { |
| if (poolDM->getPRSingleHopEnabled() && poolDM->getClientMetaDataService() && |
| !TSSTXStateWrapper::get().getTXState()) { |
| return singleHopRemoveAllNoThrow_remote( |
| poolDM.get(), keys, versionedObjPartList, aCallbackArgument); |
| } else { |
| return multiHopRemoveAllNoThrow_remote(keys, versionedObjPartList, |
| aCallbackArgument); |
| } |
| } else { |
| LOGERROR( |
| "ThinClientRegion::removeAllNoThrow_remote :: Pool Not Specified "); |
| return GF_NOTSUP; |
| } |
| } |
| |
| uint32_t ThinClientRegion::size_remote() { |
| LOGDEBUG("ThinClientRegion::size_remote"); |
| GfErrType err = GF_NOERR; |
| |
| // do TCR size |
| TcrMessageSize request(new DataOutput(m_cacheImpl->createDataOutput()), |
| m_fullPath.c_str()); |
| TcrMessageReply reply(true, m_tcrdm.get()); |
| err = m_tcrdm->sendSyncRequest(request, reply); |
| |
| if (err != GF_NOERR) { |
| throwExceptionIfError("Region::size", err); |
| } |
| |
| switch (reply.getMessageType()) { |
| case TcrMessage::RESPONSE: { |
| auto size = std::dynamic_pointer_cast<CacheableInt32>(reply.getValue()); |
| return size->value(); |
| } |
| case TcrMessage::EXCEPTION: |
| err = |
| handleServerException("ThinClientRegion::size", reply.getException()); |
| break; |
| case TcrMessage::SIZE_ERROR: |
| err = GF_CACHESERVER_EXCEPTION; |
| break; |
| default: |
| LOGERROR("Unknown message type %d during region size", |
| reply.getMessageType()); |
| err = GF_NOTOBJ; |
| } |
| |
| throwExceptionIfError("Region::size", err); |
| return 0; |
| } |
| |
| GfErrType ThinClientRegion::registerStoredRegex( |
| TcrEndpoint* endpoint, |
| std::unordered_map<std::string, InterestResultPolicy>& interestListRegex, |
| bool isDurable, bool receiveValues) { |
| GfErrType opErr = GF_NOERR; |
| GfErrType retVal = GF_NOERR; |
| |
| for (std::unordered_map<std::string, InterestResultPolicy>::iterator it = |
| interestListRegex.begin(); |
| it != interestListRegex.end(); ++it) { |
| opErr = registerRegexNoThrow(it->first, false, endpoint, isDurable, nullptr, |
| it->second, receiveValues); |
| if (opErr != GF_NOERR) { |
| retVal = opErr; |
| } |
| } |
| |
| return retVal; |
| } |
| |
| GfErrType ThinClientRegion::registerKeys(TcrEndpoint* endpoint, |
| const TcrMessage* request, |
| TcrMessageReply* reply) { |
| GfErrType err = GF_NOERR; |
| GfErrType opErr = GF_NOERR; |
| |
| // called when failover to a different server |
| opErr = registerStoredRegex(endpoint, m_interestListRegex); |
| err = opErr != GF_NOERR ? opErr : err; |
| opErr = registerStoredRegex( |
| endpoint, m_interestListRegexForUpdatesAsInvalidates, false, false); |
| err = opErr != GF_NOERR ? opErr : err; |
| opErr = registerStoredRegex(endpoint, m_durableInterestListRegex, true); |
| err = opErr != GF_NOERR ? opErr : err; |
| opErr = registerStoredRegex( |
| endpoint, m_durableInterestListRegexForUpdatesAsInvalidates, true, false); |
| err = opErr != GF_NOERR ? opErr : err; |
| |
| std::vector<std::shared_ptr<CacheableKey>> keysVec; |
| InterestResultPolicy interestPolicy = |
| copyInterestList(keysVec, m_interestList); |
| opErr = registerKeysNoThrow(keysVec, false, endpoint, false, interestPolicy); |
| err = opErr != GF_NOERR ? opErr : err; |
| |
| std::vector<std::shared_ptr<CacheableKey>> keysVecForUpdatesAsInvalidates; |
| interestPolicy = copyInterestList(keysVecForUpdatesAsInvalidates, |
| m_interestListForUpdatesAsInvalidates); |
| opErr = registerKeysNoThrow(keysVecForUpdatesAsInvalidates, false, endpoint, |
| false, interestPolicy, false); |
| err = opErr != GF_NOERR ? opErr : err; |
| |
| std::vector<std::shared_ptr<CacheableKey>> keysVecDurable; |
| interestPolicy = copyInterestList(keysVecDurable, m_durableInterestList); |
| opErr = registerKeysNoThrow(keysVecDurable, false, endpoint, true, |
| interestPolicy); |
| err = opErr != GF_NOERR ? opErr : err; |
| |
| std::vector<std::shared_ptr<CacheableKey>> |
| keysVecDurableForUpdatesAsInvalidates; |
| interestPolicy = |
| copyInterestList(keysVecDurableForUpdatesAsInvalidates, |
| m_durableInterestListForUpdatesAsInvalidates); |
| opErr = registerKeysNoThrow(keysVecDurableForUpdatesAsInvalidates, false, |
| endpoint, true, interestPolicy, false); |
| err = opErr != GF_NOERR ? opErr : err; |
| |
| if (request != nullptr && request->getRegionName() == m_fullPath && |
| (request->getMessageType() == TcrMessage::REGISTER_INTEREST || |
| request->getMessageType() == TcrMessage::REGISTER_INTEREST_LIST)) { |
| const std::vector<std::shared_ptr<CacheableKey>>* newKeysVec = |
| request->getKeys(); |
| bool isDurable = request->isDurable(); |
| bool receiveValues = request->receiveValues(); |
| if (newKeysVec == nullptr || newKeysVec->empty()) { |
| const std::string& newRegex = request->getRegex(); |
| if (!newRegex.empty()) { |
| if (request->getRegionName() != m_fullPath) { |
| reply = nullptr; |
| } |
| opErr = registerRegexNoThrow( |
| newRegex, false, endpoint, isDurable, nullptr, |
| request->getInterestResultPolicy(), receiveValues, reply); |
| err = opErr != GF_NOERR ? opErr : err; |
| } |
| } else { |
| opErr = registerKeysNoThrow(*newKeysVec, false, endpoint, isDurable, |
| request->getInterestResultPolicy(), |
| receiveValues, reply); |
| err = opErr != GF_NOERR ? opErr : err; |
| } |
| } |
| return err; |
| } |
| |
| GfErrType ThinClientRegion::unregisterStoredRegex( |
| std::unordered_map<std::string, InterestResultPolicy>& interestListRegex) { |
| GfErrType opErr = GF_NOERR; |
| GfErrType retVal = GF_NOERR; |
| |
| for (std::unordered_map<std::string, InterestResultPolicy>::iterator it = |
| interestListRegex.begin(); |
| it != interestListRegex.end(); ++it) { |
| opErr = unregisterRegexNoThrow(it->first, false); |
| if (opErr != GF_NOERR) { |
| retVal = opErr; |
| } |
| } |
| |
| return retVal; |
| } |
| |
| GfErrType ThinClientRegion::unregisterStoredRegexLocalDestroy( |
| std::unordered_map<std::string, InterestResultPolicy>& interestListRegex) { |
| GfErrType opErr = GF_NOERR; |
| GfErrType retVal = GF_NOERR; |
| |
| for (std::unordered_map<std::string, InterestResultPolicy>::iterator it = |
| interestListRegex.begin(); |
| it != interestListRegex.end(); ++it) { |
| opErr = unregisterRegexNoThrowLocalDestroy(it->first, false); |
| if (opErr != GF_NOERR) { |
| retVal = opErr; |
| } |
| } |
| return retVal; |
| } |
| |
| GfErrType ThinClientRegion::unregisterKeys() { |
| GfErrType err = GF_NOERR; |
| GfErrType opErr = GF_NOERR; |
| |
| // called when disconnect from a server |
| opErr = unregisterStoredRegex(m_interestListRegex); |
| err = opErr != GF_NOERR ? opErr : err; |
| opErr = unregisterStoredRegex(m_durableInterestListRegex); |
| err = opErr != GF_NOERR ? opErr : err; |
| opErr = unregisterStoredRegex(m_interestListRegexForUpdatesAsInvalidates); |
| err = opErr != GF_NOERR ? opErr : err; |
| opErr = |
| unregisterStoredRegex(m_durableInterestListRegexForUpdatesAsInvalidates); |
| err = opErr != GF_NOERR ? opErr : err; |
| |
| std::vector<std::shared_ptr<CacheableKey>> keysVec; |
| copyInterestList(keysVec, m_interestList); |
| opErr = unregisterKeysNoThrow(keysVec, false); |
| err = opErr != GF_NOERR ? opErr : err; |
| |
| std::vector<std::shared_ptr<CacheableKey>> keysVecDurable; |
| copyInterestList(keysVecDurable, m_durableInterestList); |
| opErr = unregisterKeysNoThrow(keysVecDurable, false); |
| err = opErr != GF_NOERR ? opErr : err; |
| |
| std::vector<std::shared_ptr<CacheableKey>> keysVecForUpdatesAsInvalidates; |
| copyInterestList(keysVecForUpdatesAsInvalidates, |
| m_interestListForUpdatesAsInvalidates); |
| opErr = unregisterKeysNoThrow(keysVecForUpdatesAsInvalidates, false); |
| err = opErr != GF_NOERR ? opErr : err; |
| |
| std::vector<std::shared_ptr<CacheableKey>> |
| keysVecDurableForUpdatesAsInvalidates; |
| copyInterestList(keysVecDurableForUpdatesAsInvalidates, |
| m_durableInterestListForUpdatesAsInvalidates); |
| opErr = unregisterKeysNoThrow(keysVecDurableForUpdatesAsInvalidates, false); |
| err = opErr != GF_NOERR ? opErr : err; |
| |
| return err; |
| } |
| |
| GfErrType ThinClientRegion::destroyRegionNoThrow_remote( |
| const std::shared_ptr<Serializable>& aCallbackArgument) { |
| GfErrType err = GF_NOERR; |
| |
| // do TCR destroyRegion |
| TcrMessageDestroyRegion request( |
| new DataOutput(m_cacheImpl->createDataOutput()), this, aCallbackArgument, |
| std::chrono::milliseconds(-1), m_tcrdm.get()); |
| TcrMessageReply reply(true, m_tcrdm.get()); |
| err = m_tcrdm->sendSyncRequest(request, reply); |
| if (err != GF_NOERR) return err; |
| |
| switch (reply.getMessageType()) { |
| case TcrMessage::REPLY: { |
| // LOGINFO("Region %s at remote is destroyed successfully", |
| // m_fullPath.c_str()); |
| break; |
| } |
| case TcrMessage::EXCEPTION: { |
| err = |
| handleServerException("Region::destroyRegion", reply.getException()); |
| break; |
| } |
| case TcrMessage::DESTROY_REGION_DATA_ERROR: { |
| err = GF_CACHE_REGION_DESTROYED_EXCEPTION; |
| break; |
| } |
| default: { |
| LOGERROR("Unknown message type %d during destroy region", |
| reply.getMessageType()); |
| err = GF_MSG; |
| } |
| } |
| return err; |
| } |
| |
| GfErrType ThinClientRegion::registerKeysNoThrow( |
| const std::vector<std::shared_ptr<CacheableKey>>& keys, |
| bool attemptFailover, TcrEndpoint* endpoint, bool isDurable, |
| InterestResultPolicy interestPolicy, bool receiveValues, |
| TcrMessageReply* reply) { |
| RegionGlobalLocks acquireLocksRedundancy(this, false); |
| RegionGlobalLocks acquireLocksFailover(this); |
| CHECK_DESTROY_PENDING_NOTHROW(TryReadGuard); |
| GfErrType err = GF_NOERR; |
| |
| std::lock_guard<decltype(m_keysLock)> keysGuard(m_keysLock); |
| if (keys.empty()) { |
| return err; |
| } |
| |
| TcrMessageReply replyLocal(true, m_tcrdm.get()); |
| bool needToCreateRC = true; |
| if (reply == nullptr) { |
| reply = &replyLocal; |
| } else { |
| needToCreateRC = false; |
| } |
| |
| LOGDEBUG("ThinClientRegion::registerKeysNoThrow : interestpolicy is %d", |
| interestPolicy.ordinal); |
| |
| TcrMessageRegisterInterestList request( |
| new DataOutput(m_cacheImpl->createDataOutput()), this, keys, isDurable, |
| getAttributes().getCachingEnabled(), receiveValues, interestPolicy, |
| m_tcrdm.get()); |
| std::recursive_mutex responseLock; |
| TcrChunkedResult* resultCollector = nullptr; |
| if (interestPolicy.ordinal == InterestResultPolicy::KEYS_VALUES.ordinal) { |
| auto values = std::make_shared<HashMapOfCacheable>(); |
| auto exceptions = std::make_shared<HashMapOfException>(); |
| MapOfUpdateCounters trackers; |
| int32_t destroyTracker = 1; |
| if (needToCreateRC) { |
| resultCollector = (new ChunkedGetAllResponse( |
| request, this, &keys, values, exceptions, nullptr, trackers, |
| destroyTracker, true, responseLock)); |
| reply->setChunkedResultHandler(resultCollector); |
| } |
| } else { |
| if (needToCreateRC) { |
| resultCollector = (new ChunkedInterestResponse(request, nullptr, *reply)); |
| reply->setChunkedResultHandler(resultCollector); |
| } |
| } |
| |
| err = m_tcrdm->sendSyncRequestRegisterInterest( |
| request, *reply, attemptFailover, this, endpoint); |
| |
| if (err == GF_NOERR /*|| err == GF_CACHE_REDUNDANCY_FAILURE*/) { |
| if (reply->getMessageType() == TcrMessage::RESPONSE_FROM_SECONDARY && |
| endpoint) { |
| LOGFINER( |
| "registerKeysNoThrow - got response from secondary for " |
| "endpoint %s, ignoring.", |
| endpoint->name().c_str()); |
| } else if (attemptFailover) { |
| addKeys(keys, isDurable, receiveValues, interestPolicy); |
| if (!(interestPolicy.ordinal == |
| InterestResultPolicy::KEYS_VALUES.ordinal)) { |
| localInvalidateForRegisterInterest(keys); |
| } |
| } |
| } |
| if (needToCreateRC) { |
| delete resultCollector; |
| } |
| return err; |
| } |
| |
| GfErrType ThinClientRegion::unregisterKeysNoThrow( |
| const std::vector<std::shared_ptr<CacheableKey>>& keys, |
| bool attemptFailover) { |
| RegionGlobalLocks acquireLocksRedundancy(this, false); |
| RegionGlobalLocks acquireLocksFailover(this); |
| CHECK_DESTROY_PENDING_NOTHROW(TryReadGuard); |
| GfErrType err = GF_NOERR; |
| std::lock_guard<decltype(m_keysLock)> keysGuard(m_keysLock); |
| TcrMessageReply reply(true, m_tcrdm.get()); |
| if (keys.empty()) { |
| return err; |
| } |
| |
| if (m_interestList.empty() && m_durableInterestList.empty() && |
| m_interestListForUpdatesAsInvalidates.empty() && |
| m_durableInterestListForUpdatesAsInvalidates.empty()) { |
| // did not register any keys before. |
| return GF_CACHE_ILLEGAL_STATE_EXCEPTION; |
| } |
| |
| TcrMessageUnregisterInterestList request( |
| new DataOutput(m_cacheImpl->createDataOutput()), this, keys, false, true, |
| InterestResultPolicy::NONE, m_tcrdm.get()); |
| err = m_tcrdm->sendSyncRequestRegisterInterest(request, reply); |
| if (err == GF_NOERR /*|| err == GF_CACHE_REDUNDANCY_FAILURE*/) { |
| if (attemptFailover) { |
| for (const auto& key : keys) { |
| m_interestList.erase(key); |
| m_durableInterestList.erase(key); |
| m_interestListForUpdatesAsInvalidates.erase(key); |
| m_durableInterestListForUpdatesAsInvalidates.erase(key); |
| } |
| } |
| } |
| return err; |
| } |
| |
| GfErrType ThinClientRegion::unregisterKeysNoThrowLocalDestroy( |
| const std::vector<std::shared_ptr<CacheableKey>>& keys, |
| bool attemptFailover) { |
| RegionGlobalLocks acquireLocksRedundancy(this, false); |
| RegionGlobalLocks acquireLocksFailover(this); |
| GfErrType err = GF_NOERR; |
| std::lock_guard<decltype(m_keysLock)> keysGuard(m_keysLock); |
| TcrMessageReply reply(true, m_tcrdm.get()); |
| if (keys.empty()) { |
| return err; |
| } |
| |
| if (m_interestList.empty() && m_durableInterestList.empty() && |
| m_interestListForUpdatesAsInvalidates.empty() && |
| m_durableInterestListForUpdatesAsInvalidates.empty()) { |
| // did not register any keys before. |
| return GF_CACHE_ILLEGAL_STATE_EXCEPTION; |
| } |
| |
| TcrMessageUnregisterInterestList request( |
| new DataOutput(m_cacheImpl->createDataOutput()), this, keys, false, true, |
| InterestResultPolicy::NONE, m_tcrdm.get()); |
| err = m_tcrdm->sendSyncRequestRegisterInterest(request, reply); |
| if (err == GF_NOERR) { |
| if (attemptFailover) { |
| for (const auto& key : keys) { |
| m_interestList.erase(key); |
| m_durableInterestList.erase(key); |
| m_interestListForUpdatesAsInvalidates.erase(key); |
| m_durableInterestListForUpdatesAsInvalidates.erase(key); |
| } |
| } |
| } |
| return err; |
| } |
| |
| bool ThinClientRegion::isRegexRegistered( |
| std::unordered_map<std::string, InterestResultPolicy>& interestListRegex, |
| const std::string& regex, bool allKeys) { |
| if (interestListRegex.find(".*") != interestListRegex.end() || |
| (!allKeys && interestListRegex.find(regex) != interestListRegex.end())) { |
| return true; |
| } |
| return false; |
| } |
| |
| GfErrType ThinClientRegion::registerRegexNoThrow( |
| const std::string& regex, bool attemptFailover, TcrEndpoint* endpoint, |
| bool isDurable, |
| std::shared_ptr<std::vector<std::shared_ptr<CacheableKey>>> resultKeys, |
| InterestResultPolicy interestPolicy, bool receiveValues, |
| TcrMessageReply* reply) { |
| RegionGlobalLocks acquireLocksRedundancy(this, false); |
| RegionGlobalLocks acquireLocksFailover(this); |
| CHECK_DESTROY_PENDING_NOTHROW(TryReadGuard); |
| GfErrType err = GF_NOERR; |
| |
| bool allKeys = (regex == ".*"); |
| std::lock_guard<decltype(m_keysLock)> keysGuard(m_keysLock); |
| |
| if (attemptFailover) { |
| if ((isDurable && |
| (isRegexRegistered(m_durableInterestListRegex, regex, allKeys) || |
| isRegexRegistered(m_durableInterestListRegexForUpdatesAsInvalidates, |
| regex, allKeys))) || |
| (!isDurable && |
| (isRegexRegistered(m_interestListRegex, regex, allKeys) || |
| isRegexRegistered(m_interestListRegexForUpdatesAsInvalidates, regex, |
| allKeys)))) { |
| return err; |
| } |
| } |
| |
| ChunkedInterestResponse* resultCollector = nullptr; |
| ChunkedGetAllResponse* getAllResultCollector = nullptr; |
| if (reply != nullptr) { |
| // need to check |
| resultCollector = dynamic_cast<ChunkedInterestResponse*>( |
| reply->getChunkedResultHandler()); |
| if (resultCollector != nullptr) { |
| resultKeys = resultCollector->getResultKeys(); |
| } else { |
| getAllResultCollector = dynamic_cast<ChunkedGetAllResponse*>( |
| reply->getChunkedResultHandler()); |
| resultKeys = getAllResultCollector->getResultKeys(); |
| } |
| } |
| |
| bool isRCCreatedLocally = false; |
| LOGDEBUG("ThinClientRegion::registerRegexNoThrow : interestpolicy is %d", |
| interestPolicy.ordinal); |
| |
| // TODO: |
| TcrMessageRegisterInterest request( |
| new DataOutput(m_cacheImpl->createDataOutput()), m_fullPath, |
| regex.c_str(), interestPolicy, isDurable, |
| getAttributes().getCachingEnabled(), receiveValues, m_tcrdm.get()); |
| std::recursive_mutex responseLock; |
| if (reply == nullptr) { |
| TcrMessageReply replyLocal(true, m_tcrdm.get()); |
| auto values = std::make_shared<HashMapOfCacheable>(); |
| auto exceptions = std::make_shared<HashMapOfException>(); |
| |
| reply = &replyLocal; |
| if (interestPolicy.ordinal == InterestResultPolicy::KEYS_VALUES.ordinal) { |
| MapOfUpdateCounters trackers; |
| int32_t destroyTracker = 1; |
| if (resultKeys == nullptr) { |
| resultKeys = |
| std::shared_ptr<std::vector<std::shared_ptr<CacheableKey>>>( |
| new std::vector<std::shared_ptr<CacheableKey>>()); |
| } |
| // need to check |
| getAllResultCollector = (new ChunkedGetAllResponse( |
| request, this, nullptr, values, exceptions, resultKeys, trackers, |
| destroyTracker, true, responseLock)); |
| reply->setChunkedResultHandler(getAllResultCollector); |
| isRCCreatedLocally = true; |
| } else { |
| isRCCreatedLocally = true; |
| // need to check |
| resultCollector = |
| new ChunkedInterestResponse(request, resultKeys, replyLocal); |
| reply->setChunkedResultHandler(resultCollector); |
| } |
| err = m_tcrdm->sendSyncRequestRegisterInterest( |
| request, replyLocal, attemptFailover, this, endpoint); |
| } else { |
| err = m_tcrdm->sendSyncRequestRegisterInterest( |
| request, *reply, attemptFailover, this, endpoint); |
| } |
| if (err == GF_NOERR /*|| err == GF_CACHE_REDUNDANCY_FAILURE*/) { |
| if (reply->getMessageType() == TcrMessage::RESPONSE_FROM_SECONDARY && |
| endpoint) { |
| LOGFINER( |
| "registerRegexNoThrow - got response from secondary for " |
| "endpoint %s, ignoring.", |
| endpoint->name().c_str()); |
| } else if (attemptFailover) { |
| addRegex(regex, isDurable, receiveValues, interestPolicy); |
| if (interestPolicy.ordinal != InterestResultPolicy::KEYS_VALUES.ordinal) { |
| if (allKeys) { |
| localInvalidateRegion_internal(); |
| } else { |
| const std::shared_ptr<std::vector<std::shared_ptr<CacheableKey>>>& |
| keys = resultCollector != nullptr |
| ? resultCollector->getResultKeys() |
| : getAllResultCollector->getResultKeys(); |
| if (keys != nullptr) { |
| localInvalidateForRegisterInterest(*keys); |
| } |
| } |
| } |
| } |
| } |
| |
| if (isRCCreatedLocally == true) { |
| if (resultCollector != nullptr) delete resultCollector; |
| if (getAllResultCollector != nullptr) delete getAllResultCollector; |
| } |
| return err; |
| } |
| |
| GfErrType ThinClientRegion::unregisterRegexNoThrow(const std::string& regex, |
| bool attemptFailover) { |
| RegionGlobalLocks acquireLocksRedundancy(this, false); |
| RegionGlobalLocks acquireLocksFailover(this); |
| CHECK_DESTROY_PENDING_NOTHROW(TryReadGuard); |
| GfErrType err = GF_NOERR; |
| |
| err = findRegex(regex); |
| |
| if (err == GF_NOERR) { |
| TcrMessageReply reply(false, m_tcrdm.get()); |
| TcrMessageUnregisterInterest request( |
| new DataOutput(m_cacheImpl->createDataOutput()), m_fullPath, regex, |
| InterestResultPolicy::NONE, false, true, m_tcrdm.get()); |
| err = m_tcrdm->sendSyncRequestRegisterInterest(request, reply); |
| if (err == GF_NOERR /*|| err == GF_CACHE_REDUNDANCY_FAILURE*/) { |
| if (attemptFailover) { |
| clearRegex(regex); |
| } |
| } |
| } |
| return err; |
| } |
| |
| GfErrType ThinClientRegion::findRegex(const std::string& regex) { |
| GfErrType err = GF_NOERR; |
| std::lock_guard<decltype(m_keysLock)> keysGuard(m_keysLock); |
| |
| if (m_interestListRegex.find(regex) == m_interestListRegex.end() && |
| m_durableInterestListRegex.find(regex) == |
| m_durableInterestListRegex.end() && |
| m_interestListRegexForUpdatesAsInvalidates.find(regex) == |
| m_interestListRegexForUpdatesAsInvalidates.end() && |
| m_durableInterestListRegexForUpdatesAsInvalidates.find(regex) == |
| m_durableInterestListRegexForUpdatesAsInvalidates.end()) { |
| return GF_CACHE_ILLEGAL_STATE_EXCEPTION; |
| } else { |
| return err; |
| } |
| } |
| |
| void ThinClientRegion::clearRegex(const std::string& regex) { |
| std::lock_guard<decltype(m_keysLock)> keysGuard(m_keysLock); |
| m_interestListRegex.erase(regex); |
| m_durableInterestListRegex.erase(regex); |
| m_interestListRegexForUpdatesAsInvalidates.erase(regex); |
| m_durableInterestListRegexForUpdatesAsInvalidates.erase(regex); |
| } |
| |
| GfErrType ThinClientRegion::unregisterRegexNoThrowLocalDestroy( |
| const std::string& regex, bool attemptFailover) { |
| GfErrType err = GF_NOERR; |
| |
| err = findRegex(regex); |
| |
| if (err == GF_NOERR) { |
| TcrMessageReply reply(false, m_tcrdm.get()); |
| TcrMessageUnregisterInterest request( |
| new DataOutput(m_cacheImpl->createDataOutput()), m_fullPath, regex, |
| InterestResultPolicy::NONE, false, true, m_tcrdm.get()); |
| err = m_tcrdm->sendSyncRequestRegisterInterest(request, reply); |
| if (err == GF_NOERR) { |
| if (attemptFailover) { |
| clearRegex(regex); |
| } |
| } |
| } |
| return err; |
| } |
| |
| void ThinClientRegion::addKeys( |
| const std::vector<std::shared_ptr<CacheableKey>>& keys, bool isDurable, |
| bool receiveValues, InterestResultPolicy interestpolicy) { |
| std::unordered_map<std::shared_ptr<CacheableKey>, InterestResultPolicy>& |
| interestList = |
| isDurable |
| ? (receiveValues ? m_durableInterestList |
| : m_durableInterestListForUpdatesAsInvalidates) |
| : (receiveValues ? m_interestList |
| : m_interestListForUpdatesAsInvalidates); |
| |
| for (const auto& key : keys) { |
| interestList.insert( |
| std::pair<std::shared_ptr<CacheableKey>, InterestResultPolicy>( |
| key, interestpolicy)); |
| } |
| } |
| |
| void ThinClientRegion::addRegex(const std::string& regex, bool isDurable, |
| bool receiveValues, |
| InterestResultPolicy interestpolicy) { |
| std::unordered_map<std::shared_ptr<CacheableKey>, InterestResultPolicy>& |
| interestList = |
| isDurable |
| ? (receiveValues ? m_durableInterestList |
| : m_durableInterestListForUpdatesAsInvalidates) |
| : (receiveValues ? m_interestList |
| : m_interestListForUpdatesAsInvalidates); |
| |
| std::unordered_map<std::string, InterestResultPolicy>& interestListRegex = |
| isDurable |
| ? (receiveValues ? m_durableInterestListRegex |
| : m_durableInterestListRegexForUpdatesAsInvalidates) |
| : (receiveValues ? m_interestListRegex |
| : m_interestListRegexForUpdatesAsInvalidates); |
| |
| if (regex == ".*") { |
| interestListRegex.clear(); |
| interestList.clear(); |
| } |
| |
| interestListRegex.insert( |
| std::pair<std::string, InterestResultPolicy>(regex, interestpolicy)); |
| } |
| |
| std::vector<std::shared_ptr<CacheableKey>> ThinClientRegion::getInterestList() |
| const { |
| auto nthis = const_cast<ThinClientRegion*>(this); |
| RegionGlobalLocks acquireLocksRedundancy(nthis, false); |
| RegionGlobalLocks acquireLocksFailover(nthis); |
| CHECK_DESTROY_PENDING(TryReadGuard, getInterestList); |
| std::lock_guard<decltype(m_keysLock)> keysGuard(nthis->m_keysLock); |
| |
| std::vector<std::shared_ptr<CacheableKey>> vlist; |
| |
| std::transform(std::begin(m_durableInterestList), |
| std::end(m_durableInterestList), std::back_inserter(vlist), |
| [](const decltype(m_durableInterestList)::value_type& e) { |
| return e.first; |
| }); |
| |
| std::transform( |
| std::begin(m_interestList), std::end(m_interestList), |
| std::back_inserter(vlist), |
| [](const decltype(m_interestList)::value_type& e) { return e.first; }); |
| |
| return vlist; |
| } |
| std::vector<std::shared_ptr<CacheableString>> |
| ThinClientRegion::getInterestListRegex() const { |
| auto nthis = const_cast<ThinClientRegion*>(this); |
| RegionGlobalLocks acquireLocksRedundancy(nthis, false); |
| RegionGlobalLocks acquireLocksFailover(nthis); |
| CHECK_DESTROY_PENDING(TryReadGuard, getInterestListRegex); |
| std::lock_guard<decltype(m_keysLock)> keysGuard(nthis->m_keysLock); |
| |
| std::vector<std::shared_ptr<CacheableString>> vlist; |
| |
| std::transform(std::begin(m_durableInterestListRegex), |
| std::end(m_durableInterestListRegex), |
| std::back_inserter(vlist), |
| [](const decltype(m_durableInterestListRegex)::value_type& e) { |
| return CacheableString::create(e.first.c_str()); |
| }); |
| |
| std::transform(std::begin(m_interestListRegex), std::end(m_interestListRegex), |
| std::back_inserter(vlist), |
| [](const decltype(m_interestListRegex)::value_type& e) { |
| return CacheableString::create(e.first.c_str()); |
| }); |
| |
| return vlist; |
| } |
| |
| GfErrType ThinClientRegion::clientNotificationHandler(TcrMessage& msg) { |
| GfErrType err = GF_NOERR; |
| std::shared_ptr<Cacheable> oldValue; |
| switch (msg.getMessageType()) { |
| case TcrMessage::LOCAL_INVALIDATE: { |
| LocalRegion::invalidateNoThrow( |
| msg.getKey(), msg.getCallbackArgument(), -1, |
| CacheEventFlags::NOTIFICATION | CacheEventFlags::LOCAL, |
| msg.getVersionTag()); |
| break; |
| } |
| case TcrMessage::LOCAL_DESTROY: { |
| err = LocalRegion::destroyNoThrow( |
| msg.getKey(), msg.getCallbackArgument(), -1, |
| CacheEventFlags::NOTIFICATION | CacheEventFlags::LOCAL, |
| msg.getVersionTag()); |
| break; |
| } |
| case TcrMessage::CLEAR_REGION: { |
| LOGDEBUG("remote clear region event for reigon[%s]", |
| msg.getRegionName().c_str()); |
| err = localClearNoThrow( |
| nullptr, CacheEventFlags::NOTIFICATION | CacheEventFlags::LOCAL); |
| break; |
| } |
| case TcrMessage::LOCAL_DESTROY_REGION: { |
| m_notifyRelease = true; |
| err = LocalRegion::destroyRegionNoThrow( |
| msg.getCallbackArgument(), true, |
| CacheEventFlags::NOTIFICATION | CacheEventFlags::LOCAL); |
| break; |
| } |
| case TcrMessage::LOCAL_CREATE: |
| err = LocalRegion::putNoThrow( |
| msg.getKey(), msg.getValue(), msg.getCallbackArgument(), oldValue, -1, |
| CacheEventFlags::NOTIFICATION | CacheEventFlags::LOCAL, |
| msg.getVersionTag()); |
| break; |
| case TcrMessage::LOCAL_UPDATE: { |
| // for update set the NOTIFICATION_UPDATE to trigger the |
| // afterUpdate event even if the key is not present in local cache |
| err = LocalRegion::putNoThrow( |
| msg.getKey(), msg.getValue(), msg.getCallbackArgument(), oldValue, -1, |
| CacheEventFlags::NOTIFICATION | CacheEventFlags::NOTIFICATION_UPDATE | |
| CacheEventFlags::LOCAL, |
| msg.getVersionTag(), msg.getDelta(), msg.getEventId()); |
| break; |
| } |
| case TcrMessage::TOMBSTONE_OPERATION: |
| LocalRegion::tombstoneOperationNoThrow(msg.getTombstoneVersions(), |
| msg.getTombstoneKeys()); |
| break; |
| default: { |
| if (TcrMessage::getAllEPDisMess() == &msg) { |
| setProcessedMarker(false); |
| LocalRegion::invokeAfterAllEndPointDisconnected(); |
| } else { |
| LOGERROR( |
| "Unknown message type %d in subscription event handler; possible " |
| "serialization mismatch", |
| msg.getMessageType()); |
| err = GF_MSG; |
| } |
| break; |
| } |
| } |
| |
| // Update EventIdMap to mark event processed, Only for durable client. |
| // In case of closing, don't send it as listener might not be invoked. |
| if (!m_destroyPending && (m_isDurableClnt || msg.hasDelta()) && |
| TcrMessage::getAllEPDisMess() != &msg) { |
| m_tcrdm->checkDupAndAdd(msg.getEventId()); |
| } |
| |
| return err; |
| } |
| |
| GfErrType ThinClientRegion::handleServerException(const char* func, |
| const char* exceptionMsg) { |
| GfErrType error = GF_NOERR; |
| setThreadLocalExceptionMessage(exceptionMsg); |
| if (strstr(exceptionMsg, |
| "org.apache.geode.security.NotAuthorizedException") != nullptr) { |
| error = GF_NOT_AUTHORIZED_EXCEPTION; |
| } else if (strstr(exceptionMsg, |
| "org.apache.geode.cache.CacheWriterException") != nullptr) { |
| error = GF_CACHE_WRITER_EXCEPTION; |
| } else if (strstr( |
| exceptionMsg, |
| "org.apache.geode.security.AuthenticationFailedException") != |
| nullptr) { |
| error = GF_AUTHENTICATION_FAILED_EXCEPTION; |
| } else if (strstr(exceptionMsg, |
| "org.apache.geode.internal.cache.execute." |
| "InternalFunctionInvocationTargetException") != nullptr) { |
| error = GF_FUNCTION_EXCEPTION; |
| } else if (strstr(exceptionMsg, |
| "org.apache.geode.cache.CommitConflictException") != |
| nullptr) { |
| error = GF_COMMIT_CONFLICT_EXCEPTION; |
| } else if (strstr(exceptionMsg, |
| "org.apache.geode.cache." |
| "TransactionDataNodeHasDepartedException") != nullptr) { |
| error = GF_TRANSACTION_DATA_NODE_HAS_DEPARTED_EXCEPTION; |
| } else if (strstr( |
| exceptionMsg, |
| "org.apache.geode.cache.TransactionDataRebalancedException") != |
| nullptr) { |
| error = GF_TRANSACTION_DATA_REBALANCED_EXCEPTION; |
| } else if (strstr( |
| exceptionMsg, |
| "org.apache.geode.security.AuthenticationRequiredException") != |
| nullptr) { |
| error = GF_AUTHENTICATION_REQUIRED_EXCEPTION; |
| } else { |
| error = GF_CACHESERVER_EXCEPTION; |
| } |
| |
| if (error != GF_AUTHENTICATION_REQUIRED_EXCEPTION) { |
| LOGERROR("%s: An exception (%s) happened at remote server.", func, |
| exceptionMsg); |
| } else { |
| LOGFINER("%s: An exception (%s) happened at remote server.", func, |
| exceptionMsg); |
| } |
| return error; |
| } |
| |
| void ThinClientRegion::receiveNotification(TcrMessage* msg) { |
| std::unique_lock<std::mutex> lock(m_notificationMutex, std::defer_lock); |
| { |
| TryReadGuard guard(m_rwLock, m_destroyPending); |
| if (m_destroyPending) { |
| if (msg != TcrMessage::getAllEPDisMess()) { |
| _GEODE_SAFE_DELETE(msg); |
| } |
| return; |
| } |
| lock.lock(); |
| } |
| |
| if (msg->getMessageType() == TcrMessage::CLIENT_MARKER) { |
| handleMarker(); |
| } else { |
| clientNotificationHandler(*msg); |
| } |
| |
| lock.unlock(); |
| if (TcrMessage::getAllEPDisMess() != msg) _GEODE_SAFE_DELETE(msg); |
| } |
| |
| void ThinClientRegion::localInvalidateRegion_internal() { |
| std::shared_ptr<MapEntryImpl> me; |
| std::shared_ptr<Cacheable> oldValue; |
| |
| std::vector<std::shared_ptr<CacheableKey>> keysVec = keys_internal(); |
| for (const auto& key : keysVec) { |
| std::shared_ptr<VersionTag> versionTag; |
| m_entries->invalidate(key, me, oldValue, versionTag); |
| } |
| } |
| |
| void ThinClientRegion::invalidateInterestList( |
| std::unordered_map<std::shared_ptr<CacheableKey>, InterestResultPolicy>& |
| interestList) { |
| std::shared_ptr<MapEntryImpl> me; |
| std::shared_ptr<Cacheable> oldValue; |
| |
| if (!m_regionAttributes.getCachingEnabled()) { |
| return; |
| } |
| for (const auto& iter : interestList) { |
| std::shared_ptr<VersionTag> versionTag; |
| m_entries->invalidate(iter.first, me, oldValue, versionTag); |
| } |
| } |
| |
| void ThinClientRegion::localInvalidateFailover() { |
| CHECK_DESTROY_PENDING(TryReadGuard, |
| ThinClientRegion::localInvalidateFailover); |
| |
| // No need to invalidate from the "m_xxxForUpdatesAsInvalidates" lists? |
| if (m_interestListRegex.empty() && m_durableInterestListRegex.empty()) { |
| invalidateInterestList(m_interestList); |
| invalidateInterestList(m_durableInterestList); |
| } else { |
| localInvalidateRegion_internal(); |
| } |
| } |
| |
| void ThinClientRegion::localInvalidateForRegisterInterest( |
| const std::vector<std::shared_ptr<CacheableKey>>& keys) { |
| CHECK_DESTROY_PENDING(TryReadGuard, |
| ThinClientRegion::localInvalidateForRegisterInterest); |
| |
| if (!m_regionAttributes.getCachingEnabled()) { |
| return; |
| } |
| |
| std::shared_ptr<Cacheable> oldValue; |
| std::shared_ptr<MapEntryImpl> me; |
| |
| for (const auto& key : keys) { |
| std::shared_ptr<VersionTag> versionTag; |
| m_entries->invalidate(key, me, oldValue, versionTag); |
| updateAccessAndModifiedTimeForEntry(me, true); |
| } |
| } |
| |
| InterestResultPolicy ThinClientRegion::copyInterestList( |
| std::vector<std::shared_ptr<CacheableKey>>& keysVector, |
| std::unordered_map<std::shared_ptr<CacheableKey>, InterestResultPolicy>& |
| interestList) const { |
| InterestResultPolicy interestPolicy = InterestResultPolicy::NONE; |
| for (std::unordered_map<std::shared_ptr<CacheableKey>, |
| InterestResultPolicy>::const_iterator iter = |
| interestList.begin(); |
| iter != interestList.end(); ++iter) { |
| keysVector.push_back(iter->first); |
| interestPolicy = iter->second; |
| } |
| return interestPolicy; |
| } |
| |
| void ThinClientRegion::registerInterestGetValues( |
| const char* method, const std::vector<std::shared_ptr<CacheableKey>>* keys, |
| const std::shared_ptr<std::vector<std::shared_ptr<CacheableKey>>>& |
| resultKeys) { |
| auto exceptions = std::make_shared<HashMapOfException>(); |
| auto err = getAllNoThrow_remote(keys, nullptr, exceptions, resultKeys, true, |
| nullptr); |
| throwExceptionIfError(method, err); |
| // log any exceptions here |
| for (const auto& iter : *exceptions) { |
| LOGWARN("%s Exception for key %s:: %s: %s", method, |
| Utils::nullSafeToString(iter.first).c_str(), |
| iter.second->getName().c_str(), iter.second->what()); |
| } |
| } |
| |
| void ThinClientRegion::destroyDM(bool keepEndpoints) { |
| if (m_tcrdm != nullptr) { |
| m_tcrdm->destroy(keepEndpoints); |
| } |
| } |
| |
| void ThinClientRegion::release(bool invokeCallbacks) { |
| if (m_released) { |
| return; |
| } |
| |
| std::unique_lock<std::mutex> lock(m_notificationMutex, std::defer_lock); |
| if (!m_notifyRelease) { |
| lock.lock(); |
| } |
| |
| destroyDM(invokeCallbacks); |
| |
| m_interestList.clear(); |
| m_interestListRegex.clear(); |
| m_durableInterestList.clear(); |
| m_durableInterestListRegex.clear(); |
| |
| m_interestListForUpdatesAsInvalidates.clear(); |
| m_interestListRegexForUpdatesAsInvalidates.clear(); |
| m_durableInterestListForUpdatesAsInvalidates.clear(); |
| m_durableInterestListRegexForUpdatesAsInvalidates.clear(); |
| |
| LocalRegion::release(invokeCallbacks); |
| } |
| |
| ThinClientRegion::~ThinClientRegion() noexcept { |
| TryWriteGuard guard(m_rwLock, m_destroyPending); |
| if (!m_destroyPending) { |
| release(false); |
| } |
| } |
| |
| void ThinClientRegion::acquireGlobals(bool isFailover) { |
| if (isFailover) { |
| m_tcrdm->acquireFailoverLock(); |
| } |
| } |
| |
| void ThinClientRegion::releaseGlobals(bool isFailover) { |
| if (isFailover) { |
| m_tcrdm->releaseFailoverLock(); |
| } |
| } |
| |
| void ThinClientRegion::executeFunction( |
| const std::string& func, const std::shared_ptr<Cacheable>& args, |
| std::shared_ptr<CacheableVector> routingObj, uint8_t getResult, |
| std::shared_ptr<ResultCollector> rc, int32_t retryAttempts, |
| std::chrono::milliseconds timeout) { |
| int32_t attempt = 0; |
| auto failedNodes = CacheableHashSet::create(); |
| // if pools retry attempts are not set then retry once on all available |
| // endpoints |
| if (retryAttempts == -1) { |
| retryAttempts = static_cast<int32_t>(m_tcrdm->getNumberOfEndPoints()); |
| } |
| |
| bool reExecute = false; |
| bool reExecuteForServ = false; |
| |
| do { |
| TcrMessage* msg; |
| if (reExecuteForServ) { |
| msg = new TcrMessageExecuteRegionFunction( |
| new DataOutput(m_cacheImpl->createDataOutput()), func, this, args, |
| routingObj, getResult, failedNodes, timeout, m_tcrdm.get(), |
| static_cast<int8_t>(1)); |
| } else { |
| msg = new TcrMessageExecuteRegionFunction( |
| new DataOutput(m_cacheImpl->createDataOutput()), func, this, args, |
| routingObj, getResult, failedNodes, timeout, m_tcrdm.get(), |
| static_cast<int8_t>(0)); |
| } |
| TcrMessageReply reply(true, m_tcrdm.get()); |
| // need to check |
| ChunkedFunctionExecutionResponse* resultCollector( |
| new ChunkedFunctionExecutionResponse(reply, (getResult & 2) == 2, rc)); |
| reply.setChunkedResultHandler(resultCollector); |
| reply.setTimeout(timeout); |
| GfErrType err = GF_NOERR; |
| err = m_tcrdm->sendSyncRequest(*msg, reply, !(getResult & 1)); |
| resultCollector->reset(); |
| delete msg; |
| delete resultCollector; |
| if (err == GF_NOERR && |
| (reply.getMessageType() == TcrMessage::EXCEPTION || |
| reply.getMessageType() == TcrMessage::EXECUTE_REGION_FUNCTION_ERROR)) { |
| err = ThinClientRegion::handleServerException("Execute", |
| reply.getException()); |
| } |
| |
| if (ThinClientBaseDM::isFatalClientError(err)) { |
| throwExceptionIfError("ExecuteOnRegion:", err); |
| } else if (err != GF_NOERR) { |
| if (err == GF_FUNCTION_EXCEPTION) { |
| reExecute = true; |
| rc->clearResults(); |
| std::shared_ptr<CacheableHashSet> failedNodesIds(reply.getFailedNode()); |
| failedNodes->clear(); |
| if (failedNodesIds) { |
| LOGDEBUG( |
| "ThinClientRegion::executeFunction with GF_FUNCTION_EXCEPTION " |
| "failedNodesIds size = %zu ", |
| failedNodesIds->size()); |
| failedNodes->insert(failedNodesIds->begin(), failedNodesIds->end()); |
| } |
| } else if (err == GF_NOTCON) { |
| attempt++; |
| LOGDEBUG( |
| "ThinClientRegion::executeFunction with GF_NOTCON retry attempt = " |
| "%d ", |
| attempt); |
| if (attempt > retryAttempts) { |
| throwExceptionIfError("ExecuteOnRegion:", err); |
| } |
| reExecuteForServ = true; |
| rc->clearResults(); |
| failedNodes->clear(); |
| } else if (err == GF_TIMEOUT) { |
| LOGINFO("function timeout. Name: %s, timeout: %z, params: %" PRIu8 |
| ", " |
| "retryAttempts: %d ", |
| func.c_str(), timeout.count(), getResult, retryAttempts); |
| throwExceptionIfError("ExecuteOnRegion", GF_TIMEOUT); |
| } else if (err == GF_CLIENT_WAIT_TIMEOUT || |
| err == GF_CLIENT_WAIT_TIMEOUT_REFRESH_PRMETADATA) { |
| LOGINFO( |
| "function timeout, possibly bucket is not available or bucket " |
| "blacklisted. Name: %s, timeout: %z, params: %" PRIu8 |
| ", retryAttempts: " |
| "%d ", |
| func.c_str(), timeout.count(), getResult, retryAttempts); |
| throwExceptionIfError("ExecuteOnRegion", GF_CLIENT_WAIT_TIMEOUT); |
| } else { |
| LOGDEBUG("executeFunction err = %d ", err); |
| throwExceptionIfError("ExecuteOnRegion:", err); |
| } |
| } else { |
| reExecute = false; |
| reExecuteForServ = false; |
| } |
| } while (reExecuteForServ); |
| |
| if (reExecute && (getResult & 1)) { |
| reExecuteFunction(func, args, routingObj, getResult, rc, retryAttempts, |
| failedNodes, timeout); |
| } |
| } |
| std::shared_ptr<CacheableVector> ThinClientRegion::reExecuteFunction( |
| const std::string& func, const std::shared_ptr<Cacheable>& args, |
| std::shared_ptr<CacheableVector> routingObj, uint8_t getResult, |
| std::shared_ptr<ResultCollector> rc, int32_t retryAttempts, |
| std::shared_ptr<CacheableHashSet>& failedNodes, |
| std::chrono::milliseconds timeout) { |
| int32_t attempt = 0; |
| bool reExecute = true; |
| // if pools retry attempts are not set then retry once on all available |
| // endpoints |
| if (retryAttempts == -1) { |
| retryAttempts = static_cast<int32_t>(m_tcrdm->getNumberOfEndPoints()); |
| } |
| |
| do { |
| reExecute = false; |
| TcrMessageExecuteRegionFunction msg( |
| new DataOutput(m_cacheImpl->createDataOutput()), func, this, args, |
| routingObj, getResult, failedNodes, timeout, m_tcrdm.get(), |
| static_cast<int8_t>(1)); |
| TcrMessageReply reply(true, m_tcrdm.get()); |
| // need to check |
| ChunkedFunctionExecutionResponse* resultCollector( |
| new ChunkedFunctionExecutionResponse(reply, (getResult & 2) == 2, rc)); |
| reply.setChunkedResultHandler(resultCollector); |
| reply.setTimeout(timeout); |
| |
| GfErrType err = GF_NOERR; |
| err = m_tcrdm->sendSyncRequest(msg, reply, !(getResult & 1)); |
| delete resultCollector; |
| if (err == GF_NOERR && |
| (reply.getMessageType() == TcrMessage::EXCEPTION || |
| reply.getMessageType() == TcrMessage::EXECUTE_REGION_FUNCTION_ERROR)) { |
| err = ThinClientRegion::handleServerException("Execute", |
| reply.getException()); |
| } |
| |
| if (ThinClientBaseDM::isFatalClientError(err)) { |
| throwExceptionIfError("ExecuteOnRegion:", err); |
| } else if (err != GF_NOERR) { |
| if (err == GF_FUNCTION_EXCEPTION) { |
| reExecute = true; |
| rc->clearResults(); |
| std::shared_ptr<CacheableHashSet> failedNodesIds(reply.getFailedNode()); |
| failedNodes->clear(); |
| if (failedNodesIds) { |
| LOGDEBUG( |
| "ThinClientRegion::reExecuteFunction with GF_FUNCTION_EXCEPTION " |
| "failedNodesIds size = %zu ", |
| failedNodesIds->size()); |
| failedNodes->insert(failedNodesIds->begin(), failedNodesIds->end()); |
| } |
| } else if ((err == GF_NOTCON) || (err == GF_CLIENT_WAIT_TIMEOUT) || |
| (err == GF_CLIENT_WAIT_TIMEOUT_REFRESH_PRMETADATA)) { |
| attempt++; |
| LOGDEBUG( |
| "ThinClientRegion::reExecuteFunction with GF_NOTCON OR TIMEOUT " |
| "retry attempt " |
| "= %d ", |
| attempt); |
| if (attempt > retryAttempts) { |
| throwExceptionIfError("ExecuteOnRegion:", err); |
| } |
| reExecute = true; |
| rc->clearResults(); |
| failedNodes->clear(); |
| } else if (err == GF_TIMEOUT) { |
| LOGINFO("function timeout"); |
| throwExceptionIfError("ExecuteOnRegion", GF_CACHE_TIMEOUT_EXCEPTION); |
| } else { |
| LOGDEBUG("reExecuteFunction err = %d ", err); |
| throwExceptionIfError("ExecuteOnRegion:", err); |
| } |
| } |
| } while (reExecute); |
| return nullptr; |
| } |
| |
| bool ThinClientRegion::executeFunctionSH( |
| const std::string& func, const std::shared_ptr<Cacheable>& args, |
| uint8_t getResult, std::shared_ptr<ResultCollector> rc, |
| const std::shared_ptr<ClientMetadataService::ServerToKeysMap>& locationMap, |
| std::shared_ptr<CacheableHashSet>& failedNodes, |
| std::chrono::milliseconds timeout, bool allBuckets) { |
| bool reExecute = false; |
| auto resultCollectorLock = std::make_shared<std::recursive_mutex>(); |
| const auto& userAttr = UserAttributes::threadLocalUserAttributes; |
| std::vector<std::shared_ptr<OnRegionFunctionExecution>> feWorkers; |
| auto& threadPool = |
| CacheRegionHelper::getCacheImpl(&getCache())->getThreadPool(); |
| |
| for (const auto& locationIter : *locationMap) { |
| const auto& serverLocation = locationIter.first; |
| const auto& routingObj = locationIter.second; |
| auto worker = std::make_shared<OnRegionFunctionExecution>( |
| func, this, args, routingObj, getResult, timeout, |
| dynamic_cast<ThinClientPoolDM*>(m_tcrdm.get()), resultCollectorLock, rc, |
| userAttr, false, serverLocation, allBuckets); |
| threadPool.perform(worker); |
| feWorkers.push_back(worker); |
| } |
| |
| GfErrType abortError = GF_NOERR; |
| |
| for (auto worker : feWorkers) { |
| auto err = worker->getResult(); |
| auto currentReply = worker->getReply(); |
| |
| if (err == GF_NOERR && |
| (currentReply->getMessageType() == TcrMessage::EXCEPTION || |
| currentReply->getMessageType() == |
| TcrMessage::EXECUTE_REGION_FUNCTION_ERROR)) { |
| err = ThinClientRegion::handleServerException( |
| "Execute", currentReply->getException()); |
| } |
| |
| if (err != GF_NOERR) { |
| if (err == GF_FUNCTION_EXCEPTION) { |
| reExecute = true; |
| if (auto poolDM = |
| std::dynamic_pointer_cast<ThinClientPoolDM>(m_tcrdm)) { |
| if (poolDM->getClientMetaDataService()) { |
| poolDM->getClientMetaDataService()->enqueueForMetadataRefresh( |
| this->getFullPath(), 0); |
| } |
| } |
| worker->getResultCollector()->reset(); |
| { |
| std::lock_guard<decltype(*resultCollectorLock)> guard( |
| *resultCollectorLock); |
| rc->clearResults(); |
| } |
| std::shared_ptr<CacheableHashSet> failedNodeIds( |
| currentReply->getFailedNode()); |
| if (failedNodeIds) { |
| LOGDEBUG( |
| "ThinClientRegion::executeFunctionSH with GF_FUNCTION_EXCEPTION " |
| "failedNodeIds size = %zu ", |
| failedNodeIds->size()); |
| failedNodes->insert(failedNodeIds->begin(), failedNodeIds->end()); |
| } |
| } else if ((err == GF_NOTCON) || (err == GF_CLIENT_WAIT_TIMEOUT) || |
| (err == GF_CLIENT_WAIT_TIMEOUT_REFRESH_PRMETADATA)) { |
| reExecute = true; |
| LOGINFO( |
| "ThinClientRegion::executeFunctionSH with GF_NOTCON or " |
| "GF_CLIENT_WAIT_TIMEOUT "); |
| if (auto poolDM = |
| std::dynamic_pointer_cast<ThinClientPoolDM>(m_tcrdm)) { |
| if (poolDM->getClientMetaDataService()) { |
| poolDM->getClientMetaDataService()->enqueueForMetadataRefresh( |
| this->getFullPath(), 0); |
| } |
| } |
| worker->getResultCollector()->reset(); |
| { |
| std::lock_guard<decltype(*resultCollectorLock)> guard( |
| *resultCollectorLock); |
| rc->clearResults(); |
| } |
| } else { |
| if (ThinClientBaseDM::isFatalClientError(err)) { |
| LOGERROR("ThinClientRegion::executeFunctionSH: Fatal Exception"); |
| } else { |
| LOGWARN("ThinClientRegion::executeFunctionSH: Unexpected Exception"); |
| } |
| |
| if (abortError == GF_NOERR) { |
| abortError = err; |
| } |
| } |
| } |
| } |
| |
| if (abortError != GF_NOERR) { |
| throwExceptionIfError("ExecuteOnRegion:", abortError); |
| } |
| return reExecute; |
| } |
| |
| GfErrType ThinClientRegion::getFuncAttributes(const std::string& func, |
| std::vector<int8_t>** attr) { |
| GfErrType err = GF_NOERR; |
| |
| // do TCR GET_FUNCTION_ATTRIBUTES |
| LOGDEBUG("Tcrmessage request GET_FUNCTION_ATTRIBUTES "); |
| TcrMessageGetFunctionAttributes request( |
| new DataOutput(m_cacheImpl->createDataOutput()), func, m_tcrdm.get()); |
| TcrMessageReply reply(true, m_tcrdm.get()); |
| err = m_tcrdm->sendSyncRequest(request, reply); |
| if (err != GF_NOERR) { |
| return err; |
| } |
| switch (reply.getMessageType()) { |
| case TcrMessage::RESPONSE: { |
| *attr = reply.getFunctionAttributes(); |
| break; |
| } |
| case TcrMessage::EXCEPTION: { |
| err = handleServerException("Region::GET_FUNCTION_ATTRIBUTES", |
| reply.getException()); |
| break; |
| } |
| case TcrMessage::REQUEST_DATA_ERROR: { |
| LOGERROR("Error message from server: " + reply.getValue()->toString()); |
| throw FunctionExecutionException(reply.getValue()->toString()); |
| } |
| default: { |
| LOGERROR("Unknown message type %d while getting function attributes.", |
| reply.getMessageType()); |
| err = GF_MSG; |
| break; |
| } |
| } |
| return err; |
| } |
| |
| GfErrType ThinClientRegion::getNoThrow_FullObject( |
| std::shared_ptr<EventId> eventId, std::shared_ptr<Cacheable>& fullObject, |
| std::shared_ptr<VersionTag>& versionTag) { |
| TcrMessageRequestEventValue fullObjectMsg( |
| new DataOutput(m_cacheImpl->createDataOutput()), eventId); |
| TcrMessageReply reply(true, nullptr); |
| |
| GfErrType err = GF_NOTCON; |
| err = m_tcrdm->sendSyncRequest(fullObjectMsg, reply, false, true); |
| if (err == GF_NOERR) { |
| fullObject = reply.getValue(); |
| } |
| versionTag = reply.getVersionTag(); |
| return err; |
| } |
| |
| void ThinClientRegion::txDestroy( |
| const std::shared_ptr<CacheableKey>& key, |
| const std::shared_ptr<Serializable>& aCallbackArgument, |
| std::shared_ptr<VersionTag> versionTag) { |
| GfErrType err = destroyNoThrowTX(key, aCallbackArgument, -1, |
| CacheEventFlags::NORMAL, versionTag); |
| throwExceptionIfError("Region::destroyTX", err); |
| } |
| |
| void ThinClientRegion::txInvalidate( |
| const std::shared_ptr<CacheableKey>& key, |
| const std::shared_ptr<Serializable>& aCallbackArgument, |
| std::shared_ptr<VersionTag> versionTag) { |
| GfErrType err = invalidateNoThrowTX(key, aCallbackArgument, -1, |
| CacheEventFlags::NORMAL, versionTag); |
| throwExceptionIfError("Region::invalidateTX", err); |
| } |
| |
| void ThinClientRegion::txPut( |
| const std::shared_ptr<CacheableKey>& key, |
| const std::shared_ptr<Cacheable>& value, |
| const std::shared_ptr<Serializable>& aCallbackArgument, |
| std::shared_ptr<VersionTag> versionTag) { |
| std::shared_ptr<Cacheable> oldValue; |
| int64_t sampleStartNanos = startStatOpTime(); |
| GfErrType err = putNoThrowTX(key, value, aCallbackArgument, oldValue, -1, |
| CacheEventFlags::NORMAL, versionTag); |
| |
| updateStatOpTime(m_regionStats->getStat(), m_regionStats->getPutTimeId(), |
| sampleStartNanos); |
| throwExceptionIfError("Region::putTX", err); |
| } |
| |
| void ThinClientRegion::setProcessedMarker(bool) {} |
| |
| void ChunkedInterestResponse::reset() { |
| if (m_resultKeys != nullptr && m_resultKeys->size() > 0) { |
| m_resultKeys->clear(); |
| } |
| } |
| |
| void ChunkedInterestResponse::handleChunk(const uint8_t* chunk, |
| int32_t chunkLen, |
| uint8_t isLastChunkWithSecurity, |
| const CacheImpl* cacheImpl) { |
| auto input = |
| cacheImpl->createDataInput(chunk, chunkLen, m_replyMsg.getPool()); |
| |
| uint32_t partLen; |
| if (TcrMessageHelper::readChunkPartHeader( |
| m_msg, input, DSCode::FixedIDDefault, |
| static_cast<int32_t>(DSCode::CacheableArrayList), |
| "ChunkedInterestResponse", partLen, isLastChunkWithSecurity) != |
| TcrMessageHelper::ChunkObjectType::OBJECT) { |
| // encountered an exception part, so return without reading more |
| m_replyMsg.readSecureObjectPart(input, false, true, |
| isLastChunkWithSecurity); |
| return; |
| } |
| |
| if (m_resultKeys == nullptr) { |
| m_resultKeys = |
| std::make_shared<std::vector<std::shared_ptr<CacheableKey>>>(); |
| } |
| serializer::readObject(input, *m_resultKeys); |
| m_replyMsg.readSecureObjectPart(input, false, true, isLastChunkWithSecurity); |
| } |
| |
| void ChunkedKeySetResponse::reset() { |
| if (m_resultKeys.size() > 0) { |
| m_resultKeys.clear(); |
| } |
| } |
| |
| void ChunkedKeySetResponse::handleChunk(const uint8_t* chunk, int32_t chunkLen, |
| uint8_t isLastChunkWithSecurity, |
| const CacheImpl* cacheImpl) { |
| auto input = |
| cacheImpl->createDataInput(chunk, chunkLen, m_replyMsg.getPool()); |
| |
| uint32_t partLen; |
| if (TcrMessageHelper::readChunkPartHeader( |
| m_msg, input, DSCode::FixedIDDefault, |
| static_cast<int32_t>(DSCode::CacheableArrayList), |
| "ChunkedKeySetResponse", partLen, isLastChunkWithSecurity) != |
| TcrMessageHelper::ChunkObjectType::OBJECT) { |
| // encountered an exception part, so return without reading more |
| m_replyMsg.readSecureObjectPart(input, false, true, |
| isLastChunkWithSecurity); |
| return; |
| } |
| |
| serializer::readObject(input, m_resultKeys); |
| m_replyMsg.readSecureObjectPart(input, false, true, isLastChunkWithSecurity); |
| } |
| |
| void ChunkedQueryResponse::reset() { |
| m_queryResults->clear(); |
| m_structFieldNames.clear(); |
| } |
| |
| void ChunkedQueryResponse::readObjectPartList(DataInput& input, |
| bool isResultSet) { |
| if (input.readBoolean()) { |
| LOGERROR("Query response has keys which is unexpected."); |
| throw IllegalStateException("Query response has keys which is unexpected."); |
| } |
| |
| int32_t len = input.readInt32(); |
| |
| for (int32_t index = 0; index < len; ++index) { |
| if (input.read() == 2 /* for exception*/) { |
| input.advanceCursor(input.readArrayLength()); // skipLen |
| auto exMsgPtr = input.readString(); |
| throw IllegalStateException(exMsgPtr); |
| } else { |
| if (isResultSet) { |
| std::shared_ptr<Cacheable> value; |
| input.readObject(value); |
| m_queryResults->push_back(value); |
| } else { |
| auto code = static_cast<DSCode>(input.read()); |
| if (code == DSCode::FixedIDByte) { |
| auto arrayType = static_cast<DSFid>(input.read()); |
| if (arrayType != DSFid::CacheableObjectPartList) { |
| LOGERROR( |
| "Query response got unhandled message format %d while " |
| "expecting struct set object part list; possible serialization " |
| "mismatch", |
| arrayType); |
| throw MessageException( |
| "Query response got unhandled message format while expecting " |
| "struct set object part list; possible serialization mismatch"); |
| } |
| readObjectPartList(input, true); |
| } else { |
| LOGERROR( |
| "Query response got unhandled message format %" PRId8 |
| "while expecting " |
| "struct set object part list; possible serialization mismatch", |
| code); |
| throw MessageException( |
| "Query response got unhandled message format while expecting " |
| "struct set object part list; possible serialization mismatch"); |
| } |
| } |
| } |
| } |
| } |
| |
| void ChunkedQueryResponse::handleChunk(const uint8_t* chunk, int32_t chunkLen, |
| uint8_t isLastChunkWithSecurity, |
| const CacheImpl* cacheImpl) { |
| LOGDEBUG("ChunkedQueryResponse::handleChunk.."); |
| auto input = cacheImpl->createDataInput(chunk, chunkLen, m_msg.getPool()); |
| |
| uint32_t partLen; |
| auto objType = TcrMessageHelper::readChunkPartHeader( |
| m_msg, input, DSCode::FixedIDByte, |
| static_cast<int32_t>(DSFid::CollectionTypeImpl), "ChunkedQueryResponse", |
| partLen, isLastChunkWithSecurity); |
| if (objType == TcrMessageHelper::ChunkObjectType::EXCEPTION) { |
| // encountered an exception part, so return without reading more |
| m_msg.readSecureObjectPart(input, false, true, isLastChunkWithSecurity); |
| return; |
| } else if (objType == TcrMessageHelper::ChunkObjectType::NULL_OBJECT) { |
| // special case for scalar result |
| input.readInt32(); // ignored part length |
| input.read(); // ignored is object |
| auto intVal = std::dynamic_pointer_cast<CacheableInt32>(input.readObject()); |
| m_queryResults->push_back(intVal); |
| m_msg.readSecureObjectPart(input, false, true, isLastChunkWithSecurity); |
| return; |
| } |
| |
| // ignoring parent classes for now |
| // we will require to look at it once CQ is to be implemented. |
| // skipping HashSet/StructSet |
| // qhe: It was agreed upon that we'll use set for all kinds of results. |
| // to avoid dealing with compare operator for user objets. |
| // If the results on server are in a bag, or the user need to manipulate |
| // the elements, then we have to revisit this issue. |
| // For now, we'll live with duplicate records, hoping they do not cost much. |
| skipClass(input); |
| // skipping CollectionTypeImpl |
| // skipClass(input); // no longer, since GFE 5.7 |
| |
| input.read(); // this is Fixed ID byte (1) |
| input.read(); // this is DataSerializable (45) |
| input.read(); // this is Class 43 |
| const auto isStructTypeImpl = input.readString(); |
| |
| if (isStructTypeImpl == "org.apache.geode.cache.query.Struct") { |
| int32_t numOfFldNames = input.readArrayLength(); |
| bool skip = false; |
| if (m_structFieldNames.size() != 0) { |
| skip = true; |
| } |
| for (int i = 0; i < numOfFldNames; i++) { |
| auto sptr = input.readString(); |
| if (!skip) { |
| m_structFieldNames.push_back(sptr); |
| } |
| } |
| } |
| |
| // skip the remaining part |
| input.reset(); |
| // skip the whole part including partLen and isObj (4+1) |
| input.advanceCursor(partLen + 5); |
| |
| input.readInt32(); // skip part length |
| |
| if (!input.read()) { |
| LOGERROR( |
| "Query response part is not an object; possible serialization " |
| "mismatch"); |
| throw MessageException( |
| "Query response part is not an object; possible serialization " |
| "mismatch"); |
| } |
| |
| bool isResultSet = (m_structFieldNames.size() == 0); |
| |
| auto arrayType = static_cast<DSCode>(input.read()); |
| |
| if (arrayType == DSCode::CacheableObjectArray) { |
| int32_t arraySize = input.readArrayLength(); |
| skipClass(input); |
| for (int32_t arrayItem = 0; arrayItem < arraySize; ++arrayItem) { |
| std::shared_ptr<Serializable> value; |
| if (isResultSet) { |
| input.readObject(value); |
| m_queryResults->push_back(value); |
| } else { |
| input.read(); |
| int32_t arraySize2 = input.readArrayLength(); |
| skipClass(input); |
| for (int32_t index = 0; index < arraySize2; ++index) { |
| input.readObject(value); |
| m_queryResults->push_back(value); |
| } |
| } |
| } |
| } else if (arrayType == DSCode::FixedIDByte) { |
| arrayType = static_cast<DSCode>(input.read()); |
| if (static_cast<int32_t>(arrayType) != |
| static_cast<int32_t>(DSFid::CacheableObjectPartList)) { |
| LOGERROR( |
| "Query response got unhandled message format %d while expecting " |
| "object part list; possible serialization mismatch", |
| arrayType); |
| throw MessageException( |
| "Query response got unhandled message format while expecting object " |
| "part list; possible serialization mismatch"); |
| } |
| readObjectPartList(input, isResultSet); |
| } else { |
| LOGERROR( |
| "Query response got unhandled message format %d; possible " |
| "serialization mismatch", |
| arrayType); |
| throw MessageException( |
| "Query response got unhandled message format; possible serialization " |
| "mismatch"); |
| } |
| |
| m_msg.readSecureObjectPart(input, false, true, isLastChunkWithSecurity); |
| } |
| |
| void ChunkedQueryResponse::skipClass(DataInput& input) { |
| auto classByte = static_cast<DSCode>(input.read()); |
| if (classByte == DSCode::Class) { |
| // ignore string type id - assuming its a normal (under 64k) string. |
| input.read(); |
| uint16_t classLen = input.readInt16(); |
| input.advanceCursor(classLen); |
| } else { |
| throw IllegalStateException( |
| "ChunkedQueryResponse::skipClass: " |
| "Did not get expected class header byte"); |
| } |
| } |
| |
| void ChunkedFunctionExecutionResponse::reset() { |
| // m_functionExecutionResults->clear(); |
| } |
| |
| void ChunkedFunctionExecutionResponse::handleChunk( |
| const uint8_t* chunk, int32_t chunkLen, uint8_t isLastChunkWithSecurity, |
| const CacheImpl* cacheImpl) { |
| LOGDEBUG("ChunkedFunctionExecutionResponse::handleChunk"); |
| auto input = cacheImpl->createDataInput(chunk, chunkLen, m_msg.getPool()); |
| |
| uint32_t partLen; |
| |
| TcrMessageHelper::ChunkObjectType arrayType; |
| if ((arrayType = TcrMessageHelper::readChunkPartHeader( |
| m_msg, input, "ChunkedFunctionExecutionResponse", partLen, |
| isLastChunkWithSecurity)) == |
| TcrMessageHelper::ChunkObjectType::EXCEPTION) { |
| // encountered an exception part, so return without reading more |
| m_msg.readSecureObjectPart(input, false, true, isLastChunkWithSecurity); |
| return; |
| } |
| |
| if (m_getResult == false) { |
| return; |
| } |
| |
| if (static_cast<TcrMessageHelper::ChunkObjectType>(arrayType) == |
| TcrMessageHelper::ChunkObjectType::NULL_OBJECT) { |
| LOGDEBUG("ChunkedFunctionExecutionResponse::handleChunk nullptr object"); |
| // m_functionExecutionResults->push_back(nullptr); |
| m_msg.readSecureObjectPart(input, false, true, isLastChunkWithSecurity); |
| return; |
| } |
| |
| auto startLen = |
| input.getBytesRead() - |
| 1; // from here need to look value part + memberid AND -1 for array type |
| // iread adn gnore array length |
| input.readArrayLength(); |
| |
| // read a byte to determine whether to read exception part for sendException |
| // or read objects. |
| auto partType = static_cast<DSCode>(input.read()); |
| bool isExceptionPart = false; |
| // See If partType is JavaSerializable |
| const int CHUNK_HDR_LEN = 5; |
| const int SECURE_PART_LEN = 5 + 8; |
| bool readPart = true; |
| LOGDEBUG( |
| "ChunkedFunctionExecutionResponse::handleChunk chunkLen = %d & partLen = " |
| "%d ", |
| chunkLen, partLen); |
| if (partType == DSCode::JavaSerializable) { |
| isExceptionPart = true; |
| // reset the input. |
| input.reset(); |
| |
| if (((isLastChunkWithSecurity & 0x02) && |
| (chunkLen - static_cast<int32_t>(partLen) <= |
| CHUNK_HDR_LEN + SECURE_PART_LEN)) || |
| (((isLastChunkWithSecurity & 0x02) == 0) && |
| (chunkLen - static_cast<int32_t>(partLen) <= CHUNK_HDR_LEN))) { |
| readPart = false; |
| partLen = input.readInt32(); |
| input.advanceCursor(1); // skip isObject byte |
| input.advanceCursor(partLen); |
| } else { |
| // skip first part i.e JavaSerializable. |
| TcrMessageHelper::skipParts(m_msg, input, 1); |
| |
| // read the second part which is string in usual manner, first its length. |
| partLen = input.readInt32(); |
| |
| // then isObject byte |
| input.read(); // ignore iSobject |
| |
| startLen = input.getBytesRead(); // reset from here need to look value |
| // part + memberid AND -1 for array type |
| |
| // Since it is contained as a part of other results, read arrayType which |
| // is arrayList = 65. |
| input.read(); |
| |
| // read and ignore its len which is 2 |
| input.readArrayLength(); |
| } |
| } else { |
| // rewind cursor by 1 to what we had read a byte to determine whether to |
| // read exception part or read objects. |
| input.rewindCursor(1); |
| } |
| |
| // Read either object or exception string from sendException. |
| std::shared_ptr<Serializable> value; |
| // std::shared_ptr<Cacheable> memberId; |
| if (readPart) { |
| input.readObject(value); |
| // TODO: track this memberId for PrFxHa |
| // input.readObject(memberId); |
| auto objectlen = input.getBytesRead() - startLen; |
| |
| auto memberIdLen = partLen - objectlen; |
| input.advanceCursor(memberIdLen); |
| LOGDEBUG("function partlen = %d , objectlen = %z, memberidlen = %z ", |
| partLen, objectlen, memberIdLen); |
| LOGDEBUG("function input.getBytesRemaining() = %z ", |
| input.getBytesRemaining()); |
| |
| } else { |
| value = CacheableString::create("Function exception result."); |
| } |
| if (m_rc != nullptr) { |
| std::shared_ptr<Cacheable> result = nullptr; |
| if (isExceptionPart) { |
| result = std::make_shared<UserFunctionExecutionException>( |
| std::dynamic_pointer_cast<CacheableString>(value)->value()); |
| } else { |
| result = value; |
| } |
| if (m_resultCollectorLock) { |
| std::lock_guard<decltype(*m_resultCollectorLock)> guard( |
| *m_resultCollectorLock); |
| m_rc->addResult(result); |
| } else { |
| m_rc->addResult(result); |
| } |
| } |
| |
| m_msg.readSecureObjectPart(input, false, true, isLastChunkWithSecurity); |
| // m_functionExecutionResults->push_back(value); |
| } |
| |
| void ChunkedGetAllResponse::reset() { |
| m_keysOffset = 0; |
| if (m_resultKeys != nullptr && m_resultKeys->size() > 0) { |
| m_resultKeys->clear(); |
| } |
| } |
| |
| // process a GET_ALL response chunk |
| void ChunkedGetAllResponse::handleChunk(const uint8_t* chunk, int32_t chunkLen, |
| uint8_t isLastChunkWithSecurity, |
| const CacheImpl* cacheImpl) { |
| auto input = cacheImpl->createDataInput(chunk, chunkLen, m_msg.getPool()); |
| |
| uint32_t partLen; |
| if (TcrMessageHelper::readChunkPartHeader( |
| m_msg, input, DSCode::FixedIDByte, |
| static_cast<int32_t>(DSFid::VersionedObjectPartList), |
| "ChunkedGetAllResponse", partLen, isLastChunkWithSecurity) != |
| TcrMessageHelper::ChunkObjectType::OBJECT) { |
| // encountered an exception part, so return without reading more |
| m_msg.readSecureObjectPart(input, false, true, isLastChunkWithSecurity); |
| return; |
| } |
| |
| VersionedCacheableObjectPartList objectList( |
| m_keys, &m_keysOffset, m_values, m_exceptions, m_resultKeys, m_region, |
| &m_trackerMap, m_destroyTracker, m_addToLocalCache, m_dsmemId, |
| m_responseLock); |
| |
| objectList.fromData(input); |
| |
| m_msg.readSecureObjectPart(input, false, true, isLastChunkWithSecurity); |
| } |
| |
| void ChunkedGetAllResponse::add(const ChunkedGetAllResponse* other) { |
| if (m_values) { |
| for (const auto& iter : *m_values) { |
| m_values->emplace(iter.first, iter.second); |
| } |
| } |
| |
| if (m_exceptions) { |
| m_exceptions->insert(other->m_exceptions->begin(), |
| other->m_exceptions->end()); |
| } |
| |
| for (const auto& iter : other->m_trackerMap) { |
| m_trackerMap[iter.first] = iter.second; |
| } |
| |
| if (m_resultKeys) { |
| m_resultKeys->insert(m_resultKeys->end(), other->m_resultKeys->begin(), |
| other->m_resultKeys->end()); |
| } |
| } |
| |
| void ChunkedPutAllResponse::reset() { |
| if (m_list != nullptr && m_list->size() > 0) { |
| m_list->getVersionedTagptr().clear(); |
| } |
| } |
| |
| // process a PUT_ALL response chunk |
| void ChunkedPutAllResponse::handleChunk(const uint8_t* chunk, int32_t chunkLen, |
| uint8_t isLastChunkWithSecurity, |
| const CacheImpl* cacheImpl) { |
| auto input = cacheImpl->createDataInput(chunk, chunkLen, m_msg.getPool()); |
| |
| uint32_t partLen; |
| TcrMessageHelper::ChunkObjectType chunkType; |
| if ((chunkType = TcrMessageHelper::readChunkPartHeader( |
| m_msg, input, DSCode::FixedIDByte, |
| static_cast<int32_t>(DSFid::VersionedObjectPartList), |
| "ChunkedPutAllResponse", partLen, isLastChunkWithSecurity)) == |
| TcrMessageHelper::ChunkObjectType::NULL_OBJECT) { |
| LOGDEBUG("ChunkedPutAllResponse::handleChunk nullptr object"); |
| // No issues it will be empty in case of disabled caching. |
| m_msg.readSecureObjectPart(input, false, true, isLastChunkWithSecurity); |
| return; |
| } |
| |
| if (chunkType == TcrMessageHelper::ChunkObjectType::OBJECT) { |
| LOGDEBUG("ChunkedPutAllResponse::handleChunk object"); |
| std::recursive_mutex responseLock; |
| auto vcObjPart = std::make_shared<VersionedCacheableObjectPartList>( |
| dynamic_cast<ThinClientRegion*>(m_region.get()), |
| m_msg.getChunkedResultHandler()->getEndpointMemId(), responseLock); |
| vcObjPart->fromData(input); |
| m_list->addAll(vcObjPart); |
| m_msg.readSecureObjectPart(input, false, true, isLastChunkWithSecurity); |
| } else { |
| LOGDEBUG("ChunkedPutAllResponse::handleChunk BYTES PART"); |
| const auto byte0 = input.read(); |
| LOGDEBUG("ChunkedPutAllResponse::handleChunk single-hop bytes byte0 = %d ", |
| byte0); |
| const auto byte1 = input.read(); |
| m_msg.readSecureObjectPart(input, false, true, isLastChunkWithSecurity); |
| |
| auto pool = m_msg.getPool(); |
| if (pool != nullptr && !pool->isDestroyed() && |
| pool->getPRSingleHopEnabled()) { |
| auto poolDM = dynamic_cast<ThinClientPoolDM*>(pool); |
| if ((poolDM != nullptr) && |
| (poolDM->getClientMetaDataService() != nullptr) && (byte0 != 0)) { |
| LOGFINE("enqueued region " + m_region->getFullPath() + |
| " for metadata refresh for singlehop for PUTALL operation."); |
| poolDM->getClientMetaDataService()->enqueueForMetadataRefresh( |
| m_region->getFullPath(), byte1); |
| } |
| } |
| } |
| } |
| |
| void ChunkedRemoveAllResponse::reset() { |
| if (m_list != nullptr && m_list->size() > 0) { |
| m_list->getVersionedTagptr().clear(); |
| } |
| } |
| |
| // process a REMOVE_ALL response chunk |
| void ChunkedRemoveAllResponse::handleChunk(const uint8_t* chunk, |
| int32_t chunkLen, |
| uint8_t isLastChunkWithSecurity, |
| const CacheImpl* cacheImpl) { |
| auto input = cacheImpl->createDataInput(chunk, chunkLen, m_msg.getPool()); |
| |
| uint32_t partLen; |
| TcrMessageHelper::ChunkObjectType chunkType; |
| if ((chunkType = TcrMessageHelper::readChunkPartHeader( |
| m_msg, input, DSCode::FixedIDByte, |
| static_cast<int32_t>(DSFid::VersionedObjectPartList), |
| "ChunkedRemoveAllResponse", partLen, isLastChunkWithSecurity)) == |
| TcrMessageHelper::ChunkObjectType::NULL_OBJECT) { |
| LOGDEBUG("ChunkedRemoveAllResponse::handleChunk nullptr object"); |
| // No issues it will be empty in case of disabled caching. |
| m_msg.readSecureObjectPart(input, false, true, isLastChunkWithSecurity); |
| return; |
| } |
| |
| if (chunkType == TcrMessageHelper::ChunkObjectType::OBJECT) { |
| LOGDEBUG("ChunkedRemoveAllResponse::handleChunk object"); |
| std::recursive_mutex responseLock; |
| auto vcObjPart = std::make_shared<VersionedCacheableObjectPartList>( |
| dynamic_cast<ThinClientRegion*>(m_region.get()), |
| m_msg.getChunkedResultHandler()->getEndpointMemId(), responseLock); |
| vcObjPart->fromData(input); |
| m_list->addAll(vcObjPart); |
| m_msg.readSecureObjectPart(input, false, true, isLastChunkWithSecurity); |
| } else { |
| LOGDEBUG("ChunkedRemoveAllResponse::handleChunk BYTES PART"); |
| const auto byte0 = input.read(); |
| LOGDEBUG( |
| "ChunkedRemoveAllResponse::handleChunk single-hop bytes byte0 = %d ", |
| byte0); |
| const auto byte1 = input.read(); |
| m_msg.readSecureObjectPart(input, false, true, isLastChunkWithSecurity); |
| |
| auto pool = m_msg.getPool(); |
| if (pool != nullptr && !pool->isDestroyed() && |
| pool->getPRSingleHopEnabled()) { |
| auto poolDM = dynamic_cast<ThinClientPoolDM*>(pool); |
| if ((poolDM != nullptr) && |
| (poolDM->getClientMetaDataService() != nullptr) && (byte0 != 0)) { |
| LOGFINE("enqueued region " + m_region->getFullPath() + |
| " for metadata refresh for singlehop for REMOVEALL operation."); |
| poolDM->getClientMetaDataService()->enqueueForMetadataRefresh( |
| m_region->getFullPath(), byte1); |
| } |
| } |
| } |
| } |
| |
| void ChunkedDurableCQListResponse::reset() { |
| if (m_resultList) { |
| m_resultList->clear(); |
| } |
| } |
| |
| // handles the chunk response for GETDURABLECQS_MSG_TYPE |
| void ChunkedDurableCQListResponse::handleChunk(const uint8_t* chunk, |
| int32_t chunkLen, uint8_t, |
| const CacheImpl* cacheImpl) { |
| auto input = cacheImpl->createDataInput(chunk, chunkLen, m_msg.getPool()); |
| |
| // read and ignore part length |
| input.readInt32(); |
| if (!input.readBoolean()) { |
| // we're currently always expecting an object |
| char exMsg[256]; |
| std::snprintf( |
| exMsg, 255, |
| "ChunkedDurableCQListResponse::handleChunk: part is not object"); |
| throw MessageException(exMsg); |
| } |
| |
| input.advanceCursor(1); // skip the CacheableArrayList type ID byte |
| |
| const auto stringParts = input.read(); // read the number of strings in the |
| // message this is one byte |
| |
| for (int i = 0; i < stringParts; i++) { |
| m_resultList->push_back( |
| std::dynamic_pointer_cast<CacheableString>(input.readObject())); |
| } |
| } |
| |
| } // namespace client |
| } // namespace geode |
| } // namespace apache |