| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #include "TcrConnection.hpp" |
| |
| #include <cinttypes> |
| |
| #include <ace/INET_Addr.h> |
| #include <ace/OS.h> |
| |
| #include <geode/AuthInitialize.hpp> |
| #include <geode/SystemProperties.hpp> |
| |
| #include "ClientProxyMembershipID.hpp" |
| #include "Connector.hpp" |
| #include "DistributedSystemImpl.hpp" |
| #include "TcpSslConn.hpp" |
| #include "TcrConnectionManager.hpp" |
| #include "TcrEndpoint.hpp" |
| #include "ThinClientPoolHADM.hpp" |
| #include "ThinClientRegion.hpp" |
| #include "Utils.hpp" |
| #include "Version.hpp" |
| |
| namespace apache { |
| namespace geode { |
| namespace client { |
| |
| const int HEADER_LENGTH = 17; |
| const int CHUNK_HEADER_LENGTH = 5; |
| const int8_t LAST_CHUNK_MASK = 0x1; |
| const int64_t INITIAL_CONNECTION_ID = 26739; |
| |
| #define throwException(ex) \ |
| do { \ |
| LOGFINEST(ex.getName() + ": " + ex.getMessage()); \ |
| throw ex; \ |
| } while (0) |
| |
| struct FinalizeProcessChunk { |
| private: |
| TcrMessage& m_reply; |
| uint16_t m_endpointMemId; |
| |
| public: |
| FinalizeProcessChunk(TcrMessageReply& reply, uint16_t endpointMemId) |
| : m_reply(reply), m_endpointMemId(endpointMemId) {} |
| ~FinalizeProcessChunk() noexcept(false) { |
| // Enqueue a nullptr chunk indicating a wait for processing to complete. |
| m_reply.processChunk(std::vector<uint8_t>(), 0, m_endpointMemId); |
| } |
| }; |
| |
| bool TcrConnection::initTcrConnection( |
| TcrEndpoint* endpointObj, const char* endpoint, |
| synchronized_set<std::unordered_set<uint16_t>>& ports, |
| bool isClientNotification, bool isSecondary, |
| std::chrono::microseconds connectTimeout) { |
| m_conn = nullptr; |
| m_endpointObj = endpointObj; |
| m_poolDM = dynamic_cast<ThinClientPoolDM*>(m_endpointObj->getPoolHADM()); |
| // add to the connection reference counter of the endpoint |
| m_endpointObj->addConnRefCounter(1); |
| // m_connected = isConnected; |
| m_hasServerQueue = NON_REDUNDANT_SERVER; |
| m_queueSize = 0; |
| // m_chunksProcessSema = 0; |
| m_creationTime = clock::now(); |
| connectionId = INITIAL_CONNECTION_ID; |
| m_lastAccessed = clock::now(); |
| auto cacheImpl = m_poolDM->getConnectionManager().getCacheImpl(); |
| const auto& distributedSystem = cacheImpl->getDistributedSystem(); |
| const auto& sysProp = distributedSystem.getSystemProperties(); |
| |
| LOGDEBUG( |
| "Tcrconnection const isSecondary = %d and isClientNotification = %d, " |
| "this = %p, conn ref to endopint %d", |
| isSecondary, isClientNotification, this, |
| m_endpointObj->getConnRefCounter()); |
| bool isPool = false; |
| m_isBeingUsed = false; |
| m_endpoint = endpoint; |
| // Precondition: |
| // 1. isSecondary ==> isClientNotification |
| |
| // Create TcpConn object which manages a socket connection with the endpoint. |
| if (endpointObj && endpointObj->getPoolHADM()) { |
| m_conn = createConnection( |
| m_endpoint, connectTimeout, |
| static_cast<int32_t>( |
| endpointObj->getPoolHADM()->getSocketBufferSize())); |
| isPool = true; |
| } else { |
| m_conn = createConnection(m_endpoint, connectTimeout, |
| sysProp.maxSocketBufferSize()); |
| } |
| |
| auto handShakeMsg = cacheImpl->createDataOutput(); |
| bool isNotificationChannel = false; |
| // Send byte Acceptor.CLIENT_TO_SERVER = (byte) 100; |
| // Send byte Acceptor.SERVER_TO_CLIENT = (byte) 101; |
| if (isClientNotification) { |
| isNotificationChannel = true; |
| if (isSecondary) { |
| handShakeMsg.write(static_cast<int8_t>(SECONDARY_SERVER_TO_CLIENT)); |
| } else { |
| handShakeMsg.write(static_cast<int8_t>(PRIMARY_SERVER_TO_CLIENT)); |
| } |
| } else { |
| handShakeMsg.write(static_cast<int8_t>(CLIENT_TO_SERVER)); |
| } |
| |
| // added for versioned client |
| int8_t versionOrdinal = Version::getOrdinal(); |
| handShakeMsg.write(versionOrdinal); |
| |
| LOGFINE("Client version ordinal is %d", versionOrdinal); |
| |
| handShakeMsg.write(static_cast<int8_t>(REPLY_OK)); |
| |
| // Send byte REPLY_OK = (byte)58; |
| if (!isClientNotification) { |
| m_port = m_conn->getPort(); |
| ports.insert(m_port); |
| } else { |
| auto&& lock = ports.make_lock(); |
| handShakeMsg.writeInt(static_cast<int32_t>(ports.size())); |
| for (const auto& port : ports) { |
| handShakeMsg.writeInt(static_cast<int32_t>(port)); |
| } |
| } |
| |
| // Writing handshake readtimeout value for CSVER_51+. |
| if (!isClientNotification) { |
| // SW: The timeout has been artificially raised to the highest |
| // permissible value for bug #232 for now. |
| // minus 10 sec because the GFE 5.7 gridDev branch adds a |
| // 5 sec buffer which was causing an int overflow. |
| handShakeMsg.writeInt(static_cast<int32_t>(0x7fffffff) - 10000); |
| } |
| |
| // Write header for byte FixedID since GFE 5.7 |
| handShakeMsg.write(static_cast<int8_t>(DSCode::FixedIDByte)); |
| // Writing byte for ClientProxyMembershipID class id=38 as registered on the |
| // java server. |
| handShakeMsg.write(static_cast<int8_t>(DSCode::ClientProxyMembershipId)); |
| if (endpointObj->getPoolHADM()) { |
| ClientProxyMembershipID* memId = |
| endpointObj->getPoolHADM()->getMembershipId(); |
| uint32_t memIdBufferLength; |
| auto memIdBuffer = memId->getDSMemberId(memIdBufferLength); |
| handShakeMsg.writeBytes( |
| reinterpret_cast<int8_t*>(const_cast<char*>(memIdBuffer)), |
| memIdBufferLength); |
| } else { |
| ACE_TCHAR hostName[256]; |
| ACE_OS::hostname(hostName, sizeof(hostName) - 1); |
| |
| ACE_INET_Addr driver(hostName); |
| |
| uint16_t hostPort = 0; |
| |
| // Add 3 durable Subcription properties to ClientProxyMembershipID |
| |
| auto&& durableId = sysProp.durableClientId(); |
| auto&& durableTimeOut = sysProp.durableTimeout(); |
| |
| // Write ClientProxyMembershipID serialized object. |
| uint32_t memIdBufferLength; |
| auto memId = cacheImpl->getClientProxyMembershipIDFactory().create( |
| hostName, driver, hostPort, durableId.c_str(), durableTimeOut); |
| auto memIdBuffer = memId->getDSMemberId(memIdBufferLength); |
| handShakeMsg.writeBytes( |
| reinterpret_cast<int8_t*>(const_cast<char*>(memIdBuffer)), |
| memIdBufferLength); |
| } |
| handShakeMsg.writeInt(static_cast<int32_t>(1)); |
| |
| bool requireServerAuth = false; |
| std::shared_ptr<Properties> credentials; |
| std::shared_ptr<CacheableBytes> serverChallenge; |
| |
| // Write overrides (just conflation for now) |
| handShakeMsg.write(getOverrides(&sysProp)); |
| |
| bool tmpIsSecurityOn = nullptr != cacheImpl->getAuthInitialize(); |
| |
| if (m_endpointObj) { |
| tmpIsSecurityOn = tmpIsSecurityOn || m_endpointObj->isMultiUserMode(); |
| } |
| |
| LOGDEBUG( |
| "TcrConnection tmpIsSecurityOn = %d isNotificationChannel = " |
| "%d ", |
| tmpIsSecurityOn, isNotificationChannel); |
| bool doIneedToSendCreds = true; |
| if (isNotificationChannel && m_endpointObj && |
| this->m_endpointObj->isMultiUserMode()) { |
| tmpIsSecurityOn = false; |
| doIneedToSendCreds = false; |
| } |
| |
| if (isNotificationChannel && !doIneedToSendCreds) { |
| handShakeMsg.write( |
| static_cast<uint8_t>(SECURITY_MULTIUSER_NOTIFICATIONCHANNEL)); |
| } else if (tmpIsSecurityOn) { |
| handShakeMsg.write(static_cast<uint8_t>(SECURITY_CREDENTIALS_NORMAL)); |
| } else { |
| handShakeMsg.write(static_cast<uint8_t>(SECURITY_CREDENTIALS_NONE)); |
| } |
| |
| if (tmpIsSecurityOn) { |
| try { |
| LOGFINER("TcrConnection: about to invoke authloader"); |
| const auto& tmpSecurityProperties = sysProp.getSecurityProperties(); |
| if (tmpSecurityProperties == nullptr) { |
| LOGWARN("TcrConnection: security properties not found."); |
| } |
| // only for backward connection |
| if (isClientNotification) { |
| if (const auto& authInitialize = cacheImpl->getAuthInitialize()) { |
| LOGFINER( |
| "TcrConnection: acquired handle to authLoader, " |
| "invoking getCredentials"); |
| |
| const auto& tmpAuthIniSecurityProperties = |
| authInitialize->getCredentials(tmpSecurityProperties, m_endpoint); |
| LOGFINER("TcrConnection: after getCredentials "); |
| credentials = tmpAuthIniSecurityProperties; |
| } |
| } |
| if (isClientNotification) { |
| credentials->toData(handShakeMsg); |
| } |
| } catch (const AuthenticationRequiredException&) { |
| LOGDEBUG("AuthenticationRequiredException got"); |
| throw; |
| } catch (const AuthenticationFailedException&) { |
| LOGDEBUG("AuthenticationFailedException got"); |
| throw; |
| } catch (const Exception& ex) { |
| LOGWARN("TcrConnection: failed to acquire handle to authLoader: [%s] %s", |
| ex.getName().c_str(), ex.what()); |
| auto message = |
| std::string("TcrConnection: failed to load authInit library: ") + |
| ex.what(); |
| throwException(AuthenticationFailedException(message)); |
| } |
| } |
| |
| size_t msgLength; |
| auto data = reinterpret_cast<char*>( |
| const_cast<uint8_t*>(handShakeMsg.getBuffer(&msgLength))); |
| LOGFINE("Attempting handshake with endpoint %s for %s%s connection", endpoint, |
| isClientNotification ? (isSecondary ? "secondary " : "primary ") : "", |
| isClientNotification ? "subscription" : "client"); |
| ConnErrType error = sendData(data, msgLength, connectTimeout, false); |
| |
| if (error == CONN_NOERR) { |
| auto acceptanceCode = readHandshakeData(1, connectTimeout); |
| |
| LOGDEBUG(" Handshake: Got Accept Code %d", acceptanceCode[0]); |
| /* adongre */ |
| if (acceptanceCode[0] == REPLY_SSL_ENABLED && !sysProp.sslEnabled()) { |
| LOGERROR("SSL is enabled on server, enable SSL in client as well"); |
| AuthenticationRequiredException ex( |
| "SSL is enabled on server, enable SSL in client as well"); |
| GF_SAFE_DELETE_CON(m_conn); |
| throwException(ex); |
| } |
| |
| auto serverQueueStatus = readHandshakeData(1, connectTimeout); |
| |
| // TESTING: Durable clients - set server queue status. |
| // 0 - Non-Redundant , 1- Redundant , 2- Primary |
| if (serverQueueStatus[0] == 1) { |
| m_hasServerQueue = REDUNDANT_SERVER; |
| } else if (serverQueueStatus[0] == 2) { |
| m_hasServerQueue = PRIMARY_SERVER; |
| } else { |
| m_hasServerQueue = NON_REDUNDANT_SERVER; |
| } |
| auto queueSizeMsg = readHandshakeData(4, connectTimeout); |
| auto dataInput = cacheImpl->createDataInput( |
| reinterpret_cast<const uint8_t*>(queueSizeMsg.data()), |
| queueSizeMsg.size()); |
| int32_t queueSize = 0; |
| queueSize = dataInput.readInt32(); |
| m_queueSize = queueSize > 0 ? queueSize : 0; |
| |
| m_endpointObj->setServerQueueStatus(m_hasServerQueue, m_queueSize); |
| |
| ////////////////////////// Set Pool Specific Q Size only when |
| /////////////////////////////////// |
| ////////////////////////// 1. SereverQStatus = Primary or |
| /////////////////////////////////// |
| ////////////////////////// 2. SereverQStatus = Non-Redundant and |
| /////////////////////////////////// |
| ////////////////////////// 3. Only when handshake is for subscription |
| /////////////////////////////////// |
| if (m_poolDM != nullptr) { |
| if ((m_hasServerQueue == PRIMARY_SERVER || |
| m_hasServerQueue == NON_REDUNDANT_SERVER) && |
| isClientNotification) { |
| m_poolDM->setPrimaryServerQueueSize(queueSize); |
| } |
| } |
| |
| if (!isClientNotification) { |
| // Read the DistributedMember object |
| auto recvMsgLen = readHandshakeArraySize(connectTimeout); |
| auto recvMessage = readHandshakeData(recvMsgLen, connectTimeout); |
| // If the distributed member has not been set yet, set it. |
| if (getEndpointObject()->getDistributedMemberID() == 0) { |
| LOGDEBUG("Deserializing distributed member Id"); |
| auto dataInputForClient = cacheImpl->createDataInput( |
| reinterpret_cast<const uint8_t*>(recvMessage.data()), |
| recvMessage.size()); |
| auto member = std::dynamic_pointer_cast<ClientProxyMembershipID>( |
| dataInputForClient.readObject()); |
| auto memId = cacheImpl->getMemberListForVersionStamp()->add(member); |
| getEndpointObject()->setDistributedMemberID(memId); |
| LOGDEBUG("Deserialized distributed member Id %d", memId); |
| } |
| } |
| |
| auto recvMsgLenBytes = readHandshakeData(2, connectTimeout); |
| auto dataInput3 = m_connectionManager->getCacheImpl()->createDataInput( |
| reinterpret_cast<const uint8_t*>(recvMsgLenBytes.data()), |
| recvMsgLenBytes.size()); |
| uint16_t recvMsgLen2 = dataInput3.readInt16(); |
| auto recvMessage = readHandshakeData(recvMsgLen2, connectTimeout); |
| |
| if (!isClientNotification) { |
| auto deltaEnabledMsg = readHandshakeData(1, connectTimeout); |
| auto di = m_connectionManager->getCacheImpl()->createDataInput( |
| reinterpret_cast<const uint8_t*>(deltaEnabledMsg.data()), 1); |
| ThinClientBaseDM::setDeltaEnabledOnServer(di.readBoolean()); |
| } |
| |
| switch (acceptanceCode[0]) { |
| case REPLY_OK: |
| case SUCCESSFUL_SERVER_TO_CLIENT: |
| LOGFINER("Handshake reply: %u,%u,%u", acceptanceCode[0], |
| serverQueueStatus[0], recvMsgLen2); |
| if (isClientNotification) readHandshakeInstantiatorMsg(connectTimeout); |
| break; |
| case REPLY_AUTHENTICATION_FAILED: { |
| AuthenticationFailedException ex( |
| reinterpret_cast<char*>(recvMessage.data())); |
| GF_SAFE_DELETE_CON(m_conn); |
| throwException(ex); |
| } |
| case REPLY_AUTHENTICATION_REQUIRED: { |
| AuthenticationRequiredException ex( |
| reinterpret_cast<char*>(recvMessage.data())); |
| GF_SAFE_DELETE_CON(m_conn); |
| throwException(ex); |
| } |
| case REPLY_DUPLICATE_DURABLE_CLIENT: { |
| DuplicateDurableClientException ex( |
| reinterpret_cast<char*>(recvMessage.data())); |
| GF_SAFE_DELETE_CON(m_conn); |
| throwException(ex); |
| } |
| case REPLY_REFUSED: |
| case REPLY_INVALID: |
| case UNSUCCESSFUL_SERVER_TO_CLIENT: { |
| LOGERROR("Handshake rejected by server[%s]: %s", |
| m_endpointObj->name().c_str(), |
| reinterpret_cast<char*>(recvMessage.data())); |
| auto message = std::string("TcrConnection::TcrConnection: ") + |
| "Handshake rejected by server: " + |
| reinterpret_cast<char*>(recvMessage.data()); |
| CacheServerException ex(message); |
| GF_SAFE_DELETE_CON(m_conn); |
| throw ex; |
| } |
| default: { |
| LOGERROR( |
| "Unknown error[%d] received from server [%s] in handshake: " |
| "%s", |
| acceptanceCode[0], m_endpointObj->name().c_str(), |
| recvMessage.data()); |
| auto message = |
| std::string("TcrConnection::TcrConnection: Unknown error") + |
| " received from server in handshake: " + |
| reinterpret_cast<char*>(recvMessage.data()); |
| MessageException ex(message); |
| GF_SAFE_DELETE_CON(m_conn); |
| throw ex; |
| } |
| } |
| |
| } else { |
| int32_t lastError = ACE_OS::last_error(); |
| LOGFINE("Handshake failed, errno: %d: %s", lastError, |
| ACE_OS::strerror(lastError)); |
| GF_SAFE_DELETE_CON(m_conn); |
| if (error & CONN_TIMEOUT) { |
| throw TimeoutException( |
| "TcrConnection::TcrConnection: " |
| "connection timed out during handshake"); |
| } else { |
| throw GeodeIOException( |
| "TcrConnection::TcrConnection: " |
| "Handshake failure"); |
| } |
| } |
| |
| // TODO: we can authenticate endpoint here if pool is not in multiuser mode. |
| // for backward connection we send credentials to server in handshake itself. |
| // for forward connection we need to send credentail to server |
| //---if pool in not in multiuser node |
| //---or old endpoint case. |
| |
| if (this->m_endpointObj && !isNotificationChannel && tmpIsSecurityOn && |
| (!isPool || !this->m_endpointObj->isMultiUserMode())) { |
| // this->m_endpointObj->authenticateEndpoint(this); |
| return true; |
| } |
| |
| return false; |
| } |
| |
| Connector* TcrConnection::createConnection( |
| const char* endpoint, std::chrono::microseconds connectTimeout, |
| int32_t maxBuffSizePool) { |
| Connector* socket = nullptr; |
| auto& systemProperties = m_connectionManager->getCacheImpl() |
| ->getDistributedSystem() |
| .getSystemProperties(); |
| if (systemProperties.sslEnabled()) { |
| socket = new TcpSslConn(endpoint, connectTimeout, maxBuffSizePool, |
| systemProperties.sslTrustStore().c_str(), |
| systemProperties.sslKeyStore().c_str(), |
| systemProperties.sslKeystorePassword().c_str()); |
| } else { |
| socket = new TcpConn(endpoint, connectTimeout, maxBuffSizePool); |
| } |
| // as socket.init() calls throws exception... |
| m_conn = socket; |
| socket->init(); |
| return socket; |
| } |
| |
| /* The timeout behaviour for different methods is as follows: |
| * receive(): |
| * Header: default timeout |
| * Body: default timeout |
| * sendRequest()/sendRequestForChunkedResponse(): |
| * default timeout during send; for receive: |
| * Header: default timeout * default timeout retries to handle large payload |
| * if a timeout other than default timeout is specified then |
| * that is used instead |
| * Body: default timeout |
| */ |
| inline ConnErrType TcrConnection::receiveData( |
| char* buffer, size_t length, std::chrono::microseconds receiveTimeoutSec, |
| bool checkConnected, bool isNotificationMessage) { |
| std::chrono::microseconds defaultWaitSecs = |
| isNotificationMessage ? std::chrono::seconds(1) : std::chrono::seconds(2); |
| if (defaultWaitSecs > receiveTimeoutSec) defaultWaitSecs = receiveTimeoutSec; |
| |
| auto startLen = length; |
| |
| while (length > 0 && receiveTimeoutSec > std::chrono::microseconds::zero()) { |
| if (checkConnected && !m_connected) { |
| return CONN_IOERR; |
| } |
| if (receiveTimeoutSec < defaultWaitSecs) { |
| defaultWaitSecs = receiveTimeoutSec; |
| } |
| auto readBytes = m_conn->receive(buffer, length, defaultWaitSecs); |
| int32_t lastError = ACE_OS::last_error(); |
| length -= readBytes; |
| if (length > 0 && lastError != ETIME && lastError != ETIMEDOUT) { |
| return CONN_IOERR; |
| } |
| buffer += readBytes; |
| /* |
| Update pools byteRecieved stat here. |
| readMessageChunked, readMessage, readHandshakeData, |
| readHandshakeRawData, readHandShakeBytes, readHandShakeInt, |
| readHandshakeString, all call TcrConnection::receiveData. |
| */ |
| LOGDEBUG("TcrConnection::receiveData length = %zu defaultWaitSecs = %s", |
| length, to_string(defaultWaitSecs).c_str()); |
| if (m_poolDM != nullptr) { |
| LOGDEBUG("TcrConnection::receiveData readBytes = %zu", readBytes); |
| m_poolDM->getStats().incReceivedBytes(static_cast<int64_t>(readBytes)); |
| } |
| receiveTimeoutSec -= defaultWaitSecs; |
| if ((length == startLen) && isNotificationMessage) { // no data read |
| break; |
| } |
| } |
| // Postconditions for checking bounds. |
| return (length == 0 ? CONN_NOERR |
| : (length == startLen ? CONN_NODATA : CONN_TIMEOUT)); |
| } |
| |
| inline ConnErrType TcrConnection::sendData( |
| const char* buffer, size_t length, std::chrono::microseconds sendTimeout, |
| bool checkConnected) { |
| std::chrono::microseconds dummy{0}; |
| return sendData(dummy, buffer, length, sendTimeout, checkConnected); |
| } |
| |
| inline ConnErrType TcrConnection::sendData( |
| std::chrono::microseconds& timeSpent, const char* buffer, size_t length, |
| std::chrono::microseconds sendTimeout, bool checkConnected) { |
| std::chrono::microseconds defaultWaitSecs = std::chrono::seconds(2); |
| if (defaultWaitSecs > sendTimeout) defaultWaitSecs = sendTimeout; |
| LOGDEBUG( |
| "before send len %zu sendTimeoutSec = %s checkConnected = %d m_connected " |
| "%d", |
| length, to_string(sendTimeout).c_str(), checkConnected, m_connected); |
| while (length > 0 && sendTimeout > std::chrono::microseconds::zero()) { |
| if (checkConnected && !m_connected) { |
| return CONN_IOERR; |
| } |
| if (sendTimeout < defaultWaitSecs) { |
| defaultWaitSecs = sendTimeout; |
| } |
| auto sentBytes = m_conn->send(buffer, length, defaultWaitSecs); |
| |
| length -= sentBytes; |
| buffer += sentBytes; |
| // we don't want to decrement the remaining time for the last iteration |
| if (length == 0) { |
| break; |
| } |
| int32_t lastError = ACE_OS::last_error(); |
| if (length > 0 && lastError != ETIME && lastError != ETIMEDOUT) { |
| return CONN_IOERR; |
| } |
| |
| timeSpent += defaultWaitSecs; |
| sendTimeout -= defaultWaitSecs; |
| } |
| |
| return (length == 0 ? CONN_NOERR : CONN_TIMEOUT); |
| } |
| |
| char* TcrConnection::sendRequest(const char* buffer, size_t len, |
| size_t* recvLen, |
| std::chrono::microseconds sendTimeoutSec, |
| std::chrono::microseconds receiveTimeoutSec, |
| int32_t request) { |
| LOGDEBUG("TcrConnection::sendRequest"); |
| std::chrono::microseconds timeSpent{0}; |
| |
| send(timeSpent, buffer, len, sendTimeoutSec); |
| |
| if (timeSpent >= receiveTimeoutSec) { |
| throwException( |
| TimeoutException("TcrConnection::send: connection timed out")); |
| } |
| |
| receiveTimeoutSec -= timeSpent; |
| ConnErrType opErr = CONN_NOERR; |
| return readMessage(recvLen, receiveTimeoutSec, true, &opErr, false, request); |
| } |
| |
| void TcrConnection::sendRequestForChunkedResponse( |
| const TcrMessage& request, size_t len, TcrMessageReply& reply, |
| std::chrono::microseconds sendTimeoutSec, |
| std::chrono::microseconds receiveTimeoutSec) { |
| if (useReplyTimeout(request)) { |
| receiveTimeoutSec = reply.getTimeout(); |
| sendTimeoutSec = reply.getTimeout(); |
| } |
| |
| receiveTimeoutSec -= sendWithTimeouts(request.getMsgData(), len, |
| sendTimeoutSec, receiveTimeoutSec); |
| |
| // to help in decoding the reply based on what was the request type |
| reply.setMessageTypeRequest(request.getMessageType()); |
| |
| if (replyHasResult(request, reply)) { |
| readMessageChunked(reply, receiveTimeoutSec, true); |
| } |
| } |
| |
| bool TcrConnection::useReplyTimeout(const TcrMessage& request) const { |
| auto messageType = request.getMessageType(); |
| return ((messageType == TcrMessage::QUERY) || |
| (messageType == TcrMessage::QUERY_WITH_PARAMETERS) || |
| (messageType == TcrMessage::EXECUTECQ_WITH_IR_MSG_TYPE) || |
| (messageType == TcrMessage::GETDURABLECQS_MSG_TYPE) || |
| (messageType == TcrMessage::EXECUTE_FUNCTION) || |
| (messageType == TcrMessage::EXECUTE_REGION_FUNCTION) || |
| (messageType == TcrMessage::EXECUTE_REGION_FUNCTION_SINGLE_HOP)); |
| } |
| |
| std::chrono::microseconds TcrConnection::sendWithTimeouts( |
| const char* data, size_t len, std::chrono::microseconds sendTimeout, |
| std::chrono::microseconds receiveTimeout) { |
| std::chrono::microseconds timeSpent{0}; |
| send(timeSpent, data, len, sendTimeout, true); |
| |
| if (timeSpent >= receiveTimeout) { |
| throwException( |
| TimeoutException("TcrConnection::send: connection timed out")); |
| } |
| |
| return timeSpent; |
| } |
| |
| bool TcrConnection::replyHasResult(const TcrMessage& request, |
| TcrMessageReply& reply) { |
| auto hasResult = true; |
| |
| // no need of it now, this will not come here |
| if (request.getMessageType() == |
| TcrMessage::EXECUTE_REGION_FUNCTION_SINGLE_HOP) { |
| ChunkedFunctionExecutionResponse* resultCollector = |
| static_cast<ChunkedFunctionExecutionResponse*>( |
| reply.getChunkedResultHandler()); |
| if (resultCollector->getResult() == false) { |
| LOGDEBUG( |
| "TcrConnection::sendRequestForChunkedResponse: function execution, " |
| "no response desired"); |
| hasResult = false; |
| } |
| } |
| |
| return hasResult; |
| } |
| |
| void TcrConnection::send(const char* buffer, size_t len, |
| std::chrono::microseconds sendTimeoutSec, |
| bool checkConnected) { |
| std::chrono::microseconds dummy; |
| send(dummy, buffer, len, sendTimeoutSec, checkConnected); |
| } |
| |
| void TcrConnection::send(std::chrono::microseconds& timeSpent, |
| const char* buffer, size_t len, |
| std::chrono::microseconds sendTimeoutSec, bool) { |
| // LOGINFO("TcrConnection::send: [%p] sending request to endpoint %s;", |
| //: this, m_endpoint); |
| |
| LOGDEBUG( |
| "TcrConnection::send: [%p] sending request to endpoint %s; bytes: %s", |
| this, m_endpoint, Utils::convertBytesToString(buffer, len).c_str()); |
| |
| ConnErrType error = sendData(timeSpent, buffer, len, sendTimeoutSec); |
| |
| LOGFINER( |
| "TcrConnection::send: completed send request to endpoint %s " |
| "with error: %d", |
| m_endpoint, error); |
| |
| if (error != CONN_NOERR) { |
| if (error == CONN_TIMEOUT) { |
| throwException( |
| TimeoutException("TcrConnection::send: connection timed out")); |
| } else { |
| throwException( |
| GeodeIOException("TcrConnection::send: connection failure")); |
| } |
| } |
| } |
| |
| char* TcrConnection::receive(size_t* recvLen, ConnErrType* opErr, |
| std::chrono::microseconds receiveTimeoutSec) { |
| return readMessage(recvLen, receiveTimeoutSec, false, opErr, true); |
| } |
| |
| char* TcrConnection::readMessage(size_t* recvLen, |
| std::chrono::microseconds receiveTimeoutSec, |
| bool doHeaderTimeoutRetries, |
| ConnErrType* opErr, bool isNotificationMessage, |
| int32_t request) { |
| char msg_header[HEADER_LENGTH]; |
| int32_t msgLen; |
| ConnErrType error; |
| |
| std::chrono::microseconds headerTimeout = receiveTimeoutSec; |
| if (doHeaderTimeoutRetries && |
| receiveTimeoutSec == DEFAULT_READ_TIMEOUT_SECS) { |
| headerTimeout = DEFAULT_READ_TIMEOUT_SECS * DEFAULT_TIMEOUT_RETRIES; |
| } |
| |
| LOGDEBUG("TcrConnection::readMessage: receiving reply from endpoint %s", |
| m_endpoint); |
| |
| error = receiveData(msg_header, HEADER_LENGTH, headerTimeout, true, |
| isNotificationMessage); |
| LOGDEBUG("TcrConnection::readMessage after recieve data"); |
| if (error != CONN_NOERR) { |
| // the !isNotificationMessage ensures that notification channel |
| // gets the TimeoutException when no data was received and is ignored by |
| // notification channel; when data has been received then it throws |
| // GeodeIOException that causes the channel to close as required |
| if (error == CONN_NODATA || |
| (error == CONN_TIMEOUT && !isNotificationMessage)) { |
| if (isNotificationMessage) { |
| // fix #752 - do not throw periodic TimeoutException for subscription |
| // channels to avoid frequent stack trace processing. |
| return nullptr; |
| } else { |
| throwException(TimeoutException( |
| "TcrConnection::readMessage: " |
| "connection timed out while receiving message header")); |
| } |
| } else { |
| if (isNotificationMessage) { |
| *opErr = CONN_IOERR; |
| return nullptr; |
| } |
| throwException(GeodeIOException( |
| "TcrConnection::readMessage: " |
| "connection failure while receiving message header")); |
| } |
| } |
| |
| LOGDEBUG( |
| "TcrConnection::readMessage: received header from endpoint %s; " |
| "bytes: %s", |
| m_endpoint, |
| Utils::convertBytesToString(msg_header, HEADER_LENGTH).c_str()); |
| |
| auto input = m_connectionManager->getCacheImpl()->createDataInput( |
| reinterpret_cast<uint8_t*>(msg_header), HEADER_LENGTH); |
| // ignore msgType |
| input.readInt32(); |
| msgLen = input.readInt32(); |
| // check that message length is valid. |
| if (!(msgLen > 0) && request == TcrMessage::GET_CLIENT_PR_METADATA) { |
| char* fullMessage; |
| *recvLen = HEADER_LENGTH + msgLen; |
| _GEODE_NEW(fullMessage, char[HEADER_LENGTH + msgLen]); |
| std::memcpy(fullMessage, msg_header, HEADER_LENGTH); |
| return fullMessage; |
| // exit(0); |
| } |
| |
| // user has to delete this pointer |
| char* fullMessage; |
| *recvLen = HEADER_LENGTH + msgLen; |
| _GEODE_NEW(fullMessage, char[HEADER_LENGTH + msgLen]); |
| std::memcpy(fullMessage, msg_header, HEADER_LENGTH); |
| |
| std::chrono::microseconds mesgBodyTimeout = receiveTimeoutSec; |
| if (isNotificationMessage) { |
| mesgBodyTimeout = receiveTimeoutSec * DEFAULT_TIMEOUT_RETRIES; |
| } |
| error = receiveData(fullMessage + HEADER_LENGTH, msgLen, mesgBodyTimeout, |
| true, isNotificationMessage); |
| if (error != CONN_NOERR) { |
| delete[] fullMessage; |
| // the !isNotificationMessage ensures that notification channel |
| // gets the GeodeIOException and not TimeoutException; |
| // this is required since header has already been read meaning there could |
| // be stale data on socket and so it should close the notification channel |
| // while TimeoutException is normally ignored by notification channel |
| if ((error & CONN_TIMEOUT) && !isNotificationMessage) { |
| throwException(TimeoutException( |
| "TcrConnection::readMessage: " |
| "connection timed out while receiving message body")); |
| } else { |
| if (isNotificationMessage) { |
| *opErr = CONN_IOERR; |
| return nullptr; |
| } |
| throwException( |
| GeodeIOException("TcrConnection::readMessage: " |
| "connection failure while receiving message body")); |
| } |
| } |
| |
| LOGDEBUG( |
| "TcrConnection::readMessage: received message body from " |
| "endpoint %s; bytes: %s", |
| m_endpoint, |
| Utils::convertBytesToString(fullMessage + HEADER_LENGTH, msgLen).c_str()); |
| |
| return fullMessage; |
| } |
| |
| void TcrConnection::readMessageChunked(TcrMessageReply& reply, |
| std::chrono::microseconds receiveTimeout, |
| bool doHeaderTimeoutRetries) { |
| auto headerTimeout = |
| calculateHeaderTimeout(receiveTimeout, doHeaderTimeoutRetries); |
| |
| LOGFINER( |
| "TcrConnection::readMessageChunked: receiving reply from " |
| "endpoint %s", |
| m_endpoint); |
| |
| auto responseHeader = readResponseHeader(headerTimeout); |
| |
| reply.setMessageType(responseHeader.messageType); |
| reply.setTransId(responseHeader.transactionId); |
| |
| // Initialize the chunk processing |
| reply.startProcessChunk(m_chunksProcessSema); |
| |
| // indicate an end to chunk processing and wait for processing |
| // to end even if reading the chunks fails in middle |
| FinalizeProcessChunk endProcessChunk(reply, |
| m_endpointObj->getDistributedMemberID()); |
| |
| auto header = responseHeader.header; |
| try { |
| while ( |
| processChunk(reply, receiveTimeout, header.chunkLength, header.flags)) { |
| header = readChunkHeader(headerTimeout); |
| } |
| } catch (const Exception&) { |
| if (auto handler = reply.getChunkedResultHandler()) { |
| if (auto ex = handler->getException()) { |
| LOGDEBUG("Found existing exception ", ex->what()); |
| handler->clearException(); |
| } |
| } |
| throw; |
| } |
| |
| LOGFINER( |
| "TcrConnection::readMessageChunked: read full reply " |
| "from endpoint %s", |
| m_endpoint); |
| } |
| |
| std::chrono::microseconds TcrConnection::calculateHeaderTimeout( |
| std::chrono::microseconds receiveTimeout, bool retry) { |
| auto headerTimeout = receiveTimeout; |
| if (retry && receiveTimeout == DEFAULT_READ_TIMEOUT_SECS) { |
| headerTimeout *= DEFAULT_TIMEOUT_RETRIES; |
| } |
| return headerTimeout; |
| } |
| |
| chunkedResponseHeader TcrConnection::readResponseHeader( |
| std::chrono::microseconds timeout) { |
| uint8_t receiveBuffer[HEADER_LENGTH]; |
| chunkedResponseHeader header; |
| |
| auto error = receiveData(reinterpret_cast<char*>(receiveBuffer), |
| HEADER_LENGTH, timeout, true, false); |
| if (error != CONN_NOERR) { |
| if (error & CONN_TIMEOUT) { |
| throwException(TimeoutException( |
| "TcrConnection::readResponseHeader: " |
| "connection timed out while receiving message header")); |
| } else { |
| throwException(GeodeIOException( |
| "TcrConnection::readResponseHeader: " |
| "connection failure while receiving message header")); |
| } |
| } |
| |
| LOGDEBUG( |
| "TcrConnection::readResponseHeader: received header from " |
| "endpoint %s; bytes: %s", |
| m_endpoint, |
| Utils::convertBytesToString(receiveBuffer, HEADER_LENGTH).c_str()); |
| |
| auto input = m_connectionManager->getCacheImpl()->createDataInput( |
| receiveBuffer, HEADER_LENGTH); |
| header.messageType = input.readInt32(); |
| header.numberOfParts = input.readInt32(); |
| header.transactionId = input.readInt32(); |
| header.header.chunkLength = input.readInt32(); |
| header.header.flags = input.read(); |
| LOGDEBUG( |
| "TcrConnection::readResponseHeader: " |
| "messageType=%" PRId32 ", numberOfParts=%" PRId32 |
| ", transactionId=%" PRId32 ", chunkLength=%" PRId32 |
| ", lastChunkAndSecurityFlags=0x%" PRIx8, |
| header.messageType, header.numberOfParts, header.transactionId, |
| header.header.chunkLength, header.header.flags); |
| |
| return header; |
| } // namespace client |
| |
| chunkHeader TcrConnection::readChunkHeader(std::chrono::microseconds timeout) { |
| uint8_t receiveBuffer[CHUNK_HEADER_LENGTH]; |
| chunkHeader header; |
| |
| auto error = receiveData(reinterpret_cast<char*>(receiveBuffer), |
| CHUNK_HEADER_LENGTH, timeout, true, false); |
| if (error != CONN_NOERR) { |
| if (error & CONN_TIMEOUT) { |
| throwException(TimeoutException( |
| "TcrConnection::readChunkHeader: " |
| "connection timed out while receiving message header")); |
| } else { |
| throwException(GeodeIOException( |
| "TcrConnection::readChunkHeader: " |
| "connection failure while receiving message header")); |
| } |
| } |
| |
| LOGDEBUG( |
| "TcrConnection::readChunkHeader: received header from " |
| "endpoint %s; bytes: %s", |
| m_endpoint, |
| Utils::convertBytesToString(receiveBuffer, CHUNK_HEADER_LENGTH).c_str()); |
| |
| auto input = m_connectionManager->getCacheImpl()->createDataInput( |
| receiveBuffer, CHUNK_HEADER_LENGTH); |
| header.chunkLength = input.readInt32(); |
| header.flags = input.read(); |
| LOGDEBUG( |
| "TcrConnection::readChunkHeader: " |
| ", chunkLen=%" PRId32 ", lastChunkAndSecurityFlags=0x%" PRIx8, |
| header.chunkLength, header.flags); |
| |
| return header; |
| } |
| |
| std::vector<uint8_t> TcrConnection::readChunkBody( |
| std::chrono::microseconds timeout, int32_t chunkLength) { |
| std::vector<uint8_t> chunkBody(chunkLength); |
| auto error = receiveData(reinterpret_cast<char*>(chunkBody.data()), |
| chunkLength, timeout, true, false); |
| if (error != CONN_NOERR) { |
| if (error & CONN_TIMEOUT) { |
| throwException( |
| TimeoutException("TcrConnection::readChunkBody: " |
| "connection timed out while receiving chunk body")); |
| } else { |
| throwException( |
| GeodeIOException("TcrConnection::readChunkBody: " |
| "connection failure while receiving chunk body")); |
| } |
| } |
| |
| LOGDEBUG( |
| "TcrConnection::readChunkBody: received chunk body from endpoint " |
| "%s; bytes: %s", |
| m_endpoint, |
| Utils::convertBytesToString(chunkBody.data(), chunkLength).c_str()); |
| return chunkBody; |
| } |
| |
| bool TcrConnection::processChunk(TcrMessageReply& reply, |
| std::chrono::microseconds timeout, |
| int32_t chunkLength, |
| int8_t lastChunkAndSecurityFlags) { |
| // NOTE: this buffer is allocated by readChunkBody, and reply.processChunk |
| // takes ownership, so we don't delete it here on failure |
| std::vector<uint8_t> chunkBody = readChunkBody(timeout, chunkLength); |
| |
| // Process the chunk; the actual processing is done by a separate thread |
| // ThinClientBaseDM::m_chunkProcessor. |
| reply.processChunk(chunkBody, chunkLength, |
| m_endpointObj->getDistributedMemberID(), |
| lastChunkAndSecurityFlags); |
| // Return boolean indicating whether or not there are more chunks, i.e. |
| // the *inverse* of the flag indicating this is the last chunk. It's a |
| // little confusing here, but makes calling code clearer. |
| return (lastChunkAndSecurityFlags & LAST_CHUNK_MASK) ? false : true; |
| } |
| |
| void TcrConnection::close() { |
| TcrMessage* closeMsg = TcrMessage::getCloseConnMessage( |
| m_poolDM->getConnectionManager().getCacheImpl()); |
| try { |
| if (!TcrConnectionManager::TEST_DURABLE_CLIENT_CRASH && |
| !m_connectionManager->isNetDown()) { |
| send(closeMsg->getMsgData(), closeMsg->getMsgLength(), |
| std::chrono::seconds(2), false); |
| } |
| } catch (Exception& e) { |
| LOGINFO("Close connection message failed with msg: %s", e.what()); |
| } catch (...) { |
| LOGINFO("Close connection message failed"); |
| } |
| } |
| |
| std::vector<int8_t> TcrConnection::readHandshakeData( |
| int32_t msgLength, std::chrono::microseconds connectTimeout) { |
| ConnErrType error = CONN_NOERR; |
| if (msgLength < 0) { |
| msgLength = 0; |
| } |
| std::vector<int8_t> message(msgLength + 1); |
| message.data()[msgLength] = '\0'; |
| if (msgLength == 0) { |
| return message; |
| } |
| if ((error = receiveData(reinterpret_cast<char*>(message.data()), msgLength, |
| connectTimeout, false)) != CONN_NOERR) { |
| GF_SAFE_DELETE_CON(m_conn); |
| if (error & CONN_TIMEOUT) { |
| throwException( |
| TimeoutException("TcrConnection::TcrConnection: " |
| "Timeout in handshake")); |
| } else { |
| throwException( |
| GeodeIOException("TcrConnection::TcrConnection: " |
| "Handshake failure")); |
| } |
| } else { |
| return message; |
| } |
| } |
| |
| std::shared_ptr<CacheableBytes> TcrConnection::readHandshakeRawData( |
| int32_t msgLength, std::chrono::microseconds connectTimeout) { |
| ConnErrType error = CONN_NOERR; |
| if (msgLength < 0) { |
| msgLength = 0; |
| } |
| if (msgLength == 0) { |
| return nullptr; |
| } |
| std::vector<int8_t> message(msgLength); |
| if ((error = receiveData(reinterpret_cast<char*>(message.data()), msgLength, |
| connectTimeout, false)) != CONN_NOERR) { |
| GF_SAFE_DELETE_CON(m_conn); |
| if (error & CONN_TIMEOUT) { |
| throwException( |
| TimeoutException("TcrConnection::TcrConnection: " |
| "Timeout in handshake")); |
| } else { |
| throwException( |
| GeodeIOException("TcrConnection::TcrConnection: " |
| "Handshake failure")); |
| } |
| } else { |
| return CacheableBytes::create(std::move(message)); |
| } |
| } |
| |
| std::shared_ptr<CacheableBytes> TcrConnection::readHandshakeByteArray( |
| std::chrono::microseconds connectTimeout) { |
| uint32_t arraySize = readHandshakeArraySize(connectTimeout); |
| return readHandshakeRawData(arraySize, connectTimeout); |
| } |
| |
| // read a byte array |
| int32_t TcrConnection::readHandshakeArraySize( |
| std::chrono::microseconds connectTimeout) { |
| auto arrayLenHeader = readHandshakeData(1, connectTimeout); |
| |
| int32_t arrayLength = static_cast<uint8_t>(arrayLenHeader[0]); |
| if (static_cast<int8_t>(arrayLenHeader[0]) == -2) { |
| auto arrayLengthBytes = readHandshakeData(2, connectTimeout); |
| auto dataInput2 = m_connectionManager->getCacheImpl()->createDataInput( |
| reinterpret_cast<const uint8_t*>(arrayLengthBytes.data()), |
| arrayLengthBytes.size()); |
| arrayLength = dataInput2.readInt16(); |
| } else if (static_cast<int8_t>(arrayLenHeader[0]) == -3) { |
| auto arrayLengthBytes = readHandshakeData(4, connectTimeout); |
| auto dataInput2 = m_connectionManager->getCacheImpl()->createDataInput( |
| reinterpret_cast<const uint8_t*>(arrayLengthBytes.data()), |
| arrayLengthBytes.size()); |
| arrayLength = dataInput2.readInt32(); |
| } |
| |
| return arrayLength; |
| } |
| |
| void TcrConnection::readHandshakeInstantiatorMsg( |
| std::chrono::microseconds connectTimeout) { |
| int hashMapSize = readHandshakeArraySize(connectTimeout); |
| for (int i = 0; i < hashMapSize; i++) { |
| readHandShakeBytes(6, connectTimeout); // reading integer and arraylist |
| // type |
| int aLen = readHandshakeArraySize(connectTimeout); |
| for (int j = 0; j < aLen; j++) { |
| readHandshakeString(connectTimeout); |
| } |
| } |
| |
| hashMapSize = readHandshakeArraySize(connectTimeout); |
| for (int i = 0; i < hashMapSize; i++) { |
| readHandShakeBytes(5, connectTimeout); // reading integer |
| readHandshakeString(connectTimeout); |
| } |
| |
| // added in 3.6 and 6.6 |
| int hashMapSize2 = readHandshakeArraySize(connectTimeout); |
| for (int i = 0; i < hashMapSize2; i++) { |
| readHandShakeBytes(6, connectTimeout); // reading integer and arraylist |
| // type |
| int aLen = readHandshakeArraySize(connectTimeout); |
| for (int j = 0; j < aLen; j++) { |
| readHandshakeString(connectTimeout); |
| } |
| } |
| } |
| void TcrConnection::readHandShakeBytes( |
| int numberOfBytes, std::chrono::microseconds connectTimeout) { |
| ConnErrType error = CONN_NOERR; |
| uint8_t* recvMessage; |
| _GEODE_NEW(recvMessage, uint8_t[numberOfBytes]); |
| |
| if ((error = receiveData(reinterpret_cast<char*>(recvMessage), numberOfBytes, |
| connectTimeout, false)) != CONN_NOERR) { |
| if (error & CONN_TIMEOUT) { |
| _GEODE_SAFE_DELETE_ARRAY(recvMessage); |
| GF_SAFE_DELETE_CON(m_conn); |
| throwException( |
| TimeoutException("TcrConnection::TcrConnection: " |
| "Timeout in handshake")); |
| } else { |
| _GEODE_SAFE_DELETE_ARRAY(recvMessage); |
| GF_SAFE_DELETE_CON(m_conn); |
| throwException( |
| GeodeIOException("TcrConnection::TcrConnection: " |
| "Handshake failure")); |
| } |
| } |
| |
| _GEODE_SAFE_DELETE_ARRAY(recvMessage); |
| } |
| |
| int32_t TcrConnection::readHandShakeInt( |
| std::chrono::microseconds connectTimeout) { |
| ConnErrType error = CONN_NOERR; |
| uint8_t* recvMessage; |
| _GEODE_NEW(recvMessage, uint8_t[4]); |
| |
| if ((error = receiveData(reinterpret_cast<char*>(recvMessage), 4, |
| connectTimeout, false)) != CONN_NOERR) { |
| if (error & CONN_TIMEOUT) { |
| _GEODE_SAFE_DELETE_ARRAY(recvMessage); |
| GF_SAFE_DELETE_CON(m_conn); |
| throwException( |
| TimeoutException("TcrConnection::TcrConnection: " |
| "Timeout in handshake")); |
| } else { |
| _GEODE_SAFE_DELETE_ARRAY(recvMessage); |
| GF_SAFE_DELETE_CON(m_conn); |
| throwException( |
| GeodeIOException("TcrConnection::TcrConnection: " |
| "Handshake failure")); |
| } |
| } |
| |
| auto di = |
| m_connectionManager->getCacheImpl()->createDataInput(recvMessage, 4); |
| int32_t val = di.readInt32(); |
| |
| _GEODE_SAFE_DELETE_ARRAY(recvMessage); |
| |
| return val; |
| } |
| |
| std::shared_ptr<CacheableString> TcrConnection::readHandshakeString( |
| std::chrono::microseconds connectTimeout) { |
| ConnErrType error = CONN_NOERR; |
| |
| char cstypeid; |
| if (receiveData(&cstypeid, 1, connectTimeout, false) != CONN_NOERR) { |
| GF_SAFE_DELETE_CON(m_conn); |
| if (error & CONN_TIMEOUT) { |
| LOGFINE("Timeout receiving string typeid"); |
| throwException( |
| TimeoutException("TcrConnection::TcrConnection: " |
| "Timeout in handshake reading string type ID")); |
| } else { |
| LOGFINE("IO error receiving string typeid"); |
| throwException( |
| GeodeIOException("TcrConnection::TcrConnection: " |
| "Handshake failure reading string type ID")); |
| } |
| } |
| |
| LOGDEBUG("Received string typeid as %d", cstypeid); |
| |
| uint32_t length = 0; |
| switch (static_cast<DSCode>(cstypeid)) { |
| case DSCode::CacheableNullString: { |
| return nullptr; |
| } |
| case DSCode::CacheableASCIIString: { |
| auto lenBytes = readHandshakeData(2, connectTimeout); |
| auto lenDI = m_connectionManager->getCacheImpl()->createDataInput( |
| reinterpret_cast<const uint8_t*>(lenBytes.data()), lenBytes.size()); |
| length = lenDI.readInt16(); |
| |
| break; |
| } |
| default: { |
| GF_SAFE_DELETE_CON(m_conn); |
| throwException( |
| GeodeIOException("TcrConnection::TcrConnection: " |
| "Handshake failure: Unexpected string type ID")); |
| } |
| } |
| |
| LOGDEBUG(" Received string len %d", length); |
| |
| if (length == 0) { |
| return nullptr; |
| } |
| |
| std::vector<char> recvMessage(length + 1); |
| recvMessage[length] = '\0'; |
| |
| if ((error = receiveData(recvMessage.data(), length, connectTimeout, |
| false)) != CONN_NOERR) { |
| if (error & CONN_TIMEOUT) { |
| GF_SAFE_DELETE_CON(m_conn); |
| LOGFINE("Timeout receiving string data"); |
| throwException( |
| TimeoutException("TcrConnection::TcrConnection: " |
| "Timeout in handshake reading string bytes")); |
| } else { |
| GF_SAFE_DELETE_CON(m_conn); |
| LOGFINE("IO error receiving string data"); |
| throwException( |
| GeodeIOException("TcrConnection::TcrConnection: " |
| "Handshake failure reading string bytes")); |
| } |
| } else { |
| LOGDEBUG(" Received string data [%s]", recvMessage.data()); |
| auto retval = |
| CacheableString::create(std::string(recvMessage.data(), length)); |
| return retval; |
| } |
| } |
| bool TcrConnection::hasExpired(const std::chrono::milliseconds& expiryTime) { |
| if (expiryTime <= std::chrono::milliseconds::zero()) { |
| return false; |
| } |
| |
| return (clock::now() - m_creationTime) > expiryTime; |
| } |
| |
| bool TcrConnection::isIdle(const std::chrono::milliseconds& idleTime) { |
| if (idleTime <= std::chrono::milliseconds::zero()) { |
| return false; |
| } |
| |
| return (clock::now() - m_lastAccessed) > idleTime; |
| } |
| |
| void TcrConnection::touch() { m_lastAccessed = clock::now(); } |
| |
| TcrConnection::time_point TcrConnection::getLastAccessed() { |
| return m_lastAccessed; |
| } |
| |
| uint8_t TcrConnection::getOverrides(const SystemProperties* props) { |
| uint8_t conflateByte = 0; |
| |
| auto&& conflate = props->conflateEvents(); |
| if (conflate == "true") { |
| conflateByte = 1; |
| } else if (conflate == "false") { |
| conflateByte = 2; |
| } |
| |
| return conflateByte; |
| } |
| |
| void TcrConnection::updateCreationTime() { |
| m_creationTime = clock::now(); |
| touch(); |
| } |
| |
| TcrConnection::~TcrConnection() { |
| LOGDEBUG("Tcrconnection destructor %p . conn ref to endopint %d", this, |
| m_endpointObj->getConnRefCounter()); |
| m_endpointObj->addConnRefCounter(-1); |
| if (m_conn != nullptr) { |
| LOGDEBUG("closing the connection"); |
| m_conn->close(); |
| GF_SAFE_DELETE_CON(m_conn); |
| } |
| } |
| |
| bool TcrConnection::setAndGetBeingUsed(volatile bool isBeingUsed, |
| bool forTransaction) { |
| uint32_t currentValue = 0; |
| |
| if (!forTransaction) { |
| if (isBeingUsed) { |
| if (m_isUsed == 1 || m_isUsed == 2) return false; |
| if (m_isUsed.compare_exchange_strong(currentValue, 1)) return true; |
| return false; |
| } else { |
| m_isUsed = 0; |
| return true; |
| } |
| } else { |
| if (isBeingUsed) { |
| if (m_isUsed == 1) { // already used |
| return false; |
| } |
| if (m_isUsed == 2) { // transaction thread has set, reused it |
| return true; |
| } |
| if (m_isUsed.compare_exchange_strong(currentValue, |
| 2 /*for transaction*/)) { |
| return true; |
| } |
| return false; |
| } else { |
| // m_isUsed = 0;//this will done by releasing the connection by |
| // transaction at the end of transaction |
| return true; |
| } |
| } |
| } |
| |
| } // namespace client |
| } // namespace geode |
| } // namespace apache |