| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| #include "ThinClientPoolStickyDM.hpp" |
| |
| #include <algorithm> |
| |
| #include "TssConnectionWrapper.hpp" |
| namespace apache { |
| namespace geode { |
| namespace client { |
| /** |
|  * Picks the connection a request should travel on for a sticky pool. |
|  * |
|  * Three paths, tried in this order: |
|  *  1. Background threads and GET_ALL_70 / GET_ALL_WITH_CALLBACK requests are |
|  *     forwarded unchanged to ThinClientPoolDM::getConnectionFromQueueW. |
|  *  2. When PR single hop is enabled and the request is not transactional, an |
|  *     endpoint is resolved (from serverLocation if given, else from the |
|  *     request when it is marked for single hop) and a connection is looked |
|  *     up or created for it; with no endpoint, any connection is used. |
|  *  3. Otherwise the sticky connection is requested from m_manager; for a |
|  *     transactional request doFailover() may be run and the TX state, if |
|  *     present, is marked dirty. |
|  * |
|  * @param error          out: status written by the connection helpers. |
|  * @param excludeServers servers handed to the helpers that pick/create |
|  *                       connections. |
|  * @param isBGThread     true when called from a background thread. |
|  * @param request        message being routed. |
|  * @param version        in/out value forwarded to the endpoint lookups. |
|  * @param match          only forwarded to the base class and logged here. |
|  * @param connFound      set to true on the single-hop path when the helpers |
|  *                       report that the connection limit was reached. |
|  * @param serverLocation optional bucket location used to pick the endpoint. |
|  * @return the selected connection; nullptr when endpoint connection creation |
|  *         timed out or the transactional path ended with an error. The |
|  *         returned pointer may also be null if no helper produced one. |
|  */ |
| TcrConnection* ThinClientPoolStickyDM::getConnectionFromQueueW( |
|     GfErrType* error, std::set<ServerLocation>& excludeServers, bool isBGThread, |
|     TcrMessage& request, int8_t& version, bool& match, bool& connFound, |
|     const std::shared_ptr<BucketServerLocation>& serverLocation) { |
|   // Path 1: no stickiness for background threads and getAll requests. |
|   if (isBGThread || request.getMessageType() == TcrMessage::GET_ALL_70 || |
|       request.getMessageType() == TcrMessage::GET_ALL_WITH_CALLBACK) { |
|     return ThinClientPoolDM::getConnectionFromQueueW( |
|         error, excludeServers, isBGThread, request, version, match, connFound, |
|         serverLocation); |
|   } |
| |
|   TcrConnection* conn = nullptr; |
| |
|   // Path 2: single-hop routing for non-transactional requests. |
|   if (m_attrs->getPRSingleHopEnabled() && !request.forTransaction()) { |
|     bool limitReached = false; |
|     TcrEndpoint* target = nullptr; |
|     std::shared_ptr<BucketServerLocation> resolvedLocation; |
| |
|     if (serverLocation != nullptr) { |
|       target = getEndPoint(serverLocation, version, excludeServers); |
|     } else if (request.forSingleHop()) { |
|       target = getSingleHopServer(request, version, resolvedLocation, |
|                                   excludeServers); |
|     } |
| |
|     if (target == nullptr) { |
|       // No endpoint resolved: take any sticky connection, then the queue, |
|       // then a brand new pool connection. |
|       m_manager->getAnyConnection(conn); |
|       if (conn == nullptr) { |
|         conn = getConnectionFromQueue(true, error, excludeServers, |
|                                       limitReached); |
|       } |
|       if (conn == nullptr) { |
|         // NOTE(review): any status returned by createPoolConnection is |
|         // ignored here - confirm that is intentional. |
|         createPoolConnection(conn, excludeServers, limitReached); |
|       } |
|     } else { |
|       m_manager->getSingleHopStickyConnection(*target, conn); |
|       if (conn == nullptr) { |
|         conn = getFromEP(target); |
|         if (conn == nullptr) { |
|           *error = createPoolConnectionToAEndPoint(conn, target, limitReached, |
|                                                    true); |
|           // A wait timeout aborts the lookup instead of falling back. |
|           if (*error == GF_CLIENT_WAIT_TIMEOUT || |
|               *error == GF_CLIENT_WAIT_TIMEOUT_REFRESH_PRMETADATA) { |
|             return nullptr; |
|           } |
|         } |
|         if (conn == nullptr) { |
|           // Still nothing for this endpoint: fall back to any connection. |
|           m_manager->getAnyConnection(conn); |
|           if (conn == nullptr) { |
|             // NOTE(review): returned status is ignored here as well. |
|             createPoolConnection(conn, excludeServers, limitReached); |
|           } |
|         } |
|       } |
|     } |
| |
|     if (limitReached) { |
|       // Connection limit hit: a connection was found, but possibly on a |
|       // different endpoint, so no PR metadata refresh is needed. |
|       connFound = true; |
|     } |
| |
|     LOGDEBUG( |
|         "ThinClientPoolStickyDM::getConnectionFromQueueW return conn = %p " |
|         "match = %d connFound=%d", |
|         conn, match, connFound); |
|     return conn; |
|   } |
| |
|   // Path 3: plain sticky connection managed by m_manager. |
|   const bool stickyFound = m_manager->getStickyConnection( |
|       conn, error, excludeServers, request.forTransaction()); |
| |
|   if (!request.forTransaction()) { |
|     return conn; |
|   } |
| |
|   auto txState = TSSTXStateWrapper::get().getTXState(); |
|   const bool needsFailover = |
|       *error == GF_NOERR && !stickyFound && |
|       (txState == nullptr || txState->isDirty()); |
|   if (needsFailover) { |
|     *error = doFailover(conn); |
|   } |
| |
|   if (*error != GF_NOERR) { |
|     return nullptr; |
|   } |
| |
|   if (txState != nullptr) { |
|     txState->setDirty(); |
|   } |
|   return conn; |
| } |
| /** |
|  * Hands a connection back after use. |
|  * |
|  * Background-thread connections go to ThinClientPoolDM::putInQueue. For |
|  * other threads the connection is registered with m_manager: as the |
|  * single-hop sticky connection of its endpoint when PR single hop is enabled |
|  * and the call is not transactional, otherwise as the sticky connection. |
|  * |
|  * @param conn          connection being returned; dereferenced on the |
|  *                      single-hop path, so it must not be null there. |
|  * @param isBGThread    true when called from a background thread. |
|  * @param isTransaction true when the connection was used for a transaction. |
|  */ |
| void ThinClientPoolStickyDM::putInQueue(TcrConnection* conn, bool isBGThread, |
|                                         bool isTransaction) { |
|   if (isBGThread) { |
|     ThinClientPoolDM::putInQueue(conn, isBGThread, isTransaction); |
|     return; |
|   } |
| |
|   if (!m_attrs->getPRSingleHopEnabled() || isTransaction) { |
|     m_manager->setStickyConnection(conn, isTransaction); |
|     return; |
|   } |
| |
|   m_manager->setSingleHopStickyConnection(*conn->getEndpointObject(), conn); |
| } |
| /** |
|  * Resets the sticky connection to nullptr through m_manager. |
|  * |
|  * Does nothing for background threads or when PR single hop is enabled. |
|  * |
|  * @param isBGThread true when called from a background thread. |
|  */ |
| void ThinClientPoolStickyDM::setStickyNull(bool isBGThread) { |
|   if (isBGThread || m_attrs->getPRSingleHopEnabled()) { |
|     return; |
|   } |
|   m_manager->setStickyConnection(nullptr, false); |
| } |
| |
| /** |
|  * Asks m_manager to clean stale sticky connections. |
|  * |
|  * @param isRunning flag read once on entry; when it is false the call is a |
|  *                  no-op. |
|  */ |
| void ThinClientPoolStickyDM::cleanStickyConnections( |
|     std::atomic<bool>& isRunning) { |
|   if (isRunning.load()) { |
|     m_manager->cleanStaleStickyConnection(); |
|   } |
| } |
| |
| /** |
|  * Reports whether the given connection may be deleted, as decided by |
|  * m_manager->canThisConnBeDeleted(). |
|  * |
|  * @param conn connection to check. |
|  * @return the manager's verdict. |
|  */ |
| bool ThinClientPoolStickyDM::canItBeDeleted(TcrConnection* conn) { |
|   const bool deletable = m_manager->canThisConnBeDeleted(conn); |
|   return deletable; |
| } |
| /** |
|  * Forwards to m_manager->releaseThreadLocalConnection(); this class adds no |
|  * logic of its own around the call. |
|  */ |
| void ThinClientPoolStickyDM::releaseThreadLocalConnection() { |
|   m_manager->releaseThreadLocalConnection(); |
| } |
| /** |
|  * Registers the given connection with m_manager via addStickyConnection(). |
|  * |
|  * @param conn connection to register. |
|  */ |
| void ThinClientPoolStickyDM::setThreadLocalConnection(TcrConnection* conn) { |
|   m_manager->addStickyConnection(conn); |
| } |
| /** |
|  * Runs the base-class deletion check, ThinClientPoolDM::canItBeDeleted(), |
|  * without consulting m_manager. |
|  * |
|  * @param conn connection to check. |
|  * @return the base-class verdict. |
|  */ |
| bool ThinClientPoolStickyDM::canItBeDeletedNoImpl(TcrConnection* conn) { |
|   const bool deletable = ThinClientPoolDM::canItBeDeleted(conn); |
|   return deletable; |
| } |
| |
| } // namespace client |
| } // namespace geode |
| } // namespace apache |