// blob: 3facc2cb24d749853f3689be61840da47a8bbfa5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "ThinClientDistributionManager.hpp"

#include <algorithm>
#include <numeric>
#include <random>

#include <geode/AuthInitialize.hpp>
#include <geode/SystemProperties.hpp>

#include "DistributedSystemImpl.hpp"
#include "TcrConnectionManager.hpp"
#include "TcrEndpoint.hpp"
#include "ThinClientRegion.hpp"
#include "util/exception.hpp"
namespace apache {
namespace geode {
namespace client {
/**
 * Constructs the distribution manager by forwarding the connection manager
 * and region to ThinClientBaseDM.
 *
 * m_activeEndpoint starts at -1; the rest of this file treats a negative
 * value as "no endpoint is currently active".
 */
ThinClientDistributionManager::ThinClientDistributionManager(
TcrConnectionManager& connManager, ThinClientRegion* region)
: ThinClientBaseDM(connManager, region), m_activeEndpoint(-1) {}
void ThinClientDistributionManager::init() {
std::unordered_set<std::string> endpointNames;
getEndpointNames(endpointNames);
m_connManager.connect(this, m_endpoints, endpointNames);
int32_t numEndpoints = static_cast<int32_t>(m_endpoints.size());
std::vector<int> randIndex;
for (int index = 0; index < numEndpoints; ++index) {
randIndex.push_back(index);
}
RandGen randGen;
std::random_shuffle(randIndex.begin(), randIndex.end(), randGen);
int index = -1;
GfErrType err = GF_NOERR;
while (m_activeEndpoint < 0 && ++index < numEndpoints) {
m_endpoints[randIndex[index]]->setDM(this);
if ((err = m_endpoints[randIndex[index]]->registerDM(
m_clientNotification, false, true)) == GF_NOERR) {
m_activeEndpoint = randIndex[index];
LOGFINE("DM: Using endpoint %s",
m_endpoints[m_activeEndpoint]->name().c_str());
} else if (isFatalError(err)) {
m_connManager.disconnect(this, m_endpoints);
throwExceptionIfError("ThinClientDistributionManager::init", err);
}
}
ThinClientBaseDM::init();
m_initDone = true;
}
/**
 * Tears down this DM: unregisters from the active endpoint (if any), runs
 * destroyAction(), stops the chunk processor and disconnects all endpoints
 * from the connection manager.
 *
 * Does nothing when init() has not completed.
 *
 * @param keepAlive forwarded unchanged to TcrConnectionManager::disconnect.
 */
void ThinClientDistributionManager::destroy(bool keepAlive) {
  if (!m_initDone) {
    return;  // never initialized, so there is nothing to undo
  }

  DistManagersLockGuard _guard(m_connManager);
  std::lock_guard<decltype(m_endpointsLock)> guard(m_endpointsLock);

  if (m_activeEndpoint >= 0) {
    m_endpoints[m_activeEndpoint]->unregisterDM(m_clientNotification, this);
  }

  LOGFINEST("ThinClientDistributionManager:: starting destroy for region %s",
            (m_region != nullptr ? m_region->getFullPath().c_str() : "(null)"));

  destroyAction();
  stopChunkProcessor();  // shuts down the chunk processing thread

  if (Log::enabled(LogLevel::Finest)) {
    // Comma-separated endpoint names; assembled only when it will be logged.
    std::string joinedNames;
    for (auto it = m_endpoints.begin(); it != m_endpoints.end(); ++it) {
      if (it != m_endpoints.begin()) {
        joinedNames += ",";
      }
      joinedNames += (*it)->name();
    }
    LOGFINEST(
        "ThinClientDistributionManager: disconnecting endpoints %s from TCCM",
        joinedNames.c_str());
  }

  m_connManager.disconnect(this, m_endpoints, keepAlive);

  LOGFINEST("ThinClientDistributionManager: completed destroy for region %s",
            (m_region != nullptr ? m_region->getFullPath().c_str() : "(null)"));
  m_initDone = false;
}
// Hook called by destroy() after the active endpoint has been unregistered
// and before the chunk processor is stopped. This implementation is a no-op.
void ThinClientDistributionManager::destroyAction() {}
// Called by init() to fill the set of endpoint names that is then passed to
// TcrConnectionManager::connect(). This implementation leaves the set
// untouched.
void ThinClientDistributionManager::getEndpointNames(
std::unordered_set<std::string>&) {}
/**
 * Reports whether the given endpoint pointer is one of this DM's endpoints.
 *
 * @param ep endpoint to look for (compared by pointer identity).
 * @return true if ep is present in m_endpoints, false otherwise.
 */
bool ThinClientDistributionManager::isEndpointAttached(TcrEndpoint* ep) {
  return std::find(m_endpoints.begin(), m_endpoints.end(), ep) !=
         m_endpoints.end();
}
/**
 * Sends a request synchronously on the active endpoint and, when asked to,
 * fails over to other endpoints until the send succeeds, a fatal error
 * occurs, or no further endpoint can be selected.
 *
 * @param request         message to send; its DM is set to this object.
 * @param reply           reply holder; its DM is set to this object.
 * @param attemptFailover when false, the result of the first send (or
 *                        GF_NOTCON if there was no connected active
 *                        endpoint) is returned without trying others.
 * @return GF_NOERR on success. GF_IOERR is reported as GF_NOTCON after the
 *         failover loop; a timeout on a query, putAll, function execution
 *         or CQ-with-initial-results request is returned as-is once a new
 *         endpoint has been selected, without resending the request.
 */
GfErrType ThinClientDistributionManager::sendSyncRequest(TcrMessage& request,
                                                         TcrMessageReply& reply,
                                                         bool attemptFailover,
                                                         bool) {
  GfErrType error = GF_NOTCON;
  bool useActiveEndpoint = true;

  request.setDM(this);
  reply.setDM(this);

  if (request.getMessageType() == TcrMessage::GET_ALL_70 ||
      request.getMessageType() == TcrMessage::GET_ALL_WITH_CALLBACK) {
    // now initialize getall msg
    request.InitializeGetallMsg(request.getCallbackArgument());
  }

  int currentEndpoint = m_activeEndpoint;
  if (currentEndpoint >= 0 && m_endpoints[currentEndpoint]->connected()) {
    LOGDEBUG(
        "ThinClientDistributionManager::sendSyncRequest: trying to send on "
        "endpoint: %s",
        m_endpoints[currentEndpoint]->name().c_str());
    error = sendRequestToEP(request, reply, m_endpoints[currentEndpoint]);
    useActiveEndpoint = false;
    LOGDEBUG(
        "ThinClientDistributionManager::sendSyncRequest: completed send on "
        "endpoint: %s [error:%d]",
        m_endpoints[currentEndpoint]->name().c_str(), error);
  }

  if (!attemptFailover || error == GF_NOERR) {
    return error;
  }

  // Message types for which a timeout must not trigger a resend. Evaluated
  // once; the type of the request does not change below.
  const int32_t type = request.getMessageType();
  const bool skipRetryOnTimeout =
      type == TcrMessage::QUERY || type == TcrMessage::QUERY_WITH_PARAMETERS ||
      type == TcrMessage::PUTALL ||
      type == TcrMessage::PUT_ALL_WITH_CALLBACK ||
      type == TcrMessage::EXECUTE_FUNCTION ||
      type == TcrMessage::EXECUTE_REGION_FUNCTION ||
      type == TcrMessage::EXECUTE_REGION_FUNCTION_SINGLE_HOP ||
      type == TcrMessage::EXECUTECQ_WITH_IR_MSG_TYPE;

  // we need to forceSelect because endpoint connection status
  // is not set to false (in tcrendpoint::send) for a query or putall timeout
  const bool forceSelect = skipRetryOnTimeout && error == GF_TIMEOUT;

  if (isFatalError(error)) {
    return error;
  }

  bool doRand = true;
  std::vector<int> randIndex;

  std::lock_guard<decltype(m_endpointsLock)> guard(m_endpointsLock);
  GfErrType connErr = GF_NOERR;
  while (error != GF_NOERR && !isFatalError(error) &&
         (connErr = selectEndpoint(randIndex, doRand, useActiveEndpoint,
                                   forceSelect)) == GF_NOERR) {
    // if it's a query or putall and we had a timeout, just return with the
    // newly selected endpoint without failover-retry
    if (skipRetryOnTimeout && error == GF_TIMEOUT) {
      return error;
    }

    currentEndpoint = m_activeEndpoint;
    if (currentEndpoint < 0) {
      // selectEndpoint() returns GF_NOERR without selecting an endpoint when
      // preFailoverAction() declines the failover. Indexing m_endpoints with
      // a negative value would be undefined behaviour, so give up here and
      // report the last send error.
      LOGFINE(
          "ThinClientDistributionManager::sendSyncRequest: no active endpoint "
          "available after endpoint selection");
      break;
    }

    LOGFINEST(
        "ThinClientDistributionManager::sendSyncRequest: trying send on new "
        "endpoint %s",
        m_endpoints[currentEndpoint]->name().c_str());
    error = sendRequestToEP(request, reply, m_endpoints[currentEndpoint]);
    if (error != GF_NOERR) {
      LOGFINE(
          "ThinClientDistributionManager::sendSyncRequest: failed send on "
          "new endpoint %s for message "
          "type %d [error:%d]",
          m_endpoints[currentEndpoint]->name().c_str(),
          request.getMessageType(), error);
    } else {
      LOGFINEST(
          "ThinClientDistributionManager::sendSyncRequest: completed send on "
          "new endpoint: %s",
          m_endpoints[currentEndpoint]->name().c_str());
    }
    useActiveEndpoint = false;
  }

  // Top-level only sees NotConnectedException or TimeoutException
  if ((error == GF_NOERR && connErr != GF_NOERR) || error == GF_IOERR) {
    error = GF_NOTCON;
  }
  return error;
}
/**
 * Entry point for the failover thread: runs endpoint selection with a fresh,
 * yet-to-be-shuffled candidate list. The result of selectEndpoint() is
 * deliberately not inspected here.
 */
void ThinClientDistributionManager::failover() {
  LOGFINEST("DM: invoked select endpoint via failover thread for region %s",
            (m_region != nullptr ? m_region->getFullPath().c_str() : "(null)"));

  std::vector<int> candidateIndexes;
  bool shufflePending = true;
  selectEndpoint(candidateIndexes, shufflePending);
}
/**
 * Tries to make the endpoint at epIndex the active endpoint.
 *
 * Registers this DM with the endpoint and then runs postFailoverAction().
 * Only when both succeed is m_activeEndpoint updated.
 *
 * @param epIndex index into m_endpoints.
 * @return GF_NOERR on success; the registerDM error if registration fails;
 *         GF_EUNDEF if postFailoverAction() rejects the endpoint (the DM is
 *         unregistered from it again in that case).
 */
inline GfErrType ThinClientDistributionManager::connectToEndpoint(int epIndex) {
  TcrEndpoint* const endpoint = m_endpoints[epIndex];
  endpoint->setDM(this);

  const GfErrType registerErr =
      endpoint->registerDM(m_clientNotification, false, true, this);
  if (registerErr != GF_NOERR) {
    LOGFINE("DM: Could not connect to endpoint %s", endpoint->name().c_str());
    return registerErr;
  }

  LOGFINE("DM: Attempting failover to endpoint %s", endpoint->name().c_str());
  if (!postFailoverAction(endpoint)) {
    LOGFINE("DM: Post failover action failed for endpoint %s",
            endpoint->name().c_str());
    endpoint->unregisterDM(m_clientNotification, this);
    return GF_EUNDEF;
  }

  m_activeEndpoint = epIndex;
  LOGFINE("DM: Failover to endpoint %s complete.", endpoint->name().c_str());
  return GF_NOERR;
}
/**
 * Ensures there is a usable active endpoint, failing over to another one if
 * the current endpoint is missing, disconnected, or forceSelect is set.
 *
 * @param randIndex         in/out list of endpoint indexes still to be tried.
 *                          Filled and shuffled here when doRand is true;
 *                          every attempted index is removed from the front.
 * @param doRand            in/out; when true the candidate list is built and
 *                          shuffled, then the flag is reset to false so that
 *                          repeated calls continue with the same list.
 * @param useActiveEndpoint when false, the previously active endpoint is
 *                          left out of a newly built candidate list.
 * @param forceSelect       select a new endpoint even if the current one
 *                          still reports itself as connected.
 * @return GF_NOERR if no selection was needed, if a new endpoint was
 *         selected, or if preFailoverAction() declined the failover (in
 *         which case m_activeEndpoint may be negative); GF_EUNDEF if no
 *         candidates were left; otherwise the error of the last failed
 *         connection attempt.
 *
 * Precondition: the DM is registered on at most one endpoint.
 */
GfErrType ThinClientDistributionManager::selectEndpoint(
    std::vector<int>& randIndex, bool& doRand, bool useActiveEndpoint,
    bool forceSelect) {
  GfErrType err = GF_NOERR;
  int currentEndpoint = m_activeEndpoint;
  if (currentEndpoint < 0 || !m_endpoints[currentEndpoint]->connected() ||
      forceSelect) {
    std::lock_guard<decltype(m_endpointsLock)> guard(m_endpointsLock);
    // Re-evaluate under the lock: the active endpoint may have changed
    // between the check above and acquiring the lock.
    if (m_activeEndpoint < 0 || !m_endpoints[m_activeEndpoint]->connected() ||
        forceSelect) {
      currentEndpoint = m_activeEndpoint;
      if (m_activeEndpoint >= 0) {
        m_activeEndpoint = -1;
        m_endpoints[currentEndpoint]->unregisterDM(m_clientNotification, this);
        postUnregisterAction();
      }
      if (!preFailoverAction()) {
        return GF_NOERR;
      }

      const int32_t numEndpoints = static_cast<int32_t>(m_endpoints.size());
      err = GF_EUNDEF;
      if (doRand) {
        for (int idx = 0; idx < numEndpoints; ++idx) {
          if (useActiveEndpoint || idx != currentEndpoint) {
            randIndex.push_back(idx);
          }
        }
        // std::random_shuffle was deprecated in C++14 and removed in C++17;
        // std::shuffle with a standard engine is the supported replacement.
        std::mt19937 randGen{std::random_device{}()};
        std::shuffle(randIndex.begin(), randIndex.end(), randGen);
        doRand = false;
      }

      while (!randIndex.empty()) {
        // Consume the candidate before trying it so that a later call never
        // retries the same endpoint.
        const int currIndex = randIndex.front();
        randIndex.erase(randIndex.begin());
        if ((err = connectToEndpoint(currIndex)) == GF_NOERR) {
          LOGFINER("TCDM::selectEndpoint: Successfully selected endpoint %s",
                   m_endpoints[currIndex]->name().c_str());
          break;
        }
      }
    }
  }
  // Postconditions:
  // 1. If initial size of randIndex > 0 && failover was attempted, final size
  //    of randIndex < initial size of randIndex
  // 2. If GF_NOERR was returned after a connection attempt, then
  //    m_activeEndpoint > -1 and that endpoint has accepted the registration.
  // 3. Number of endpoints on which DM is registered <= 1
  return err;
}
// Hook called by selectEndpoint() right after this DM has been unregistered
// from the previously active endpoint. This implementation is a no-op.
void ThinClientDistributionManager::postUnregisterAction() {}
// Hook called by selectEndpoint() before any candidate endpoint is tried.
// A false result makes selectEndpoint() return GF_NOERR without attempting a
// connection. This implementation always allows the failover.
bool ThinClientDistributionManager::preFailoverAction() { return true; }
// Hook called by connectToEndpoint() after registerDM() succeeded on the
// candidate endpoint. A false result makes connectToEndpoint() unregister
// from that endpoint again and report GF_EUNDEF. This implementation always
// accepts the endpoint.
bool ThinClientDistributionManager::postFailoverAction(TcrEndpoint*) {
return true;
}
/**
 * Obtains credentials for the given endpoint from the cache's AuthInitialize
 * implementation.
 *
 * @param ep endpoint whose name is handed to AuthInitialize::getCredentials.
 * @return the properties returned by the AuthInitialize implementation, or
 *         nullptr when the cache has no AuthInitialize configured.
 */
std::shared_ptr<Properties> ThinClientDistributionManager::getCredentials(
    TcrEndpoint* ep) {
  auto cacheImpl = m_connManager.getCacheImpl();
  const auto& distributedSystem = cacheImpl->getDistributedSystem();
  const auto& securityProps =
      distributedSystem.getSystemProperties().getSecurityProperties();

  const auto& authLoader = cacheImpl->getAuthInitialize();
  if (!authLoader) {
    return nullptr;
  }

  LOGFINER(
      "ThinClientDistributionManager::getCredentials: acquired handle to "
      "authLoader, "
      "invoking getCredentials %s",
      ep->name().c_str());
  auto credentials =
      authLoader->getCredentials(securityProps, ep->name().c_str());
  LOGFINER("Done getting credentials");
  return credentials;
}
/**
 * Sends a user-credential message to the given endpoint and interprets the
 * reply.
 *
 * @param credentials credentials to transmit.
 * @param ep          endpoint to send them to.
 * @return GF_NOERR when the server answered with RESPONSE; the error derived
 *         from the server exception when it answered with EXCEPTION; GF_MSG
 *         for any other reply type; GF_NOTCON in place of GF_IOERR; or
 *         whatever other error handleEPError() reports for the send.
 *         Errors are returned to the caller rather than thrown from here.
 */
GfErrType ThinClientDistributionManager::sendUserCredentials(
    std::shared_ptr<Properties> credentials, TcrEndpoint* ep) {
  // The original message named ThinClientPoolDM here, which misattributed
  // the trace output to a different class.
  LOGDEBUG("ThinClientDistributionManager::sendUserCredentials");

  TcrMessageUserCredential request(
      new DataOutput(m_connManager.getCacheImpl()->createDataOutput()),
      credentials, this);
  TcrMessageReply reply(true, this);

  GfErrType err = ep->send(request, reply);
  LOGDEBUG(
      "ThinClientDistributionManager::sendUserCredentials: completed endpoint "
      "send for: %s [error:%d]",
      ep->name().c_str(), err);

  err = handleEPError(ep, reply, err);
  if (err == GF_IOERR) {
    err = GF_NOTCON;
  }
  if (err != GF_NOERR) {
    return err;
  }

  switch (reply.getMessageType()) {
    case TcrMessage::RESPONSE: {
      // authenticated; nothing to be done
      break;
    }
    case TcrMessage::EXCEPTION: {
      err = ThinClientRegion::handleServerException(
          "ThinClientDistributionManager::sendUserCredentials AuthException",
          reply.getException());
      break;
    }
    default: {
      LOGERROR(
          "Unknown message type %d during secure response, possible "
          "serialization mismatch",
          reply.getMessageType());
      err = GF_MSG;
      break;
    }
  }
  return err;
}
/**
 * Sends the request to one endpoint, authenticating first when security is
 * enabled and the endpoint is not yet authenticated.
 *
 * With security on, up to three attempts are made: whenever the reply
 * carries an "authentication required" exception the endpoint is marked
 * unauthenticated and the credentials plus the original request are sent
 * again. With security off there is exactly one attempt.
 *
 * @return the error from sending credentials if that fails; otherwise the
 *         result of the send after handleEPError(), with GF_IOERR reported
 *         as GF_NOTCON.
 */
GfErrType ThinClientDistributionManager::sendRequestToEP(
    const TcrMessage& request, TcrMessageReply& reply, TcrEndpoint* ep) {
  LOGDEBUG(
      "ThinClientDistributionManager::sendRequestToEP: invoking endpoint send "
      "for: %s",
      ep->name().c_str());

  const int maxAttempts = isSecurityOn() ? 3 : 1;
  GfErrType error = GF_NOERR;

  for (int attempt = 0; attempt < maxAttempts; ++attempt) {
    ep->setDM(this);  // must be in place before the request goes out

    if (isSecurityOn() && !ep->isAuthenticated()) {
      error = this->sendUserCredentials(this->getCredentials(ep), ep);
    }
    if (error != GF_NOERR) {
      return error;
    }

    reply.setDM(this);
    error = ep->send(request, reply);
    LOGDEBUG(
        "ThinClientDistributionManager::sendRequestToEP: completed endpoint "
        "send for: %s [error:%d]",
        ep->name().c_str(), error);

    error = handleEPError(ep, reply, error);
    if (error == GF_IOERR) {
      error = GF_NOTCON;
    }

    const bool authRequired = isSecurityOn() && error == GF_NOERR &&
                              isAuthRequireException(reply.getException());
    if (!authRequired) {
      return error;
    }
    // The server wants us to authenticate again; retry if attempts remain.
    ep->setAuthenticated(false);
  }
  return error;
}
} // namespace client
} // namespace geode
} // namespace apache