blob: c20915741d2cf433b0925064125a7380725bc0c6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "ThinClientStickyManager.hpp"
#include "ThinClientPoolDM.hpp"
namespace apache {
namespace geode {
namespace client {
bool ThinClientStickyManager::getStickyConnection(
TcrConnection*& conn, GfErrType* error,
std::set<ServerLocation>& excludeServers, bool forTransaction) {
bool maxConnLimit = false;
bool connFound = false;
conn = TssConnectionWrapper::get().getConnection();
if (!conn) {
conn =
m_dm->getConnectionFromQueue(true, error, excludeServers, maxConnLimit);
if (conn) {
conn->setAndGetBeingUsed(true, forTransaction);
}
} else {
if (!conn->setAndGetBeingUsed(
true, forTransaction)) { // manage connection thread is changing
// the connectiion
conn = m_dm->getConnectionFromQueue(true, error, excludeServers,
maxConnLimit);
if (conn) {
connFound = true;
conn->setAndGetBeingUsed(true, forTransaction);
}
} else {
connFound = true;
}
}
return connFound;
}
void ThinClientStickyManager::getSingleHopStickyConnection(
const TcrEndpoint& theEP, TcrConnection*& conn) {
conn = TssConnectionWrapper::get().getSHConnection(theEP, m_dm->getName());
}
void ThinClientStickyManager::addStickyConnection(TcrConnection* conn) {
std::lock_guard<decltype(m_stickyLock)> keysGuard(m_stickyLock);
if (auto oldConn = TssConnectionWrapper::get().getConnection()) {
const auto& it =
m_stickyConnList.find(TssConnectionWrapper::get().getConnDoublePtr());
if (it != m_stickyConnList.end()) {
oldConn->setAndGetBeingUsed(false, false);
m_stickyConnList.erase(it);
TssConnectionWrapper::get().setConnection(nullptr, nullptr);
m_dm->put(oldConn, false);
}
}
if (conn) {
TssConnectionWrapper::get().setConnection(conn, m_dm->shared_from_this());
conn->setAndGetBeingUsed(true, true); // this is done for transaction
// thread when some one resume
// transaction
m_stickyConnList.insert(TssConnectionWrapper::get().getConnDoublePtr());
}
}
void ThinClientStickyManager::setStickyConnection(TcrConnection* conn,
bool forTransaction) {
if (!conn) {
std::lock_guard<decltype(m_stickyLock)> keysGuard(m_stickyLock);
TssConnectionWrapper::get().setConnection(nullptr,
m_dm->shared_from_this());
} else {
auto currentConn = TssConnectionWrapper::get().getConnection();
if (currentConn != conn) {
// otherwsie no need to set it again
std::lock_guard<decltype(m_stickyLock)> keysGuard(m_stickyLock);
TssConnectionWrapper::get().setConnection(conn, m_dm->shared_from_this());
// if transaction then it will keep this as used
conn->setAndGetBeingUsed(false, forTransaction);
m_stickyConnList.insert(TssConnectionWrapper::get().getConnDoublePtr());
} else {
// if transaction then it will keep this as used
currentConn->setAndGetBeingUsed(false, forTransaction);
}
}
}
void ThinClientStickyManager::setSingleHopStickyConnection(
const TcrEndpoint& ep, TcrConnection* conn) {
TssConnectionWrapper::get().setSHConnection(ep, conn);
}
void ThinClientStickyManager::cleanStaleStickyConnection() {
LOGDEBUG("Cleaning sticky connections");
std::lock_guard<decltype(m_stickyLock)> keysGuard(m_stickyLock);
auto maxConnLimit = false;
std::set<ServerLocation> excludeServers;
for (auto it = m_stickyConnList.begin(); it != m_stickyConnList.end();) {
auto conn = (*it);
if (*conn) {
if ((*conn)->setAndGetBeingUsed(true, false) &&
canThisConnBeDeleted(*conn)) {
auto err = GF_NOERR;
if (auto temp = m_dm->getConnectionFromQueue(
false, &err, excludeServers, maxConnLimit)) {
auto temp1 = *conn;
//*conn = temp; instead of setting in thread local,
// put in queue, thread will come and pick it from there
*conn = nullptr;
m_dm->put(temp, false);
temp1->close();
_GEODE_SAFE_DELETE(temp1);
m_dm->removeEPConnections(1, false);
LOGDEBUG("Replaced a sticky connection");
} else {
(*conn)->setAndGetBeingUsed(false, false);
}
}
++it;
} else {
it = m_stickyConnList.erase(it);
}
}
}
void ThinClientStickyManager::closeAllStickyConnections() {
LOGDEBUG("ThinClientStickyManager::closeAllStickyConnections()");
std::lock_guard<decltype(m_stickyLock)> keysGuard(m_stickyLock);
for (const auto& tempConn : m_stickyConnList) {
if (*tempConn) {
(*tempConn)->close();
_GEODE_SAFE_DELETE(*tempConn);
m_dm->removeEPConnections(1, false);
}
}
}
bool ThinClientStickyManager::canThisConnBeDeleted(TcrConnection* conn) {
bool canBeDeleted = false;
LOGDEBUG("ThinClientStickyManager::canThisConnBeDeleted()");
std::lock_guard<decltype(m_stickyLock)> keysGuard(m_stickyLock);
if (m_dm->canItBeDeletedNoImpl(conn)) return true;
auto endPt = conn->getEndpointObject();
std::lock_guard<decltype(endPt->getQueueHostedMutex())> guardQueue(
endPt->getQueueHostedMutex());
if (endPt->isQueueHosted()) {
for (const auto& it : m_stickyConnList) {
auto connTemp = *it;
if (connTemp && connTemp->getEndpointObject() == endPt) {
canBeDeleted = true;
break;
}
}
}
return canBeDeleted;
}
void ThinClientStickyManager::releaseThreadLocalConnection() {
if (auto conn = TssConnectionWrapper::get().getConnection()) {
std::lock_guard<decltype(m_stickyLock)> keysGuard(m_stickyLock);
const auto& it =
m_stickyConnList.find(TssConnectionWrapper::get().getConnDoublePtr());
LOGDEBUG("ThinClientStickyManager::releaseThreadLocalConnection()");
if (it != m_stickyConnList.end()) {
m_stickyConnList.erase(it);
// now this can be used by next one
conn->setAndGetBeingUsed(false, false);
m_dm->put(conn, false);
}
TssConnectionWrapper::get().setConnection(nullptr,
m_dm->shared_from_this());
}
TssConnectionWrapper::get().releaseSHConnections(*m_dm);
}
void ThinClientStickyManager::getAnyConnection(TcrConnection*& conn) {
conn = TssConnectionWrapper::get().getAnyConnection(m_dm->getName());
}
} // namespace client
} // namespace geode
} // namespace apache