blob: e6c91e98f0348ac5dc886942bb9ed496f196bd71 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#ifndef GEODE_CLIENTMETADATASERVICE_H_
#define GEODE_CLIENTMETADATASERVICE_H_
#include <atomic>
#include <chrono>
#include <map>
#include <memory>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <vector>
#include <ace/Semaphore.h>
#include <ace/Task.h>
#include <geode/CacheableKey.hpp>
#include <geode/Region.hpp>
#include <geode/Serializable.hpp>
#include <geode/internal/functional.hpp>
#include "BucketServerLocation.hpp"
#include "ClientMetadata.hpp"
#include "DistributedSystemImpl.hpp"
#include "NonCopyable.hpp"
#include "Queue.hpp"
#include "ServerLocation.hpp"
namespace apache {
namespace geode {
namespace client {
// Forward declaration of the metadata type referenced by the alias below. It
// is only used through std::shared_ptr here, so an incomplete type is enough.
class ClientMetadata;

// Ordered map from a string key to shared ClientMetadata.
// NOTE(review): the key is presumably the region's full path -- confirm
// against the implementation file.
typedef std::map<std::string, std::shared_ptr<ClientMetadata>>
    RegionMetadataMapType;
class BucketStatus {
private:
ACE_Time_Value m_lastTimeout;
public:
BucketStatus() : m_lastTimeout(ACE_Time_Value::zero) {}
bool isTimedoutAndReset(std::chrono::milliseconds millis) {
if (m_lastTimeout == ACE_Time_Value::zero) {
return false;
} else {
ACE_Time_Value to(millis);
to += m_lastTimeout;
if (to > ACE_OS::gettimeofday()) {
return true; // timeout as buckste not recovered yet
} else {
// reset to zero as we waited enough to recover bucket
m_lastTimeout = ACE_Time_Value::zero;
return false;
}
}
}
void setTimeout() {
if (m_lastTimeout == ACE_Time_Value::zero) {
m_lastTimeout = ACE_OS::gettimeofday(); // set once only for timeout
}
}
};
class PRbuckets {
private:
BucketStatus* m_buckets;
public:
explicit PRbuckets(int32_t nBuckets) {
m_buckets = new BucketStatus[nBuckets];
}
~PRbuckets() { delete[] m_buckets; }
bool isBucketTimedOut(int32_t bucketId, std::chrono::milliseconds millis) {
return m_buckets[bucketId].isTimedoutAndReset(millis);
}
void setBucketTimeout(int32_t bucketId) { m_buckets[bucketId].setTimeout(); }
};
/**
 * Service that holds ClientMetadata entries in a string-keyed map for a Pool
 * and declares lookups from regions/keys/buckets to BucketServerLocation.
 *
 * The class derives from ACE_Task_Base: start() raises the run flag and
 * activates the task, stop() lowers the flag, releases the queue semaphore and
 * waits for the task. Every other member is only declared in this header; its
 * behavior lives in the implementation file and is not described here.
 */
class ClientMetadataService : public ACE_Task_Base,
                              private NonCopyable,
                              private NonAssignable {
 public:
  ~ClientMetadataService();
  explicit ClientMetadataService(Pool* pool);

  /** Sets the run flag and then activates the underlying ACE task. */
  inline void start() {
    m_run = true;
    this->activate();
  }

  /**
   * Clears the run flag, releases m_regionQueueSema and waits on the task.
   * NOTE(review): the release presumably wakes svc() if it is blocked on the
   * semaphore so it can observe the cleared flag -- confirm in svc().
   */
  inline void stop() {
    m_run = false;
    m_regionQueueSema.release();
    this->wait();
  }

  /** Task body; defined in the implementation file. */
  int svc(void);

  void getClientPRMetadata(const char* regionFullPath);

  /**
   * Declared with an output parameter pair: serverLocation and version are
   * non-const references the implementation may write to.
   */
  void getBucketServerLocation(
      const std::shared_ptr<Region>& region,
      const std::shared_ptr<CacheableKey>& key,
      const std::shared_ptr<Cacheable>& value,
      const std::shared_ptr<Serializable>& aCallbackArgument, bool isPrimary,
      std::shared_ptr<BucketServerLocation>& serverLocation, int8_t& version);

  void removeBucketServerLocation(BucketServerLocation serverLocation);

  std::shared_ptr<ClientMetadata> getClientMetadata(
      const std::string& regionFullPath);

  void populateDummyServers(const char* regionName,
                            std::shared_ptr<ClientMetadata> clientmetadata);

  void enqueueForMetadataRefresh(const std::string& regionFullPath,
                                 int8_t serverGroupFlag);

  /**
   * Server location -> vector of keys. The map is keyed by shared_ptr but uses
   * dereference_hash / dereference_equal_to as hasher and key-equality.
   * NOTE(review): presumably these compare the pointed-to locations rather
   * than the pointers -- confirm in geode/internal/functional.hpp.
   */
  typedef std::unordered_map<
      std::shared_ptr<BucketServerLocation>,
      std::shared_ptr<std::vector<std::shared_ptr<CacheableKey>>>,
      dereference_hash<std::shared_ptr<BucketServerLocation>>,
      dereference_equal_to<std::shared_ptr<BucketServerLocation>>>
      ServerToFilterMap;

  std::shared_ptr<ServerToFilterMap> getServerToFilterMap(
      const std::vector<std::shared_ptr<CacheableKey>>& keys,
      const std::shared_ptr<Region>& region, bool isPrimary);

  void markPrimaryBucketForTimeout(
      const std::shared_ptr<Region>& region,
      const std::shared_ptr<CacheableKey>& key,
      const std::shared_ptr<Cacheable>& value,
      const std::shared_ptr<Serializable>& aCallbackArgument, bool isPrimary,
      std::shared_ptr<BucketServerLocation>& serverLocation, int8_t& version);

  void markPrimaryBucketForTimeoutButLookSecondaryBucket(
      const std::shared_ptr<Region>& region,
      const std::shared_ptr<CacheableKey>& key,
      const std::shared_ptr<Cacheable>& value,
      const std::shared_ptr<Serializable>& aCallbackArgument, bool isPrimary,
      std::shared_ptr<BucketServerLocation>& serverLocation, int8_t& version);

  bool isBucketMarkedForTimeout(const char* regionFullPath, int32_t bucketid);

  /** Set of bucket ids. */
  typedef std::unordered_set<int32_t> BucketSet;

  /** Server location -> set of bucket ids (same hasher/equality as above). */
  typedef std::unordered_map<
      std::shared_ptr<BucketServerLocation>, std::shared_ptr<BucketSet>,
      dereference_hash<std::shared_ptr<BucketServerLocation>>,
      dereference_equal_to<std::shared_ptr<BucketServerLocation>>>
      ServerToBucketsMap;

  // bool AreBucketSetsEqual(const BucketSet& currentBucketSet,
  // const BucketSet& bucketSet);

  std::shared_ptr<BucketServerLocation> findNextServer(
      const ServerToBucketsMap& serverToBucketsMap,
      const BucketSet& currentBucketSet);

  /** Bucket id -> hash set of keys. */
  typedef std::unordered_map<int32_t, std::shared_ptr<CacheableHashSet>>
      BucketToKeysMap;

  std::shared_ptr<BucketToKeysMap> groupByBucketOnClientSide(
      const std::shared_ptr<Region>& region,
      const std::shared_ptr<CacheableVector>& keySet,
      const std::shared_ptr<ClientMetadata>& metadata);

  /** Server location -> hash set of keys (same hasher/equality as above). */
  typedef std::unordered_map<
      std::shared_ptr<BucketServerLocation>, std::shared_ptr<CacheableHashSet>,
      dereference_hash<std::shared_ptr<BucketServerLocation>>,
      dereference_equal_to<std::shared_ptr<BucketServerLocation>>>
      ServerToKeysMap;

  std::shared_ptr<ServerToKeysMap> getServerToFilterMapFESHOP(
      const std::shared_ptr<CacheableVector>& keySet,
      const std::shared_ptr<Region>& region, bool isPrimary);

  std::shared_ptr<ClientMetadataService::ServerToBucketsMap>
  groupByServerToAllBuckets(const std::shared_ptr<Region>& region,
                            bool optimizeForWrite);

  std::shared_ptr<ClientMetadataService::ServerToBucketsMap>
  groupByServerToBuckets(const std::shared_ptr<ClientMetadata>& metadata,
                         const BucketSet& bucketSet, bool optimizeForWrite);

  std::shared_ptr<ClientMetadataService::ServerToBucketsMap> pruneNodes(
      const std::shared_ptr<ClientMetadata>& metadata,
      const BucketSet& buckets);

 private:
  // const std::shared_ptr<PartitionResolver>& getResolver(const
  // std::shared_ptr<Region>& region, const std::shared_ptr<CacheableKey>& key,
  // const std::shared_ptr<Serializable>& aCallbackArgument);
  // BucketServerLocation getServerLocation(std::shared_ptr<ClientMetadata>
  // cptr, int bucketId, bool isPrimary);

  std::shared_ptr<ClientMetadata> SendClientPRMetadata(
      const char* regionPath, std::shared_ptr<ClientMetadata> cptr);

  std::shared_ptr<ClientMetadata> getClientMetadata(
      const std::shared_ptr<Region>& region);

 private:
  // ACE_Recursive_Thread_Mutex m_regionMetadataLock;
  // NOTE(review): presumably guards m_regionMetaDataMap -- confirm in the
  // implementation file.
  ACE_RW_Thread_Mutex m_regionMetadataLock;

  // Private default constructor: instances must be built from a Pool*.
  ClientMetadataService();

  // Released by stop(); other uses are in the implementation file.
  ACE_Semaphore m_regionQueueSema;

  RegionMetadataMapType m_regionMetaDataMap;

  // Run flag written by start() and stop(). It is atomic rather than volatile
  // because volatile provides no inter-thread synchronization and the flag is
  // presumably read by the task thread in svc() (defined elsewhere).
  std::atomic<bool> m_run;

  Pool* m_pool;

  Queue<std::shared_ptr<std::string>> m_regionQueue;

  // NOTE(review): presumably guards m_bucketStatus -- confirm in the
  // implementation file.
  ACE_RW_Thread_Mutex m_PRbucketStatusLock;

  // String key -> raw PRbuckets pointer. Ownership of the pointees is not
  // visible in this header; check the implementation before changing it.
  std::map<std::string, PRbuckets*> m_bucketStatus;

  std::chrono::milliseconds m_bucketWaitTimeout;

  // Value is defined in the implementation file.
  static const char* NC_CMDSvcThread;
};
} // namespace client
} // namespace geode
} // namespace apache
#endif // GEODE_CLIENTMETADATASERVICE_H_