| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #include "RemoteQuery.hpp" |
| |
| #include <boost/thread/lock_types.hpp> |
| |
| #include "ResultSetImpl.hpp" |
| #include "StructSetImpl.hpp" |
| #include "TcrConnectionManager.hpp" |
| #include "ThinClientPoolDM.hpp" |
| #include "ThinClientRegion.hpp" |
| #include "UserAttributes.hpp" |
| #include "util/bounds.hpp" |
| #include "util/exception.hpp" |
| |
| namespace apache { |
| namespace geode { |
| namespace client { |
| |
| RemoteQuery::RemoteQuery( |
| std::string querystr, |
| const std::shared_ptr<RemoteQueryService>& queryService, |
| ThinClientBaseDM* tccdmptr, AuthenticatedView* authenticatedView) |
| : m_queryString(querystr), |
| m_queryService(queryService), |
| m_tccdm(tccdmptr), |
| m_authenticatedView(authenticatedView) { |
| LOGFINEST("RemoteQuery: created a new query: " + querystr); |
| } |
| |
| std::shared_ptr<SelectResults> RemoteQuery::execute( |
| std::chrono::milliseconds timeout) { |
| util::PROTOCOL_OPERATION_TIMEOUT_BOUNDS(timeout); |
| GuardUserAttributes gua; |
| if (m_authenticatedView != nullptr) { |
| gua.setAuthenticatedView(m_authenticatedView); |
| } |
| return execute(timeout, "Query::execute", m_tccdm, nullptr); |
| } |
| |
| std::shared_ptr<SelectResults> RemoteQuery::execute( |
| std::shared_ptr<CacheableVector> paramList, |
| std::chrono::milliseconds timeout) { |
| util::PROTOCOL_OPERATION_TIMEOUT_BOUNDS(timeout); |
| GuardUserAttributes gua; |
| if (m_authenticatedView) { |
| gua.setAuthenticatedView(m_authenticatedView); |
| } |
| return execute(timeout, "Query::execute", m_tccdm, paramList); |
| } |
| |
| std::shared_ptr<SelectResults> RemoteQuery::execute( |
| std::chrono::milliseconds timeout, const char* func, ThinClientBaseDM* tcdm, |
| std::shared_ptr<CacheableVector> paramList) { |
| auto pool = dynamic_cast<ThinClientPoolDM*>(tcdm); |
| if (pool) { |
| pool->getStats().incQueryExecutionId(); |
| } |
| /*get the start time for QueryExecutionTime stat*/ |
| // NOLINTNEXTLINE(clang-analyzer-core.CallAndMessage) |
| bool enableTimeStatistics = tcdm->getConnectionManager() |
| .getCacheImpl() |
| ->getDistributedSystem() |
| .getSystemProperties() |
| .getEnableTimeStatistics(); |
| int64_t sampleStartNanos = |
| enableTimeStatistics ? Utils::startStatOpTime() : 0; |
| TcrMessageReply reply(true, tcdm); |
| auto resultCollector = |
| std::unique_ptr<ChunkedQueryResponse>(new ChunkedQueryResponse(reply)); |
| reply.setChunkedResultHandler( |
| static_cast<TcrChunkedResult*>(resultCollector.get())); |
| GfErrType err = executeNoThrow(timeout, reply, func, tcdm, paramList); |
| throwExceptionIfError(func, err); |
| |
| std::shared_ptr<SelectResults> sr; |
| |
| LOGFINEST("%s: reading reply for query: %s", func, m_queryString.c_str()); |
| auto&& values = resultCollector->getQueryResults(); |
| auto&& fieldNameVec = resultCollector->getStructFieldNames(); |
| size_t sizeOfFieldNamesVec = fieldNameVec.size(); |
| if (sizeOfFieldNamesVec == 0) { |
| LOGFINEST("%s: creating ResultSet for query: %s", func, |
| m_queryString.c_str()); |
| sr = std::make_shared<ResultSetImpl>(values); |
| } else { |
| if (values->size() % fieldNameVec.size() != 0) { |
| throw MessageException(std::string(func) + |
| ": Number of values coming from server has to be " |
| "exactly divisible by field count"); |
| } else { |
| LOGFINEST("%s: creating StructSet for query: %s", func, |
| m_queryString.c_str()); |
| sr = std::make_shared<StructSetImpl>(values, fieldNameVec); |
| } |
| } |
| |
| /*update QueryExecutionTime stat */ |
| if (pool && enableTimeStatistics) { |
| Utils::updateStatOpTime(pool->getStats().getStats(), |
| pool->getStats().getQueryExecutionTimeId(), |
| sampleStartNanos); |
| } |
| return sr; |
| } |
| |
| GfErrType RemoteQuery::executeNoThrow( |
| std::chrono::milliseconds timeout, TcrMessageReply& reply, const char* func, |
| ThinClientBaseDM* tcdm, std::shared_ptr<CacheableVector> paramList) { |
| LOGFINEST("%s: executing query: %s", func, m_queryString.c_str()); |
| |
| boost::shared_lock<boost::shared_mutex> guard{m_queryService->getMutex()}; |
| if (m_queryService->invalid()) { |
| return GF_CACHE_CLOSED_EXCEPTION; |
| } |
| |
| LOGDEBUG("%s: creating QUERY TcrMessage for query: %s", func, |
| m_queryString.c_str()); |
| if (paramList != nullptr) { |
| // QUERY_WITH_PARAMETERS |
| TcrMessageQueryWithParameters msg( |
| new DataOutput( |
| m_tccdm->getConnectionManager().getCacheImpl()->createDataOutput()), |
| m_queryString, nullptr, paramList, timeout, tcdm); |
| msg.setTimeout(timeout); |
| reply.setTimeout(timeout); |
| |
| LOGFINEST("%s: sending request for query: %s", func, m_queryString.c_str()); |
| if (tcdm == nullptr) { |
| tcdm = m_tccdm; |
| } |
| auto err = tcdm->sendSyncRequest(msg, reply); |
| if (err != GF_NOERR) { |
| return err; |
| } |
| if (reply.getMessageType() == TcrMessage::EXCEPTION) { |
| err = ThinClientRegion::handleServerException(func, reply.getException()); |
| if (err == GF_CACHESERVER_EXCEPTION) { |
| err = GF_REMOTE_QUERY_EXCEPTION; |
| } |
| } |
| return err; |
| } else { |
| TcrMessageQuery msg( |
| new DataOutput( |
| m_tccdm->getConnectionManager().getCacheImpl()->createDataOutput()), |
| m_queryString, timeout, tcdm); |
| msg.setTimeout(timeout); |
| reply.setTimeout(timeout); |
| |
| LOGFINEST("%s: sending request for query: %s", func, m_queryString.c_str()); |
| if (tcdm == nullptr) { |
| tcdm = m_tccdm; |
| } |
| auto err = tcdm->sendSyncRequest(msg, reply); |
| if (err != GF_NOERR) { |
| return err; |
| } |
| if (reply.getMessageType() == TcrMessage::EXCEPTION) { |
| err = ThinClientRegion::handleServerException(func, reply.getException()); |
| if (err == GF_CACHESERVER_EXCEPTION) { |
| err = GF_REMOTE_QUERY_EXCEPTION; |
| } |
| } |
| return err; |
| } |
| } |
| |
| const std::string& RemoteQuery::getQueryString() const { return m_queryString; } |
| |
| void RemoteQuery::compile() { |
| throw UnsupportedOperationException("Not supported in native clients"); |
| } |
| |
| bool RemoteQuery::isCompiled() { |
| throw UnsupportedOperationException("Not supported in native clients"); |
| } |
| |
| } // namespace client |
| } // namespace geode |
| } // namespace apache |