/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "ThinClientPoolHADM.hpp"
#include <geode/SystemProperties.hpp>
#include "CacheImpl.hpp"
#include "FunctionExpiryTask.hpp"
#include "TcrConnectionManager.hpp"
#include "util/exception.hpp"
namespace apache {
namespace geode {
namespace client {
const char* ThinClientPoolHADM::NC_Redundancy = "NC Redundancy";
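
// High-availability pool distribution manager: extends ThinClientPoolDM
// with a ThinClientRedundancyManager that maintains redundant subscription
// queues on the servers.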
ThinClientPoolHADM::ThinClientPoolHADM(const char* name,
std::shared_ptr<PoolAttributes> poolAttr,
TcrConnectionManager& connManager)
: ThinClientPoolDM(name, poolAttr, connManager),
theTcrConnManager_(connManager),
redundancy_semaphore_(0),
redundancyTask_(nullptr),
server_monitor_task_id_(ExpiryTask::invalid()) {
redundancyManager_ = std::make_unique<ThinClientRedundancyManager>(
    &connManager, poolAttr->getSubscriptionRedundancy(), this);
}
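
// Initializes the base pool DM, then starts the HA background machinery.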
void ThinClientPoolHADM::init() {
// The base pool DM must only be initialized once.
ThinClientPoolDM::init();
startBackgroundThreads();
}
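
// Initializes the redundancy manager, establishes the initial redundancy
// level, schedules the periodic server monitor task that wakes the
// redundancy thread, and finally starts that thread.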
void ThinClientPoolHADM::startBackgroundThreads() {
auto& props = m_connManager.getCacheImpl()
->getDistributedSystem()
.getSystemProperties();
redundancyManager_->initialize(m_attrs->getSubscriptionRedundancy());
// Maintain the redundancy level up front so a primary endpoint is
// available for notification operations.
GfErrType err = redundancyManager_->maintainRedundancyLevel(true);
const auto interval = props.redundancyMonitorInterval();
auto& manager = m_connManager.getCacheImpl()->getExpiryTaskManager();
auto task = std::make_shared<FunctionExpiryTask>(
manager, [this]() { redundancy_semaphore_.release(); });
server_monitor_task_id_ =
manager.schedule(std::move(task), std::chrono::seconds(1), interval);
LOGFINE(
    "ThinClientPoolHADM::startBackgroundThreads Registered server "
    "monitor task with id = %ld, interval = %ld",
    server_monitor_task_id_, interval.count());
if (ThinClientBaseDM::isFatalClientError(err)) {
if (err == GF_CACHE_LOCATOR_EXCEPTION) {
LOGWARN(
"No locators were available during pool initialization with "
"subscription redundancy.");
} else {
throwExceptionIfError("ThinClientPoolHADM::init", err);
}
}
redundancyManager_->startPeriodicAck();
redundancyTask_ = std::make_unique<Task<ThinClientPoolHADM>>(
    this, &ThinClientPoolHADM::redundancy, NC_Redundancy);
redundancyTask_->start();
}
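
// CQ-related messages are routed through the redundancy manager so they
// reach the subscription (primary/secondary) endpoints; all other
// requests go through the base pool DM.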
GfErrType ThinClientPoolHADM::sendSyncRequest(TcrMessage& request,
TcrMessageReply& reply,
bool attemptFailover,
bool isBGThread) {
int32_t type = request.getMessageType();
if ((type == TcrMessage::EXECUTECQ_MSG_TYPE ||
type == TcrMessage::STOPCQ_MSG_TYPE ||
type == TcrMessage::CLOSECQ_MSG_TYPE ||
type == TcrMessage::CLOSECLIENTCQS_MSG_TYPE ||
type == TcrMessage::GETCQSTATS_MSG_TYPE ||
type == TcrMessage::MONITORCQ_MSG_TYPE ||
type == TcrMessage::EXECUTECQ_WITH_IR_MSG_TYPE ||
type == TcrMessage::GETDURABLECQS_MSG_TYPE)) {
if (m_destroyPending) return GF_NOERR;
reply.setDM(this);
return sendSyncRequestCq(request, reply);
} else {
return ThinClientPoolDM::sendSyncRequest(request, reply, attemptFailover,
isBGThread);
}
}
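
// Registers interest on a specific endpoint, bypassing the redundancy
// manager's endpoint selection.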
GfErrType ThinClientPoolHADM::sendSyncRequestRegisterInterestEP(
TcrMessage& request, TcrMessageReply& reply, bool attemptFailover,
TcrEndpoint* endpoint) {
return ThinClientBaseDM::sendSyncRequestRegisterInterest(
request, reply, attemptFailover, nullptr, endpoint);
}
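
// Region-level interest registration is delegated to the redundancy
// manager.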
GfErrType ThinClientPoolHADM::sendSyncRequestRegisterInterest(
TcrMessage& request, TcrMessageReply& reply, bool attemptFailover,
ThinClientRegion* region, TcrEndpoint* endpoint) {
return redundancyManager_->sendSyncRequestRegisterInterest(
request, reply, attemptFailover, endpoint, this, region);
}
GfErrType ThinClientPoolHADM::sendSyncRequestCq(TcrMessage& request,
TcrMessageReply& reply) {
return redundancyManager_->sendSyncRequestCq(request, reply, this);
}
bool ThinClientPoolHADM::preFailoverAction() { return true; }
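
// After a failover, nudge the connection manager's redundancy thread so
// the redundancy level is re-evaluated.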
bool ThinClientPoolHADM::postFailoverAction(TcrEndpoint*) {
m_connManager.triggerRedundancyThread();
return true;
}
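
// Body of the "NC Redundancy" thread: each release of the semaphore (from
// the periodic monitor task, failover handling, or endpoint status
// changes) triggers a redundancy-level check unless the network is marked
// down.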
void ThinClientPoolHADM::redundancy(std::atomic<bool>& isRunning) {
LOGFINE("ThinClientPoolHADM: Starting maintain redundancy thread.");
redundancy_semaphore_.acquire();
while (isRunning) {
if (!m_connManager.isNetDown()) {
redundancyManager_->maintainRedundancyLevel();
}
redundancy_semaphore_.acquire();
}
LOGFINE("ThinClientPoolHADM: Ending maintain redundancy thread.");
}
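
// Closes HA-specific state (remote query service, notification channels,
// redundancy manager) before delegating to the base pool destroy.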
void ThinClientPoolHADM::destroy(bool keepAlive) {
LOGDEBUG("ThinClientPoolHADM::destroy");
if (!m_isDestroyed && !m_destroyPending) {
checkRegions();
if (m_remoteQueryServicePtr != nullptr) {
m_remoteQueryServicePtr->close();
m_remoteQueryServicePtr = nullptr;
}
stopPingThread();
sendNotificationCloseMsgs();
redundancyManager_->close();
m_destroyPendingHADM = true;
ThinClientPoolDM::destroy(keepAlive);
}
}
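
// Cancels the server monitor task, stops the redundancy thread, and lets
// the redundancy manager close its notification channels.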
void ThinClientPoolHADM::sendNotificationCloseMsgs() {
if (redundancyTask_) {
auto& manager = m_connManager.getCacheImpl()->getExpiryTaskManager();
manager.cancel(server_monitor_task_id_);
redundancyTask_->stopNoblock();
redundancy_semaphore_.release();
redundancyTask_->wait();
redundancyTask_ = nullptr;
redundancyManager_->sendNotificationCloseMsgs();
}
}
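
// Re-registers interest for all regions on the given endpoint; the first
// error encountered is returned, but every region is still attempted.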
GfErrType ThinClientPoolHADM::registerInterestAllRegions(
TcrEndpoint* ep, const TcrMessage* request, TcrMessageReply* reply) {
GfErrType err = GF_NOERR;
std::lock_guard<decltype(regionsLock_)> guard(regionsLock_);
for (const auto& region : regions_) {
auto opErr = region->registerKeys(ep, request, reply);
if (err == GF_NOERR) {
err = opErr;
}
}
return err;
}
bool ThinClientPoolHADM::checkDupAndAdd(std::shared_ptr<EventId> eventid) {
return redundancyManager_->checkDupAndAdd(eventid);
}
void ThinClientPoolHADM::processMarker() {
// Also set the global processed-marker flag consulted for makePrimary
// messages.
redundancyManager_->m_globalProcessedMarker = true;
}
void ThinClientPoolHADM::acquireRedundancyLock() {
redundancyManager_->acquireRedundancyLock();
}
void ThinClientPoolHADM::releaseRedundancyLock() {
redundancyManager_->releaseRedundancyLock();
}
std::recursive_mutex& ThinClientPoolHADM::getRedundancyLock() {
return redundancyManager_->getRedundancyLock();
}
GfErrType ThinClientPoolHADM::sendRequestToPrimary(TcrMessage& request,
TcrMessageReply& reply) {
return redundancyManager_->sendRequestToPrimary(request, reply);
}
bool ThinClientPoolHADM::isReadyForEvent() const {
return redundancyManager_->isSentReadyForEvents();
}
void ThinClientPoolHADM::addRegion(ThinClientRegion* theTCR) {
std::lock_guard<decltype(regionsLock_)> guard(regionsLock_);
regions_.push_back(theTCR);
}
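
// Delivers an "all endpoints disconnected" marker to the region if the
// redundancy manager reports that no endpoint is connected.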
void ThinClientPoolHADM::addDisconnectedMessageToQueue(
ThinClientRegion* theTCR) {
std::lock_guard<decltype(regionsLock_)> guard(regionsLock_);
if (redundancyManager_->allEndPointDiscon()) {
theTCR->receiveNotification(TcrMessageAllEndpointsDisconnectedMarker());
}
}
void ThinClientPoolHADM::removeRegion(ThinClientRegion* theTCR) {
std::lock_guard<decltype(regionsLock_)> guard(regionsLock_);
for (auto itr = regions_.begin(); itr != regions_.end(); ++itr) {
  if (*itr == theTCR) {
    regions_.erase(itr);
    return;
  }
}
}
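
// When auto-ready-for-events is disabled, pool initialization is driven
// from here; durable clients then signal the redundancy manager that they
// are ready to receive events.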
void ThinClientPoolHADM::readyForEvents() {
auto& sysProp = m_connManager.getCacheImpl()
->getDistributedSystem()
.getSystemProperties();
if (!sysProp.autoReadyForEvents()) {
init();
}
auto&& durable = sysProp.durableClientId();
if (!durable.empty()) {
redundancyManager_->readyForEvents();
}
}
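
// Marks all endpoints disconnected and propagates the network-down state
// to the redundancy manager, in addition to the base pool handling.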
void ThinClientPoolHADM::netDown() {
ThinClientPoolDM::netDown();
{
std::lock_guard<decltype(m_endpointsLock)> guard(m_endpointsLock);
for (auto&& currItr : m_endpoints) {
currItr.second->setConnectionStatus(false);
}
}
redundancyManager_->netDown();
}
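
// Takes the redundancy lock before pinging so the ping does not race with
// redundancy maintenance.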
void ThinClientPoolHADM::pingServerLocal() {
auto& mutex = redundancyManager_->getRedundancyLock();
std::lock_guard<decltype(mutex)> guard(mutex);
ThinClientPoolDM::pingServerLocal();
}
void ThinClientPoolHADM::removeCallbackConnection(TcrEndpoint* ep) {
redundancyManager_->removeCallbackConnection(ep);
}
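
// Broadcasts the "all endpoints disconnected" marker to every region.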
void ThinClientPoolHADM::sendNotConnectedMessageToAllregions() {
std::lock_guard<decltype(regionsLock_)> guard(regionsLock_);
for (auto region : regions_) {
region->receiveNotification(TcrMessageAllEndpointsDisconnectedMarker());
}
}
void ThinClientPoolHADM::clearKeysOfInterestAllRegions() {
std::lock_guard<decltype(regionsLock_)> guard(regionsLock_);
for (auto region : regions_) {
region->clearKeysOfInterest();
}
}
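
// Creates pool endpoints wired to the failover, cleanup, and redundancy
// semaphores so endpoint state changes can wake the corresponding threads.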
std::shared_ptr<TcrEndpoint> ThinClientPoolHADM::createEP(
const char* endpointName) {
return std::make_shared<TcrPoolEndPoint>(
endpointName, m_connManager.getCacheImpl(),
m_connManager.failover_semaphore_, m_connManager.cleanup_semaphore_,
redundancy_semaphore_, this);
}
} // namespace client
} // namespace geode
} // namespace apache