|  | /* | 
|  | * Licensed to the Apache Software Foundation (ASF) under one or more | 
|  | * contributor license agreements.  See the NOTICE file distributed with | 
|  | * this work for additional information regarding copyright ownership. | 
|  | * The ASF licenses this file to You under the Apache License, Version 2.0 | 
|  | * (the "License"); you may not use this file except in compliance with | 
|  | * the License.  You may obtain a copy of the License at | 
|  | * | 
|  | *      http://www.apache.org/licenses/LICENSE-2.0 | 
|  | * | 
|  | * Unless required by applicable law or agreed to in writing, software | 
|  | * distributed under the License is distributed on an "AS IS" BASIS, | 
|  | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
|  | * See the License for the specific language governing permissions and | 
|  | * limitations under the License. | 
|  | */ | 
|  |  | 
|  | #pragma once | 
|  |  | 
|  | #ifndef GEODE_CLIENTMETADATASERVICE_H_ | 
|  | #define GEODE_CLIENTMETADATASERVICE_H_ | 
|  |  | 
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <deque>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <unordered_map>
#include <unordered_set>
#include <vector>
|  |  | 
|  | #include <boost/thread/shared_mutex.hpp> | 
|  |  | 
|  | #include <geode/CacheableKey.hpp> | 
|  | #include <geode/Region.hpp> | 
|  | #include <geode/Serializable.hpp> | 
|  | #include <geode/internal/functional.hpp> | 
|  |  | 
|  | #include "AppDomainContext.hpp" | 
|  | #include "BucketServerLocation.hpp" | 
|  | #include "ServerLocation.hpp" | 
|  |  | 
|  | namespace apache { | 
|  | namespace geode { | 
|  | namespace client { | 
|  |  | 
// Forward declarations; this header only names these types through pointers
// and std::shared_ptr, so their full definitions are not required here.
class ClientMetadata;
class ThinClientPoolDM;

// Ordered map from a string key to shared ClientMetadata.
// NOTE(review): the key is presumably the region's full path - confirm
// against the code that fills the map.
using RegionMetadataMapType =
    std::map<std::string, std::shared_ptr<ClientMetadata>>;
|  |  | 
/**
 * Records whether a bucket is currently inside a timeout window.
 *
 * The state is a single steady_clock instant: either the sentinel
 * m_noTimeout (no timeout recorded) or the instant at which setTimeout()
 * first recorded one. No synchronization is performed inside this class.
 */
class BucketStatus {
 private:
  using clock = std::chrono::steady_clock;

  // Sentinel value meaning "no timeout recorded". Defined out of line.
  static const clock::time_point m_noTimeout;

  // Instant recorded by setTimeout(), or m_noTimeout when nothing is recorded.
  // NOTE(review): a fresh object holds a default-constructed time_point; this
  // only reads as "no timeout" if m_noTimeout is defined as the same value -
  // confirm against its definition.
  clock::time_point m_lastTimeout;

 public:
  BucketStatus() = default;

  /**
   * Reports whether the recorded timeout is still in effect.
   *
   * @param millis length of the timeout window, measured from the instant
   *               recorded by setTimeout().
   * @return true while the window has not yet elapsed; false if no timeout
   *         is recorded or the window has elapsed. In the elapsed case the
   *         recorded timeout is cleared as a side effect.
   */
  bool isTimedoutAndReset(std::chrono::milliseconds millis) {
    if (m_lastTimeout == m_noTimeout) {
      return false;
    }

    const auto deadline = m_lastTimeout + millis;
    if (deadline > clock::now()) {
      // Still inside the window: the bucket has not recovered yet.
      return true;
    }

    // Waited long enough for the bucket to recover; forget the timeout.
    m_lastTimeout = m_noTimeout;
    return false;
  }

  /**
   * Records the current instant as the start of a timeout window. Calls made
   * while a timeout is already recorded leave the recorded instant untouched.
   */
  void setTimeout() {
    if (m_lastTimeout != m_noTimeout) {
      return;  // already recorded; keep the first instant
    }
    m_lastTimeout = clock::now();
  }
};
|  |  | 
|  | class PRbuckets { | 
|  | private: | 
|  | BucketStatus* m_buckets; | 
|  |  | 
|  | public: | 
|  | explicit PRbuckets(int32_t nBuckets) { | 
|  | m_buckets = new BucketStatus[nBuckets]; | 
|  | } | 
|  | ~PRbuckets() { delete[] m_buckets; } | 
|  |  | 
|  | bool isBucketTimedOut(int32_t bucketId, std::chrono::milliseconds millis) { | 
|  | return m_buckets[bucketId].isTimedoutAndReset(millis); | 
|  | } | 
|  |  | 
|  | void setBucketTimeout(int32_t bucketId) { m_buckets[bucketId].setTimeout(); } | 
|  | }; | 
|  |  | 
|  | class ClientMetadataService { | 
|  | public: | 
|  | ClientMetadataService(const ClientMetadataService&) = delete; | 
|  | ClientMetadataService& operator=(const ClientMetadataService&) = delete; | 
|  | ClientMetadataService() = delete; | 
|  | explicit ClientMetadataService(ThinClientPoolDM* pool); | 
|  | inline ~ClientMetadataService() noexcept = default; | 
|  |  | 
|  | void start(); | 
|  |  | 
|  | void stop(); | 
|  |  | 
|  | void svc(void); | 
|  |  | 
|  | void getClientPRMetadata(const char* regionFullPath); | 
|  |  | 
|  | void getBucketServerLocation( | 
|  | const std::shared_ptr<Region>& region, | 
|  | const std::shared_ptr<CacheableKey>& key, | 
|  | const std::shared_ptr<Cacheable>& value, | 
|  | const std::shared_ptr<Serializable>& aCallbackArgument, bool isPrimary, | 
|  | std::shared_ptr<BucketServerLocation>& serverLocation, int8_t& version); | 
|  |  | 
|  | std::shared_ptr<ClientMetadata> getClientMetadata( | 
|  | const std::string& regionFullPath); | 
|  |  | 
|  | void enqueueForMetadataRefresh(const std::string& regionFullPath, | 
|  | int8_t serverGroupFlag); | 
|  |  | 
|  | typedef std::unordered_map< | 
|  | std::shared_ptr<BucketServerLocation>, | 
|  | std::shared_ptr<std::vector<std::shared_ptr<CacheableKey>>>, | 
|  | dereference_hash<std::shared_ptr<BucketServerLocation>>, | 
|  | dereference_equal_to<std::shared_ptr<BucketServerLocation>>> | 
|  | ServerToFilterMap; | 
|  | std::shared_ptr<ServerToFilterMap> getServerToFilterMap( | 
|  | const std::vector<std::shared_ptr<CacheableKey>>& keys, | 
|  | const std::shared_ptr<Region>& region, bool isPrimary); | 
|  |  | 
|  | void markPrimaryBucketForTimeoutButLookSecondaryBucket( | 
|  | const std::shared_ptr<Region>& region, | 
|  | const std::shared_ptr<CacheableKey>& key, | 
|  | const std::shared_ptr<Cacheable>& value, | 
|  | const std::shared_ptr<Serializable>& aCallbackArgument, bool isPrimary, | 
|  | std::shared_ptr<BucketServerLocation>& serverLocation, int8_t& version); | 
|  |  | 
|  | bool isBucketMarkedForTimeout(const char* regionFullPath, int32_t bucketid); | 
|  |  | 
|  | typedef std::unordered_set<int32_t> BucketSet; | 
|  | typedef std::unordered_map< | 
|  | std::shared_ptr<BucketServerLocation>, std::shared_ptr<BucketSet>, | 
|  | dereference_hash<std::shared_ptr<BucketServerLocation>>, | 
|  | dereference_equal_to<std::shared_ptr<BucketServerLocation>>> | 
|  | ServerToBucketsMap; | 
|  |  | 
|  | static std::shared_ptr<BucketServerLocation> findNextServer( | 
|  | const ServerToBucketsMap& serverToBucketsMap, | 
|  | const BucketSet& currentBucketSet); | 
|  |  | 
|  | typedef std::unordered_map<int32_t, std::shared_ptr<CacheableHashSet>> | 
|  | BucketToKeysMap; | 
|  | std::shared_ptr<BucketToKeysMap> groupByBucketOnClientSide( | 
|  | const std::shared_ptr<Region>& region, | 
|  | const std::shared_ptr<CacheableVector>& keySet, | 
|  | const std::shared_ptr<ClientMetadata>& metadata); | 
|  |  | 
|  | typedef std::unordered_map< | 
|  | std::shared_ptr<BucketServerLocation>, std::shared_ptr<CacheableHashSet>, | 
|  | dereference_hash<std::shared_ptr<BucketServerLocation>>, | 
|  | dereference_equal_to<std::shared_ptr<BucketServerLocation>>> | 
|  | ServerToKeysMap; | 
|  | std::shared_ptr<ServerToKeysMap> getServerToFilterMapFESHOP( | 
|  | const std::shared_ptr<CacheableVector>& keySet, | 
|  | const std::shared_ptr<Region>& region, bool isPrimary); | 
|  |  | 
|  | std::shared_ptr<ClientMetadataService::ServerToBucketsMap> | 
|  | groupByServerToAllBuckets(const std::shared_ptr<Region>& region, | 
|  | bool optimizeForWrite); | 
|  |  | 
|  | std::shared_ptr<ClientMetadataService::ServerToBucketsMap> | 
|  | groupByServerToBuckets(const std::shared_ptr<ClientMetadata>& metadata, | 
|  | const BucketSet& bucketSet, bool optimizeForWrite); | 
|  |  | 
|  | static std::shared_ptr<ClientMetadataService::ServerToBucketsMap> pruneNodes( | 
|  | const std::shared_ptr<ClientMetadata>& metadata, | 
|  | const BucketSet& buckets); | 
|  |  | 
|  | void removeBucketServerLocation( | 
|  | const std::shared_ptr<BucketServerLocation>& serverLocation); | 
|  |  | 
|  | private: | 
|  | std::shared_ptr<ClientMetadata> SendClientPRMetadata( | 
|  | const char* regionPath, std::shared_ptr<ClientMetadata> cptr); | 
|  |  | 
|  | std::shared_ptr<ClientMetadata> getClientMetadata( | 
|  | const std::shared_ptr<Region>& region); | 
|  |  | 
|  | private: | 
|  | std::thread m_thread; | 
|  | boost::shared_mutex m_regionMetadataLock; | 
|  | RegionMetadataMapType m_regionMetaDataMap; | 
|  | std::atomic<bool> m_run; | 
|  | ThinClientPoolDM* m_pool; | 
|  | CacheImpl* m_cache; | 
|  | std::deque<std::string> m_regionQueue; | 
|  | std::mutex m_regionQueueMutex; | 
|  | std::condition_variable m_regionQueueCondition; | 
|  | boost::shared_mutex m_PRbucketStatusLock; | 
|  | std::map<std::string, std::unique_ptr<PRbuckets>> m_bucketStatus; | 
|  | std::chrono::milliseconds m_bucketWaitTimeout; | 
|  | static const char* NC_CMDSvcThread; | 
|  | std::unique_ptr<AppDomainContext> m_appDomainContext; | 
|  | }; | 
|  | }  // namespace client | 
|  | }  // namespace geode | 
|  | }  // namespace apache | 
|  |  | 
|  | #endif  // GEODE_CLIENTMETADATASERVICE_H_ |