| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #include "ExecutionImpl.hpp" |
| |
| #include <sstream> |
| |
| #include <geode/DefaultResultCollector.hpp> |
| #include <geode/ExceptionTypes.hpp> |
| #include <geode/internal/geode_globals.hpp> |
| |
| #include "NoResult.hpp" |
| #include "TcrConnectionManager.hpp" |
| #include "ThinClientPoolDM.hpp" |
| #include "ThinClientRegion.hpp" |
| #include "UserAttributes.hpp" |
| #include "util/exception.hpp" |
| |
| namespace apache { |
| namespace geode { |
| namespace client { |
| |
| FunctionToFunctionAttributes ExecutionImpl::m_func_attrs; |
| std::recursive_mutex ExecutionImpl::m_func_attrs_lock; |
| Execution ExecutionImpl::withFilter( |
| std::shared_ptr<CacheableVector> routingObj) { |
| if (routingObj == nullptr) { |
| throw IllegalArgumentException("Execution::withFilter: filter is null"); |
| } |
| if (m_region == nullptr) { |
| throw UnsupportedOperationException( |
| "Execution::withFilter: FunctionService::onRegion needs to be called " |
| "first before calling this function"); |
| } |
| // m_routingObj = routingObj; |
| return Execution(std::unique_ptr<ExecutionImpl>( |
| new ExecutionImpl(routingObj, m_args, m_rc, m_region, m_allServer, m_pool, |
| m_authenticatedView))); |
| } |
| |
| Execution ExecutionImpl::withArgs(std::shared_ptr<Cacheable> args) { |
| if (args == nullptr) { |
| throw IllegalArgumentException("Execution::withArgs: args is null"); |
| } |
| // m_args = args; |
| return Execution(std::unique_ptr<ExecutionImpl>( |
| new ExecutionImpl(m_routingObj, args, m_rc, m_region, m_allServer, m_pool, |
| m_authenticatedView))); |
| } |
| |
| Execution ExecutionImpl::withCollector(std::shared_ptr<ResultCollector> rs) { |
| if (rs == nullptr) { |
| throw IllegalArgumentException( |
| "Execution::withCollector: collector is null"); |
| } |
| // m_rc = rs; |
| return Execution(std::unique_ptr<ExecutionImpl>( |
| new ExecutionImpl(m_routingObj, m_args, rs, m_region, m_allServer, m_pool, |
| m_authenticatedView))); |
| } |
| |
| std::vector<int8_t>* ExecutionImpl::getFunctionAttributes( |
| const std::string& func) { |
| auto&& itr = m_func_attrs.find(func); |
| if (itr != m_func_attrs.end()) { |
| return itr->second; |
| } |
| return nullptr; |
| } |
| |
| std::shared_ptr<ResultCollector> ExecutionImpl::execute( |
| const std::shared_ptr<CacheableVector>& routingObj, |
| const std::shared_ptr<Cacheable>& args, |
| const std::shared_ptr<ResultCollector>& rs, const std::string& func, |
| std::chrono::milliseconds timeout) { |
| m_routingObj = routingObj; |
| m_args = args; |
| m_rc = rs; |
| return execute(func, timeout); |
| } |
| |
| std::shared_ptr<ResultCollector> ExecutionImpl::execute( |
| const std::string& func, std::chrono::milliseconds timeout) { |
| LOGDEBUG("ExecutionImpl::execute: "); |
| GuardUserAttributes gua; |
| if (m_authenticatedView != nullptr) { |
| LOGDEBUG("ExecutionImpl::execute function on authenticated cache"); |
| gua.setAuthenticatedView(m_authenticatedView); |
| } |
| bool serverHasResult = false; |
| bool serverIsHA = false; |
| bool serverOptimizeForWrite = false; |
| |
| auto&& attr = getFunctionAttributes(func); |
| { |
| if (attr == nullptr) { |
| std::lock_guard<decltype(m_func_attrs_lock)> _guard(m_func_attrs_lock); |
| GfErrType err = GF_NOERR; |
| attr = getFunctionAttributes(func); |
| if (attr == nullptr) { |
| if (m_region != nullptr) { |
| err = dynamic_cast<ThinClientRegion*>(m_region.get()) |
| ->getFuncAttributes(func, &attr); |
| } else if (m_pool != nullptr) { |
| err = getFuncAttributes(func, &attr); |
| } |
| if (err != GF_NOERR) { |
| GfErrTypeToException("Execute::GET_FUNCTION_ATTRIBUTES", err); |
| } |
| if (!attr->empty() && err == GF_NOERR) { |
| m_func_attrs[func] = attr; |
| } |
| } |
| } |
| } |
| serverHasResult = ((attr->at(0) == 1) ? true : false); |
| serverIsHA = ((attr->at(1) == 1) ? true : false); |
| serverOptimizeForWrite = ((attr->at(2) == 1) ? true : false); |
| |
| LOGDEBUG( |
| "ExecutionImpl::execute got functionAttributes from server for function " |
| "= %s serverHasResult = %d serverIsHA = %d serverOptimizeForWrite = %d ", |
| func.c_str(), serverHasResult, serverIsHA, serverOptimizeForWrite); |
| |
| if (serverHasResult == false) { |
| m_rc = std::make_shared<NoResult>(); |
| } else if (m_rc == nullptr) { |
| m_rc = std::make_shared<DefaultResultCollector>(); |
| } |
| |
| uint8_t isHAHasResultOptimizeForWrite = 0; |
| if (serverIsHA) { |
| isHAHasResultOptimizeForWrite = isHAHasResultOptimizeForWrite | 1; |
| } |
| |
| if (serverHasResult) { |
| isHAHasResultOptimizeForWrite = isHAHasResultOptimizeForWrite | 2; |
| } |
| |
| if (serverOptimizeForWrite) { |
| isHAHasResultOptimizeForWrite = isHAHasResultOptimizeForWrite | 4; |
| } |
| |
| LOGDEBUG("ExecutionImpl::execute: isHAHasResultOptimizeForWrite = %d", |
| isHAHasResultOptimizeForWrite); |
| TXState* txState = TSSTXStateWrapper::get().getTXState(); |
| |
| if (txState != nullptr && m_allServer == true) { |
| throw UnsupportedOperationException( |
| "Execution::execute: Transaction function execution on all servers is " |
| "not supported"); |
| } |
| |
| if (m_region != nullptr) { |
| int32_t retryAttempts = 3; |
| if (m_pool != nullptr) { |
| retryAttempts = m_pool->getRetryAttempts(); |
| } |
| |
| if (m_pool != nullptr && m_pool->getPRSingleHopEnabled()) { |
| auto tcrdm = std::dynamic_pointer_cast<ThinClientPoolDM>(m_pool); |
| if (!tcrdm) { |
| throw IllegalArgumentException( |
| "Execute: pool cast to ThinClientPoolDM failed"); |
| } |
| auto cms = tcrdm->getClientMetaDataService(); |
| auto failedNodes = CacheableHashSet::create(); |
| if ((!m_routingObj || m_routingObj->empty()) && |
| txState == nullptr) { // For transactions we should not create |
| // multiple threads |
| LOGDEBUG("ExecutionImpl::execute: m_routingObj is empty"); |
| auto serverToBucketsMap = cms->groupByServerToAllBuckets( |
| m_region, |
| /*serverOptimizeForWrite*/ (isHAHasResultOptimizeForWrite & 4) == |
| 4); |
| if (!serverToBucketsMap || serverToBucketsMap->empty()) { |
| LOGDEBUG( |
| "ExecutionImpl::execute: m_routingObj is empty and locationMap " |
| "is also empty so use old FE onRegion"); |
| std::dynamic_pointer_cast<ThinClientRegion>(m_region) |
| ->executeFunction( |
| func, m_args, m_routingObj, isHAHasResultOptimizeForWrite, |
| m_rc, (isHAHasResultOptimizeForWrite & 1) ? retryAttempts : 0, |
| timeout); |
| cms->enqueueForMetadataRefresh(m_region->getFullPath(), 0); |
| } else { |
| // convert server to bucket map to server to key map where bucket id |
| // is key. |
| auto serverToKeysMap = |
| std::make_shared<ClientMetadataService::ServerToKeysMap>( |
| serverToBucketsMap->size()); |
| for (const auto& entry : *serverToBucketsMap) { |
| auto keys = std::make_shared<CacheableHashSet>( |
| static_cast<int32_t>(entry.second->size())); |
| for (const auto& bucket : *(entry.second)) { |
| keys->insert(CacheableInt32::create(bucket)); |
| } |
| serverToKeysMap->emplace(entry.first, keys); |
| } |
| LOGDEBUG( |
| "ExecutionImpl::execute: withoutFilter and locationMap is not " |
| "empty"); |
| bool reExecute = std::dynamic_pointer_cast<ThinClientRegion>(m_region) |
| ->executeFunctionSH( |
| func, m_args, isHAHasResultOptimizeForWrite, |
| m_rc, serverToKeysMap, failedNodes, timeout, |
| /*allBuckets*/ true); |
| if (reExecute) { // Fallback to old FE onREgion |
| if (isHAHasResultOptimizeForWrite & 1) { // isHA = true |
| m_rc->clearResults(); |
| auto rs = |
| std::dynamic_pointer_cast<ThinClientRegion>(m_region) |
| ->reExecuteFunction(func, m_args, m_routingObj, |
| isHAHasResultOptimizeForWrite, m_rc, |
| (isHAHasResultOptimizeForWrite & 1) |
| ? retryAttempts |
| : 0, |
| failedNodes, timeout); |
| } else { // isHA = false |
| m_rc->clearResults(); |
| dynamic_cast<ThinClientRegion*>(m_region.get()) |
| ->executeFunction( |
| func, m_args, m_routingObj, isHAHasResultOptimizeForWrite, |
| m_rc, |
| (isHAHasResultOptimizeForWrite & 1) ? retryAttempts : 0, |
| timeout); |
| } |
| } |
| } |
| } else if (m_routingObj != nullptr && m_routingObj->size() == 1) { |
| LOGDEBUG("executeFunction onRegion WithFilter size equal to 1 "); |
| dynamic_cast<ThinClientRegion*>(m_region.get()) |
| ->executeFunction( |
| func, m_args, m_routingObj, isHAHasResultOptimizeForWrite, m_rc, |
| (isHAHasResultOptimizeForWrite & 1) ? retryAttempts : 0, |
| timeout); |
| } else { |
| if (txState == nullptr) { |
| auto serverToKeysMap = cms->getServerToFilterMapFESHOP( |
| m_routingObj, m_region, /*serverOptimizeForWrite*/ |
| (isHAHasResultOptimizeForWrite & 4) == 4); |
| if (!serverToKeysMap || serverToKeysMap->empty()) { |
| LOGDEBUG( |
| "ExecutionImpl::execute: withFilter but locationMap is empty " |
| "so use old FE onRegion"); |
| dynamic_cast<ThinClientRegion*>(m_region.get()) |
| ->executeFunction( |
| func, m_args, m_routingObj, isHAHasResultOptimizeForWrite, |
| m_rc, |
| (isHAHasResultOptimizeForWrite & 1) ? retryAttempts : 0, |
| timeout); |
| cms->enqueueForMetadataRefresh(m_region->getFullPath(), 0); |
| } else { |
| LOGDEBUG( |
| "ExecutionImpl::execute: withFilter and locationMap is not " |
| "empty"); |
| bool reExecute = |
| dynamic_cast<ThinClientRegion*>(m_region.get()) |
| ->executeFunctionSH(func, m_args, |
| isHAHasResultOptimizeForWrite, m_rc, |
| serverToKeysMap, failedNodes, timeout, |
| /*allBuckets*/ false); |
| if (reExecute) { // Fallback to old FE onREgion |
| if (isHAHasResultOptimizeForWrite & 1) { // isHA = true |
| m_rc->clearResults(); |
| auto rs = |
| dynamic_cast<ThinClientRegion*>(m_region.get()) |
| ->reExecuteFunction(func, m_args, m_routingObj, |
| isHAHasResultOptimizeForWrite, m_rc, |
| (isHAHasResultOptimizeForWrite & 1) |
| ? retryAttempts |
| : 0, |
| failedNodes, timeout); |
| } else { // isHA = false |
| m_rc->clearResults(); |
| dynamic_cast<ThinClientRegion*>(m_region.get()) |
| ->executeFunction( |
| func, m_args, m_routingObj, |
| isHAHasResultOptimizeForWrite, m_rc, |
| (isHAHasResultOptimizeForWrite & 1) ? retryAttempts : 0, |
| timeout); |
| } |
| } |
| } |
| } else { // For transactions use old way |
| dynamic_cast<ThinClientRegion*>(m_region.get()) |
| ->executeFunction( |
| func, m_args, m_routingObj, isHAHasResultOptimizeForWrite, |
| m_rc, (isHAHasResultOptimizeForWrite & 1) ? retryAttempts : 0, |
| timeout); |
| } |
| } |
| } else { // w/o single hop, Fallback to old FE onREgion |
| dynamic_cast<ThinClientRegion*>(m_region.get()) |
| ->executeFunction( |
| func, m_args, m_routingObj, isHAHasResultOptimizeForWrite, m_rc, |
| (isHAHasResultOptimizeForWrite & 1) ? retryAttempts : 0, timeout); |
| } |
| /* } catch (TransactionDataNodeHasDepartedException e) { |
| if(txState == nullptr) |
| { |
| GfErrTypeThrowException("Transaction is nullptr", |
| GF_CACHE_ILLEGAL_STATE_EXCEPTION); |
| } |
| |
| if(!txState->isReplay()) |
| txState->replay(false); |
| } catch(TransactionDataRebalancedException e) { |
| if(txState == nullptr) |
| { |
| GfErrTypeThrowException("Transaction is nullptr", |
| GF_CACHE_ILLEGAL_STATE_EXCEPTION); |
| } |
| |
| if(!txState->isReplay()) |
| txState->replay(true); |
| } |
| */ |
| if (serverHasResult == true) { |
| // ExecutionImpl::addResults(m_rc, rs); |
| m_rc->endResults(); |
| } |
| |
| return m_rc; |
| } else if (m_pool != nullptr) { |
| if (txState != nullptr) { |
| throw UnsupportedOperationException( |
| "Execution::execute: Transaction function execution on pool is not " |
| "supported"); |
| } |
| if (m_allServer == false) { |
| executeOnPool( |
| func, isHAHasResultOptimizeForWrite, |
| (isHAHasResultOptimizeForWrite & 1) ? m_pool->getRetryAttempts() : 0, |
| timeout); |
| if (serverHasResult == true) { |
| // ExecutionImpl::addResults(m_rc, rs); |
| m_rc->endResults(); |
| } |
| return m_rc; |
| } |
| executeOnAllServers(func, isHAHasResultOptimizeForWrite, timeout); |
| } else { |
| throw IllegalStateException("Execution::execute: should not be here"); |
| } |
| return m_rc; |
| } |
| |
| GfErrType ExecutionImpl::getFuncAttributes(const std::string& func, |
| std::vector<int8_t>** attr) { |
| ThinClientPoolDM* tcrdm = dynamic_cast<ThinClientPoolDM*>(m_pool.get()); |
| if (tcrdm == nullptr) { |
| throw IllegalArgumentException( |
| "Execute: pool cast to ThinClientPoolDM failed"); |
| } |
| |
| GfErrType err = GF_NOERR; |
| |
| // do TCR GET_FUNCTION_ATTRIBUTES |
| LOGDEBUG("Tcrmessage request GET_FUNCTION_ATTRIBUTES "); |
| TcrMessageGetFunctionAttributes request( |
| new DataOutput( |
| tcrdm->getConnectionManager().getCacheImpl()->createDataOutput()), |
| func, tcrdm); |
| TcrMessageReply reply(true, tcrdm); |
| err = tcrdm->sendSyncRequest(request, reply); |
| if (err != GF_NOERR) { |
| return err; |
| } |
| switch (reply.getMessageType()) { |
| case TcrMessage::RESPONSE: { |
| *attr = reply.getFunctionAttributes(); |
| break; |
| } |
| case TcrMessage::EXCEPTION: { |
| err = dynamic_cast<ThinClientRegion*>(m_region.get()) |
| ->handleServerException("Region::GET_FUNCTION_ATTRIBUTES", |
| reply.getException()); |
| break; |
| } |
| case TcrMessage::REQUEST_DATA_ERROR: { |
| LOGERROR("Error message from server: " + reply.getValue()->toString()); |
| throw FunctionExecutionException(reply.getValue()->toString()); |
| } |
| default: { |
| LOGERROR("Unknown message type %d while getting function attributes.", |
| reply.getMessageType()); |
| err = GF_MSG; |
| break; |
| } |
| } |
| return err; |
| } |
| |
| void ExecutionImpl::addResults( |
| std::shared_ptr<ResultCollector>& collector, |
| const std::shared_ptr<CacheableVector>& results) { |
| if (results == nullptr || collector == nullptr) { |
| return; |
| } |
| |
| for (const auto& result : *results) { |
| collector->addResult(result); |
| } |
| } |
| |
| void ExecutionImpl::executeOnAllServers(const std::string& func, |
| uint8_t getResult, |
| std::chrono::milliseconds timeout) { |
| ThinClientPoolDM* tcrdm = dynamic_cast<ThinClientPoolDM*>(m_pool.get()); |
| if (tcrdm == nullptr) { |
| throw IllegalArgumentException( |
| "Execute: pool cast to ThinClientPoolDM failed"); |
| } |
| std::shared_ptr<CacheableString> exceptionPtr = nullptr; |
| GfErrType err = tcrdm->sendRequestToAllServers( |
| func.c_str(), getResult, timeout, m_args, m_rc, exceptionPtr); |
| if (exceptionPtr != nullptr && err != GF_NOERR) { |
| LOGDEBUG("Execute errorred: %d", err); |
| // throw FunctionExecutionException( "Execute: failed to execute function |
| // with server." ); |
| if (err == GF_CACHESERVER_EXCEPTION) { |
| throw FunctionExecutionException( |
| "Execute: failed to execute function with server."); |
| } else { |
| GfErrTypeToException("Execute", err); |
| } |
| } |
| |
| if (err == GF_AUTHENTICATION_FAILED_EXCEPTION || |
| err == GF_NOT_AUTHORIZED_EXCEPTION || |
| err == GF_AUTHENTICATION_REQUIRED_EXCEPTION) { |
| GfErrTypeToException("Execute", err); |
| } |
| |
| if (err != GF_NOERR) { |
| if (err == GF_CACHESERVER_EXCEPTION) { |
| auto message = std::string("Execute: exception at the server side: ") + |
| exceptionPtr->value().c_str(); |
| throw FunctionExecutionException(message); |
| } else { |
| LOGDEBUG("Execute errorred with server exception: %d", err); |
| throw FunctionExecutionException( |
| "Execute: failed to execute function on servers."); |
| } |
| } |
| } |
| std::shared_ptr<CacheableVector> ExecutionImpl::executeOnPool( |
| const std::string& func, uint8_t getResult, int32_t retryAttempts, |
| std::chrono::milliseconds timeout) { |
| ThinClientPoolDM* tcrdm = dynamic_cast<ThinClientPoolDM*>(m_pool.get()); |
| if (tcrdm == nullptr) { |
| throw IllegalArgumentException( |
| "Execute: pool cast to ThinClientPoolDM failed"); |
| } |
| int32_t attempt = 0; |
| |
| // auto csArray = tcrdm->getServers(); |
| |
| // if (csArray != nullptr && csArray->length() != 0) { |
| // for (int i = 0; i < csArray->length(); i++) |
| // { |
| // auto cs = csArray[i]; |
| // TcrEndpoint *ep = nullptr; |
| // /* |
| // std::string endpointStr = |
| // Utils::convertHostToCanonicalForm(cs->value().c_str() |
| // ); |
| // */ |
| // ep = tcrdm->addEP(cs->value().c_str()); |
| // } |
| //} |
| |
| // if pools retry attempts are not set then retry once on all available |
| // endpoints |
| if (retryAttempts == -1) { |
| retryAttempts = static_cast<int32_t>(tcrdm->getNumberOfEndPoints()); |
| } |
| |
| while (attempt <= retryAttempts) { |
| std::string funcName(func); |
| TcrMessageExecuteFunction msg( |
| new DataOutput( |
| tcrdm->getConnectionManager().getCacheImpl()->createDataOutput()), |
| funcName, m_args, getResult, tcrdm, timeout); |
| TcrMessageReply reply(true, tcrdm); |
| ChunkedFunctionExecutionResponse* resultCollector( |
| new ChunkedFunctionExecutionResponse(reply, (getResult & 2) == 2, |
| m_rc)); |
| reply.setChunkedResultHandler(resultCollector); |
| reply.setTimeout(timeout); |
| |
| GfErrType err = GF_NOERR; |
| err = tcrdm->sendSyncRequest(msg, reply, !(getResult & 1)); |
| LOGFINE("executeOnPool %d attempt = %d retryAttempts = %d", err, attempt, |
| retryAttempts); |
| if (err == GF_NOERR && |
| (reply.getMessageType() == TcrMessage::EXCEPTION || |
| reply.getMessageType() == TcrMessage::EXECUTE_FUNCTION_ERROR)) { |
| err = ThinClientRegion::handleServerException("Execute", |
| reply.getException()); |
| } |
| if (ThinClientBaseDM::isFatalClientError(err)) { |
| GfErrTypeToException("ExecuteOnPool:", err); |
| } else if (err != GF_NOERR) { |
| if (getResult & 1) { |
| resultCollector->reset(); |
| m_rc->clearResults(); |
| attempt++; |
| if (attempt > 0) { |
| getResult |= 8; // Send this on server, so that it can identify that |
| // it is a retry attempt. |
| } |
| continue; |
| } else { |
| GfErrTypeToException("ExecuteOnPool:", err); |
| } |
| } |
| // auto values = |
| // resultCollector->getFunctionExecutionResults(); |
| /* |
| ==25848== 1,610 (72 direct, 1,538 indirect) bytes in 2 blocks are definitely |
| lost in loss record 193 of 218 |
| ==25848== at 0x4007D75: operator new(unsigned int) |
| (vg_replace_malloc.c:313) |
| ==25848== by 0x42B575A: |
| apache::geode::client::ExecutionImpl::executeOnPool(stlp_std::basic_string<char, |
| stlp_std::char_traits<char>, stlp_std::allocator<char> >&, unsigned char, |
| int, unsigned int) (ExecutionImpl.cpp:426) |
| ==25848== by 0x42B65E6: |
| apache::geode::client::ExecutionImpl::execute(char const*, |
| bool, unsigned int, bool, bool, bool) (ExecutionImpl.cpp:292) |
| ==25848== by 0x42B7897: |
| apache::geode::client::ExecutionImpl::execute(char const*, |
| bool, unsigned int, bool, bool) (ExecutionImpl.cpp:76) |
| ==25848== by 0x8081320: Task_Client1OpTest::doTask() (in |
| /export/pnq-gst-dev01a/users/adongre/cedar_dev_Nov12/build-artifacts/linux/tests/cppcache/testThinClientPoolExecuteFunctionPrSHOP) |
| ==25848== by 0x808D5CA: dunit::TestSlave::begin() (in |
| /export/pnq-gst-dev01a/users/adongre/cedar_dev_Nov12/build-artifacts/linux/tests/cppcache/testThinClientPoolExecuteFunctionPrSHOP) |
| ==25848== by 0x8089807: dunit::dmain(int, char**) (in |
| /export/pnq-gst-dev01a/users/adongre/cedar_dev_Nov12/build-artifacts/linux/tests/cppcache/testThinClientPoolExecuteFunctionPrSHOP) |
| ==25848== by 0x805F9EA: main (in |
| /export/pnq-gst-dev01a/users/adongre/cedar_dev_Nov12/build-artifacts/linux/tests/cppcache/testThinClientPoolExecuteFunctionPrSHOP) |
| */ |
| delete resultCollector; |
| resultCollector = nullptr; |
| |
| return nullptr; |
| } |
| return nullptr; |
| } |
| |
| } // namespace client |
| } // namespace geode |
| } // namespace apache |