| /*========================================================================= |
| * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved. |
| * This product is protected by U.S. and international copyright |
| * and intellectual property laws. Pivotal products are covered by |
| * one or more patents listed at http://www.pivotal.io/patents. |
| *========================================================================= |
| */ |
| #include "TcrConnection.hpp" |
| #include "../DistributedSystem.hpp" |
| #include "../SystemProperties.hpp" |
| #include "Connector.hpp" |
| #include "TcpSslConn.hpp" |
| #include "ClientProxyMembershipID.hpp" |
| #include "ThinClientPoolHADM.hpp" |
| #include <memory.h> |
| #include <ace/INET_Addr.h> |
| #include <ace/OS.h> |
| #include "TcrEndpoint.hpp" |
| |
| #include "GemfireTypeIdsImpl.hpp" |
| #include "TcrConnectionManager.hpp" |
| #include "DistributedSystemImpl.hpp" |
| #include "Version.hpp" |
| |
| #include "DiffieHellman.hpp" |
| #include "Utils.hpp" // for RandGen for server challenge |
| #include "ThinClientRegion.hpp" |
| |
| using namespace gemfire; |
// Number of bytes read into the message header buffer by readMessage().
const int HEADER_LENGTH = 17;
// NOTE(review): not referenced in the visible part of this file; presumably an
// upper bound for a receive buffer - confirm against the rest of the file.
const int MAXBUFSIZE = 65536;
// NOTE(review): not referenced in the visible part of this file; presumably the
// offset of the body-length field inside the message header - confirm.
const int BODYLENPOS = 4;
// Value assigned to connectionId when a connection is initialised.
const int64_t INITIAL_CONNECTION_ID = 26739;

// Logs the name and message of an exception at FINEST level and then throws
// it. The argument text is expanded three times, so an expression passed here
// (for example a temporary exception) may be evaluated more than once.
#define throwException(ex)                                \
  {                                                       \
    LOGFINEST("%s: %s", ex.getName(), ex.getMessage());   \
    throw ex;                                             \
  }
| bool TcrConnection::InitTcrConnection(TcrEndpoint* endpointObj, const char* endpoint, |
| Set< uint16_t >& ports, |
| bool isClientNotification, bool isSecondary, uint32_t connectTimeout) |
| { |
| m_conn = NULL; |
| m_endpointObj = endpointObj; |
| m_poolDM = dynamic_cast<ThinClientPoolDM*> (m_endpointObj->getPoolHADM()); |
| // add to the connection reference counter of the endpoint |
| m_endpointObj->addConnRefCounter(1); |
| //m_connected = isConnected; |
| m_hasServerQueue = NON_REDUNDANT_SERVER; |
| m_queueSize = 0; |
| m_dh = NULL; |
| //m_chunksProcessSema = 0; |
| m_creationTime = ACE_OS::gettimeofday(); |
| connectionId = INITIAL_CONNECTION_ID; |
| m_lastAccessed = ACE_OS::gettimeofday(); |
| |
| LOGDEBUG("Tcrconnection const isSecondary = %d and isClientNotification = %d, this = %p, conn ref to endopint %d", isSecondary, isClientNotification, this, m_endpointObj->getConnRefCounter()); |
| bool isPool = false; |
| m_isBeingUsed = false; |
| GF_DEV_ASSERT(endpoint != NULL); |
| m_endpoint = endpoint; |
| // Precondition: |
| // 1. isSecondary ==> isClientNotification |
| |
| GF_DEV_ASSERT(!isSecondary || isClientNotification); |
| |
| DistributedSystemPtr dsys = DistributedSystem::getInstance(); |
| |
| // Create TcpConn object which manages a socket connection with the endpoint. |
| if( endpointObj && endpointObj->getPoolHADM()) |
| { |
| m_conn = createConnection(m_endpoint, connectTimeout, |
| static_cast <int32_t> (endpointObj->getPoolHADM()->getSocketBufferSize())); |
| isPool = true; |
| } |
| else |
| { |
| m_conn = createConnection(m_endpoint, connectTimeout, 0); |
| } |
| |
| GF_DEV_ASSERT(m_conn != NULL); |
| |
| DataOutput handShakeMsg; |
| bool isNotificationChannel = false; |
| // Send byte Acceptor.CLIENT_TO_SERVER = (byte) 100; |
| // Send byte Acceptor.SERVER_TO_CLIENT = (byte) 101; |
| if ( isClientNotification ) { |
| isNotificationChannel = true; |
| if ( isSecondary ) { |
| handShakeMsg.write( (int8_t)SECONDARY_SERVER_TO_CLIENT ); |
| } else { |
| handShakeMsg.write( (int8_t)PRIMARY_SERVER_TO_CLIENT ); |
| } |
| } else { |
| handShakeMsg.write( (int8_t)CLIENT_TO_SERVER ); |
| } |
| |
| // added for versioned client |
| int8_t versionOrdinal = Version::getOrdinal(); |
| handShakeMsg.write(versionOrdinal); |
| |
| LOGFINE("Client version ordinal is %d", versionOrdinal); |
| |
| handShakeMsg.write((int8_t)REPLY_OK); |
| |
| // Send byte REPLY_OK = (byte)58; |
| if (!isClientNotification) { |
| m_port = m_conn->getPort( ); |
| ports.insert( m_port ); |
| } else { |
| // add the local ports to message |
| Set< uint16_t >::Iterator iter = ports.iterator( ); |
| handShakeMsg.writeInt( (int32_t)ports.size( ) ); |
| while ( iter.hasNext( ) ) { |
| handShakeMsg.writeInt( (int32_t)iter.next( ) ); |
| } |
| } |
| |
| // ARB: Writing handshake readtimeout value for CSVER_51+. |
| if (!isClientNotification) { |
| // SW: The timeout has been artificially raised to the highest |
| // permissible value for bug #232 for now. |
| // VJR: minus 10 sec because the GFE 5.7 gridDev branch adds a |
| // 5 sec buffer which was causing an int overflow. |
| handShakeMsg.writeInt( (int32_t)0x7fffffff - 10000 ); |
| } |
| |
| |
| // Write header for byte FixedID since GFE 5.7 |
| handShakeMsg.write((int8_t)GemfireTypeIdsImpl::FixedIDByte); |
| //Writing byte for ClientProxyMembershipID class id=38 as registered on the java server. |
| handShakeMsg.write((int8_t)GemfireTypeIdsImpl::ClientProxyMembershipId); |
| if( endpointObj->getPoolHADM() ){ |
| ClientProxyMembershipID* memId = endpointObj->getPoolHADM()->getMembershipId(); |
| uint32_t memIdBufferLength; |
| const char *memIdBuffer = memId->getDSMemberId(memIdBufferLength); |
| handShakeMsg.writeBytes((int8_t *)memIdBuffer, memIdBufferLength); |
| } |
| else { |
| ACE_TCHAR hostName[256]; |
| ACE_OS::hostname( hostName, sizeof(hostName)-1); |
| |
| ACE_INET_Addr driver("",hostName,"tcp"); |
| uint32_t hostAddr = driver.get_ip_address(); |
| uint16_t hostPort = 0; |
| |
| //Add 3 durable Subcription properties to ClientProxyMembershipID |
| SystemProperties* sysProp = DistributedSystem::getSystemProperties(); |
| |
| const char *durableId = (sysProp != NULL)? sysProp->durableClientId() : NULL; |
| const uint32_t durableTimeOut = (sysProp != NULL)? sysProp->durableTimeout() :0; |
| |
| // Write ClientProxyMembershipID serialized object. |
| uint32_t memIdBufferLength; |
| ClientProxyMembershipID memId( hostName, hostAddr, hostPort, durableId, durableTimeOut ); |
| const char *memIdBuffer = memId.getDSMemberId(memIdBufferLength); |
| handShakeMsg.writeBytes((int8_t *)memIdBuffer, memIdBufferLength); |
| } |
| handShakeMsg.writeInt((int32_t)1); |
| |
| bool isDhOn = false; |
| bool requireServerAuth = false; |
| PropertiesPtr credentials; |
| CacheableBytesPtr serverChallenge; |
| |
| SystemProperties *tmpSystemProperties = DistributedSystem::getSystemProperties(); |
| |
| //Write overrides (just conflation for now) |
| handShakeMsg.write(getOverrides(tmpSystemProperties)); |
| |
| bool tmpIsSecurityOn = tmpSystemProperties->isSecurityOn(); |
| isDhOn = tmpSystemProperties->isDhOn() ; |
| |
| if (m_endpointObj) |
| { |
| tmpIsSecurityOn = tmpSystemProperties->isSecurityOn() || this->m_endpointObj->isMultiUserMode(); |
| CacheableStringPtr dhalgo = tmpSystemProperties->getSecurityProperties()->find("security-client-dhalgo"); |
| |
| LOGDEBUG("TcrConnection this->m_endpointObj->isMultiUserMode() = %d ", this->m_endpointObj->isMultiUserMode()); |
| if (this->m_endpointObj->isMultiUserMode()) |
| { |
| if(dhalgo != NULLPTR && dhalgo->length() >0) |
| isDhOn = true; |
| } |
| } |
| |
| LOGDEBUG("TcrConnection algo name %s tmpIsSecurityOn = %d isDhOn = %d isNotificationChannel = %d ", tmpSystemProperties->securityClientDhAlgo(), tmpIsSecurityOn, isDhOn, isNotificationChannel); |
| bool doIneedToSendCreds = true; |
| if (isNotificationChannel && m_endpointObj && this->m_endpointObj->isMultiUserMode()) |
| { |
| isDhOn = false; |
| tmpIsSecurityOn = false; |
| doIneedToSendCreds = false; |
| } |
| |
| if(isNotificationChannel && !doIneedToSendCreds) |
| { |
| handShakeMsg.write((uint8_t)SECURITY_MULTIUSER_NOTIFICATIONCHANNEL); |
| } |
| else if (isDhOn) { |
| m_dh = new DiffieHellman(); |
| m_dh->initDhKeys(tmpSystemProperties->getSecurityProperties()); |
| handShakeMsg.write((uint8_t)SECURITY_CREDENTIALS_DHENCRYPT); |
| } else if (tmpIsSecurityOn) { |
| handShakeMsg.write((uint8_t)SECURITY_CREDENTIALS_NORMAL); |
| } else { |
| handShakeMsg.write((uint8_t)SECURITY_CREDENTIALS_NONE); |
| } |
| |
| if (tmpIsSecurityOn) { |
| try { |
| LOGFINER("TcrConnection: about to invoke authloader"); |
| PropertiesPtr tmpSecurityProperties = tmpSystemProperties->getSecurityProperties(); |
| if (tmpSecurityProperties == NULLPTR) { |
| LOGWARN("TcrConnection: security properties not found."); |
| } |
| //AuthInitializePtr authInitialize = tmpSystemProperties->getAuthLoader(); |
| //HItesh:only for backward connection |
| if (isClientNotification) { |
| AuthInitializePtr authInitialize = DistributedSystem::m_impl->getAuthLoader(); |
| if (authInitialize != NULLPTR) { |
| LOGFINER("TcrConnection: acquired handle to authLoader, " |
| "invoking getCredentials"); |
| /* adongre |
| * CID 28898: Copy into fixed size buffer (STRING_OVERFLOW) |
| * You might overrun the 100 byte fixed-size string "tmpEndpoint" by copying "this->m_endpoint" without checking the length. |
| * Note: This defect has an elevated risk because the source argument is a parameter of the current function. |
| */ |
| //char tmpEndpoint[100] = { '\0' } ; |
| //strcpy(tmpEndpoint, m_endpoint); |
| PropertiesPtr tmpAuthIniSecurityProperties = |
| authInitialize->getCredentials( tmpSecurityProperties, /*tmpEndpoint*/ m_endpoint ); |
| LOGFINER("TcrConnection: after getCredentials "); |
| credentials = tmpAuthIniSecurityProperties; |
| } |
| } |
| |
| if (isDhOn) { |
| CacheableStringPtr ksPath = tmpSecurityProperties->find("security-client-kspath"); |
| requireServerAuth = (ksPath != NULLPTR && ksPath->length() > 0); |
| handShakeMsg.writeBoolean(requireServerAuth); |
| LOGFINE("HandShake: Server authentication using RSA signature %s required", requireServerAuth?"is":"not"); |
| |
| // Send the symmetric key algorithm name string |
| handShakeMsg.write((int8_t)GemfireTypeIds::CacheableString); |
| handShakeMsg.writeASCII(tmpSystemProperties->securityClientDhAlgo()); |
| |
| // Send the client's DH public key to the server |
| // CacheableBytesPtr dhPubKey = DiffieHellman::getPublicKey(); |
| CacheableBytesPtr dhPubKey = m_dh->getPublicKey(); |
| LOGDEBUG("DH pubkey send len is %d", dhPubKey->length()); |
| dhPubKey->toData(handShakeMsg); |
| |
| if (requireServerAuth) { |
| char serverChallengeBytes[64] = {0}; |
| RandGen getrand; |
| for (int pos = 0; pos < 64; pos++) { |
| serverChallengeBytes[pos] = getrand(255); |
| } |
| serverChallenge = CacheableBytes::create((const uint8_t*)serverChallengeBytes, 64); |
| serverChallenge->toData(handShakeMsg); |
| } |
| } else { // if isDhOn |
| if (isClientNotification)//hitesh:only for backward connection |
| credentials->toData(handShakeMsg); |
| } // else isDhOn |
| } |
| catch ( const AuthenticationRequiredException& ) { |
| LOGDEBUG("AuthenticationRequiredException got"); |
| throw; |
| } |
| catch ( const AuthenticationFailedException& ) { |
| LOGDEBUG("AuthenticationFailedException got"); |
| throw; |
| } |
| catch ( const Exception& ex ) { |
| LOGWARN( "TcrConnection: failed to acquire handle to authLoader: [%s] %s", |
| ex.getName( ), ex.getMessage( ) ); |
| throwException(AuthenticationFailedException("TcrConnection: failed " |
| "to load authInit library: ", ex.getMessage())); |
| } |
| } |
| |
| uint32_t msgLengh; |
| char *data = (char*)handShakeMsg.getBuffer(&msgLengh); |
| LOGFINE( "Attempting handshake with endpoint %s for %s%s connection", endpoint, |
| isClientNotification ? (isSecondary ? "secondary " : "primary ") : "", |
| isClientNotification ? "subscription" : "client"); |
| ConnErrType error = sendData( data, msgLengh, connectTimeout, false ); |
| |
| if( error == CONN_NOERR ) { |
| |
| CacheableBytesPtr acceptanceCode = readHandshakeData(1, connectTimeout); |
| |
| LOGDEBUG(" Handshake: Got Accept Code %d", acceptanceCode[0]); |
| /* adongre */ |
| if ( acceptanceCode[0] == REPLY_SSL_ENABLED && ! tmpSystemProperties->sslEnabled() ) { |
| LOGERROR("SSL is enabled on server, enable SSL in client as well"); |
| AuthenticationRequiredException ex("SSL is enabled on server, enable SSL in client as well"); |
| GF_SAFE_DELETE_CON( m_conn ); |
| throwException(ex); |
| } |
| |
| // if diffie-hellman based credential encryption is enabled |
| if (isDhOn && acceptanceCode[0] == REPLY_OK) { |
| |
| // read the server's DH public key |
| CacheableBytesPtr pubKeyBytes = readHandshakeByteArray(connectTimeout); |
| LOGDEBUG(" Handshake: Got pubKeySize %d", pubKeyBytes->length()); |
| |
| // set the server's public key on client's DH side |
| //DiffieHellman::setPublicKeyOther(pubKeyBytes); |
| m_dh->setPublicKeyOther(pubKeyBytes); |
| |
| // Note: SK Algo is set in DistributedSystem::connect() |
| //DiffieHellman::computeSharedSecret(); |
| m_dh->computeSharedSecret(); |
| |
| if (requireServerAuth) { |
| //Read Subject Name |
| CacheableStringPtr subjectName = readHandshakeString(connectTimeout); |
| LOGDEBUG("Got subject %s", subjectName->asChar()); |
| // read the server's signature bytes |
| CacheableBytesPtr responseBytes = readHandshakeByteArray(connectTimeout); |
| LOGDEBUG("Handshake: Got response size %d", responseBytes->length()); |
| LOGDEBUG("Handshake: Got serverChallenge size %d", serverChallenge->length()); |
| if (!m_dh->verify(subjectName, serverChallenge, responseBytes)) { |
| throwException(AuthenticationFailedException( |
| "Handshake: failed to verify server challenge response")); |
| } |
| LOGFINE("HandShake: Verified server challenge response"); |
| } |
| |
| // read the challenge bytes from the server |
| CacheableBytesPtr challengeBytes = readHandshakeByteArray(connectTimeout); |
| LOGDEBUG("Handshake: Got challengeSize %d", challengeBytes->length()); |
| |
| // encrypt the credentials and challenge bytes |
| DataOutput cleartext; |
| if (isClientNotification)//hitesh:only for backward connection |
| credentials->toData(cleartext); |
| challengeBytes->toData(cleartext); |
| CacheableBytesPtr ciphertext = m_dh->encrypt(cleartext.getBuffer(), cleartext.getBufferLength()); |
| |
| DataOutput sendCreds; |
| ciphertext->toData(sendCreds); |
| uint32_t credLen; |
| char * credData = (char*) sendCreds.getBuffer(&credLen); |
| // send the encrypted bytes and check the response |
| error = sendData(credData, credLen, connectTimeout, false); |
| |
| if (error == CONN_NOERR) { |
| acceptanceCode = readHandshakeData(1, connectTimeout); |
| LOGDEBUG("Handshake: Got acceptanceCode Finally %d", acceptanceCode[0]); |
| } else { |
| int32_t lastError = ACE_OS::last_error( ); |
| LOGERROR( "Handshake failed, errno: %d, server may not be running", lastError ); |
| GF_SAFE_DELETE_CON( m_conn ); |
| if ( error & CONN_TIMEOUT ) { |
| throwException(TimeoutException("TcrConnection::TcrConnection: " |
| "connection timed out during diffie-hellman handshake")); |
| } else { |
| throwException(GemfireIOException("TcrConnection::TcrConnection: " |
| "Handshake failure during diffie-hellman")); |
| } |
| } |
| } |
| |
| CacheableBytesPtr serverQueueStatus = readHandshakeData(1, connectTimeout); |
| |
| // ARB: TESTING: Durable clients - set server queue status. |
| // 0 - Non-Redundant , 1- Redundant , 2- Primary |
| if (serverQueueStatus[0] == 1 ) { |
| m_hasServerQueue = REDUNDANT_SERVER; |
| } |
| else if (serverQueueStatus[0] == 2 ) { |
| m_hasServerQueue = PRIMARY_SERVER; |
| } |
| else { |
| m_hasServerQueue = NON_REDUNDANT_SERVER; |
| } |
| CacheableBytesPtr queueSizeMsg = readHandshakeData( 4, connectTimeout ); |
| DataInput dI(queueSizeMsg->value(), queueSizeMsg->length()); |
| int32_t queueSize = 0; |
| dI.readInt( &queueSize ); |
| m_queueSize = queueSize > 0 ? queueSize: 0; |
| |
| m_endpointObj->setServerQueueStatus( m_hasServerQueue,m_queueSize ); |
| |
| ////////////////////////// Set Pool Specific Q Size only when //////////////////////////////// |
| ////////////////////////// 1. SereverQStatus = Primary or //////////////////////////////// |
| ////////////////////////// 2. SereverQStatus = Non-Redundant and //////////////////////////////// |
| ////////////////////////// 3. Only when handshake is for subscription //////////////////////////////// |
| if (m_poolDM != NULL) { |
| if ((m_hasServerQueue == PRIMARY_SERVER || m_hasServerQueue == NON_REDUNDANT_SERVER) && isClientNotification) |
| { |
| m_poolDM->setPrimaryServerQueueSize(queueSize); |
| } |
| } |
| |
| if(!isClientNotification ) { |
| // Read and ignore the DistributedMember object |
| CacheableBytesPtr arrayLenHeader = readHandshakeData( 1, connectTimeout ); |
| int32_t recvMsgLen = (int32_t) arrayLenHeader[0]; |
| // now check for array length headers - since GFE 5.7 |
| if ((int8_t)arrayLenHeader[0] == -2) { |
| CacheableBytesPtr recvMsgLenBytes = readHandshakeData( 2, connectTimeout ); |
| DataInput dI2(recvMsgLenBytes->value(), recvMsgLenBytes->length()); |
| int16_t recvMsgLenShort = 0; |
| dI2.readInt( &recvMsgLenShort ); |
| recvMsgLen = recvMsgLenShort; |
| } |
| else if ((int8_t)arrayLenHeader[0] == -3) { |
| CacheableBytesPtr recvMsgLenBytes = readHandshakeData( 4, connectTimeout ); |
| DataInput dI2(recvMsgLenBytes->value(), recvMsgLenBytes->length()); |
| dI2.readInt( &recvMsgLen ); |
| } |
| CacheableBytesPtr recvMessage = readHandshakeData( recvMsgLen, connectTimeout ); |
| // If the distributed member has not been set yet, set it. |
| if (getEndpointObject()->getDistributedMemberID() == 0) |
| { |
| LOGDEBUG("Deserializing distributed member Id"); |
| DataInput diForClient(recvMessage->value(), recvMessage->length()); |
| ClientProxyMembershipIDPtr member; |
| diForClient.readObject(member); |
| uint16_t memId = CacheImpl::getMemberListForVersionStamp()->add((DSMemberForVersionStampPtr)member); |
| getEndpointObject()->setDistributedMemberID(memId); |
| LOGDEBUG("Deserialized distributed member Id %d", memId); |
| } |
| |
| } |
| |
| CacheableBytesPtr recvMsgLenBytes = readHandshakeData( 2, connectTimeout ); |
| DataInput dI3(recvMsgLenBytes->value(), recvMsgLenBytes->length()); |
| uint16_t recvMsgLen2 = 0; |
| dI3.readInt( &recvMsgLen2 ); |
| CacheableBytesPtr recvMessage = readHandshakeData( recvMsgLen2, connectTimeout ); |
| |
| if(!isClientNotification ) { |
| CacheableBytesPtr deltaEnabledMsg = readHandshakeData( 1, connectTimeout ); |
| DataInput di( deltaEnabledMsg->value( ), 1 ); |
| bool isDeltaEnabledOnServer; |
| di.readBoolean( &isDeltaEnabledOnServer ); |
| ThinClientBaseDM::setDeltaEnabledOnServer( isDeltaEnabledOnServer ); |
| } |
| |
| switch(acceptanceCode[0]) { |
| case REPLY_OK: |
| case SUCCESSFUL_SERVER_TO_CLIENT: |
| LOGFINER( "Handshake reply: %u,%u,%u", acceptanceCode[0], serverQueueStatus[0], recvMsgLen2 ); |
| if(isClientNotification) |
| readHandshakeInstantiatorMsg(connectTimeout); |
| break; |
| case REPLY_AUTHENTICATION_FAILED: { |
| AuthenticationFailedException ex( (char*)recvMessage->value( ) ); |
| GF_SAFE_DELETE_CON( m_conn ); |
| throwException(ex); |
| // not expected to be reached |
| break; |
| } |
| case REPLY_AUTHENTICATION_REQUIRED: { |
| AuthenticationRequiredException ex( (char*)recvMessage->value( ) ); |
| GF_SAFE_DELETE_CON( m_conn ); |
| throwException(ex); |
| // not expected to be reached |
| break; |
| } |
| case REPLY_DUPLICATE_DURABLE_CLIENT: { |
| DuplicateDurableClientException ex( (char*)recvMessage->value( ) ); |
| GF_SAFE_DELETE_CON( m_conn ); |
| throwException(ex); |
| // not expected to be reached |
| break; |
| } |
| case REPLY_REFUSED: |
| case REPLY_INVALID: |
| case UNSUCCESSFUL_SERVER_TO_CLIENT: { |
| LOGERROR( "Handshake rejected by server[%s]: %s", |
| m_endpointObj->name( ).c_str( ), (char*)recvMessage->value( ) ); |
| CacheServerException ex( "TcrConnection::TcrConnection: " |
| "Handshake rejected by server: ", (char*)recvMessage->value( ) ); |
| GF_SAFE_DELETE_CON( m_conn ); |
| throw ex; |
| } |
| default: { |
| LOGERROR("Unknown error[%d] received from server [%s] in handshake: " |
| "%s", acceptanceCode[0], m_endpointObj->name().c_str(), |
| recvMessage->value()); |
| MessageException ex( "TcrConnection::TcrConnection: Unknown error" |
| " received from server in handshake: ", (char*)recvMessage->value( ) ); |
| GF_SAFE_DELETE_CON( m_conn ); |
| throw ex; |
| } |
| } |
| |
| } else { |
| int32_t lastError = ACE_OS::last_error( ); |
| LOGFINE( "Handshake failed, errno: %d: %s", |
| lastError,ACE_OS::strerror(lastError) ); |
| GF_SAFE_DELETE_CON( m_conn ); |
| if ( error & CONN_TIMEOUT ) { |
| throw TimeoutException( "TcrConnection::TcrConnection: " |
| "connection timed out during handshake" ); |
| } else { |
| throw GemfireIOException( "TcrConnection::TcrConnection: " |
| "Handshake failure" ); |
| } |
| } |
| |
| //TODO:hitesh we can authenticate endpoint here if pool is not in multiuser mode. |
| //for backward connection we send credentials to server in handshake itself. |
| //for forward connection we need to send credentail to server |
| //---if pool in not in multiuser node |
| //---or old endpoint case. |
| |
| if (this->m_endpointObj && !isNotificationChannel && tmpIsSecurityOn && (!isPool || !this->m_endpointObj->isMultiUserMode())) |
| { |
| // this->m_endpointObj->authenticateEndpoint(this); |
| return true; |
| } |
| |
| return false; |
| } |
| |
| Connector * |
| TcrConnection::createConnection(const char * endpoint, uint32_t connectTimeout, int32_t maxBuffSizePool) |
| { |
| Connector * socket = NULL; |
| if (DistributedSystem::getSystemProperties()->sslEnabled()) { |
| socket = new TcpSslConn(endpoint, connectTimeout, maxBuffSizePool); |
| } else { |
| socket = new TcpConn(endpoint, connectTimeout, maxBuffSizePool); |
| } |
| //as socket.init() calls throws exception... |
| m_conn = socket; |
| socket->init(); |
| return socket; |
| } |
| |
| /* The timeout behaviour for different methods is as follows: |
| * receive(): |
| * Header: default timeout |
| * Body: default timeout |
| * sendRequest()/sendRequestForChunkedResponse(): |
| * default timeout during send; for receive: |
| * Header: default timeout * default timeout retries to handle large payload |
| * if a timeout other than default timeout is specified then |
| * that is used instead |
| * Body: default timeout |
| */ |
| inline ConnErrType TcrConnection::receiveData( char* buffer, |
| int32_t length, uint32_t receiveTimeoutSec, bool checkConnected, bool isNotificationMessage, int32_t notPublicApiWithTimeout ) |
| { |
| GF_DEV_ASSERT(buffer != NULL); |
| GF_DEV_ASSERT(m_conn != NULL); |
| |
| bool isPublicApiTimeout = false; |
| //if gfcpp property unit set then sendTimeoutSec will be in millisecond |
| //otherwise it will be in second |
| if(DistributedSystem::getSystemProperties()->readTimeoutUnitInMillis()) |
| { |
| LOGFINER("recieveData %d %d ", receiveTimeoutSec, notPublicApiWithTimeout); |
| if (notPublicApiWithTimeout == TcrMessage::QUERY || |
| notPublicApiWithTimeout == TcrMessage::QUERY_WITH_PARAMETERS || |
| notPublicApiWithTimeout == TcrMessage::EXECUTECQ_WITH_IR_MSG_TYPE || |
| /*notPublicApiWithTimeout == TcrMessage::GETDURABLECQS_MSG_TYPE || this is not public yet*/ |
| notPublicApiWithTimeout == TcrMessage::EXECUTE_FUNCTION || |
| notPublicApiWithTimeout == TcrMessage::EXECUTE_REGION_FUNCTION || |
| notPublicApiWithTimeout == TcrMessage::EXECUTE_REGION_FUNCTION_SINGLE_HOP ) |
| { |
| //then app has set timeout in millis, change it to microSeconds |
| receiveTimeoutSec = receiveTimeoutSec * 1000; |
| isPublicApiTimeout = true; |
| LOGDEBUG("recieveData2 %d ", receiveTimeoutSec); |
| } |
| else |
| { |
| receiveTimeoutSec = receiveTimeoutSec * 1000 * 1000; |
| } |
| }else |
| {//it is set as seconds and change it to microsecond |
| receiveTimeoutSec = receiveTimeoutSec * 1000 * 1000; |
| } |
| |
| uint32_t defaultWaitSecs = isNotificationMessage ? 1*1000*1000 : 2*1000*1000; |
| //uint32_t defaultMicroSecs = (sendTimeoutSec % (1000*1000)) |
| if(defaultWaitSecs > receiveTimeoutSec) |
| defaultWaitSecs = receiveTimeoutSec; |
| |
| |
| int32_t startLen = length; |
| |
| while ( length > 0 && receiveTimeoutSec > 0 ) { |
| if ( checkConnected && !m_connected ) { |
| return CONN_IOERR; |
| } |
| if ( receiveTimeoutSec < defaultWaitSecs ) { |
| defaultWaitSecs = receiveTimeoutSec; |
| } |
| int32_t readBytes = m_conn->receive( buffer, length, defaultWaitSecs, 0 ); |
| int32_t lastError = ACE_OS::last_error( ); |
| length -= readBytes; |
| if( length > 0 && lastError != ETIME && lastError != ETIMEDOUT ) { |
| return CONN_IOERR; |
| } |
| buffer += readBytes; |
| /* |
| Update pools byteRecieved stat here. |
| readMessageChunked, readMessage, readHandshakeData, |
| readHandshakeRawData, readHandShakeBytes, readHandShakeInt, |
| readHandshakeString, all call TcrConnection::receiveData. |
| */ |
| LOGDEBUG("TcrConnection::receiveData length = %d defaultWaitSecs = %d" , length, defaultWaitSecs); |
| if (m_poolDM != NULL) { |
| LOGDEBUG("TcrConnection::receiveData readBytes = %d", readBytes); |
| m_poolDM->getStats().incReceivedBytes(static_cast<int64>(readBytes)); |
| } |
| receiveTimeoutSec -= defaultWaitSecs; |
| if((length == startLen) && isNotificationMessage) { //no data read |
| break; |
| } |
| } |
| // ARB: Postconditions for checking bounds. |
| GF_DEV_ASSERT( startLen >= length ); |
| GF_DEV_ASSERT( length >= 0 ); |
| return ( length == 0 ? CONN_NOERR : |
| ( length == startLen ? CONN_NODATA : CONN_TIMEOUT ) ); |
| } |
| |
| inline ConnErrType TcrConnection::sendData( const char* buffer, |
| int32_t length, uint32_t sendTimeoutSec, bool checkConnected ) |
| { |
| uint32_t dummy = 0; |
| return sendData(dummy, buffer, length, sendTimeoutSec, checkConnected); |
| } |
| |
| inline ConnErrType TcrConnection::sendData( uint32_t& timeSpent, const char* buffer, |
| int32_t length, uint32_t sendTimeoutSec, bool checkConnected, int32_t notPublicApiWithTimeout ) |
| { |
| GF_DEV_ASSERT(buffer != NULL); |
| GF_DEV_ASSERT(m_conn != NULL); |
| bool isPublicApiTimeout = false; |
| //if gfcpp property unit set then sendTimeoutSec will be in millisecond |
| //otherwise it will be in second |
| if(DistributedSystem::getSystemProperties()->readTimeoutUnitInMillis()) |
| { |
| LOGFINER("sendData %d %d", sendTimeoutSec,notPublicApiWithTimeout); |
| if (notPublicApiWithTimeout == TcrMessage::QUERY || |
| notPublicApiWithTimeout == TcrMessage::QUERY_WITH_PARAMETERS || |
| notPublicApiWithTimeout == TcrMessage::EXECUTECQ_WITH_IR_MSG_TYPE || |
| /*notPublicApiWithTimeout == TcrMessage::GETDURABLECQS_MSG_TYPE || this is not public yet*/ |
| notPublicApiWithTimeout == TcrMessage::EXECUTE_FUNCTION || |
| notPublicApiWithTimeout == TcrMessage::EXECUTE_REGION_FUNCTION || |
| notPublicApiWithTimeout == TcrMessage::EXECUTE_REGION_FUNCTION_SINGLE_HOP) |
| { |
| //then app has set timeout in millis, change it to microSeconds |
| sendTimeoutSec = sendTimeoutSec * 1000; |
| isPublicApiTimeout = true; |
| LOGDEBUG("sendData2 %d ", sendTimeoutSec); |
| } |
| else |
| { |
| sendTimeoutSec = sendTimeoutSec * 1000; |
| } |
| }else |
| {//it is set as seconds and change it to microsecond |
| sendTimeoutSec = sendTimeoutSec * 1000 * 1000; |
| } |
| |
| uint32_t defaultWaitSecs = 2*1000*1000; //2 second |
| //uint32_t defaultMicroSecs = (sendTimeoutSec % (1000*1000)) |
| if(defaultWaitSecs > sendTimeoutSec) |
| defaultWaitSecs = sendTimeoutSec; |
| LOGDEBUG("before send len %d sendTimeoutSec = %d checkConnected = %d m_connected %d", length, sendTimeoutSec, checkConnected, m_connected); |
| while ( length > 0 && sendTimeoutSec > 0 ) { |
| if ( checkConnected && !m_connected ) { |
| return CONN_IOERR; |
| } |
| LOGDEBUG("before send "); |
| if ( sendTimeoutSec < defaultWaitSecs ) { |
| defaultWaitSecs = sendTimeoutSec; |
| } |
| int32_t sentBytes = m_conn->send( buffer, length, defaultWaitSecs, 0 ); |
| |
| length -= sentBytes; |
| buffer += sentBytes; |
| //we don't want to decrement the remaining time for the last iteration |
| if( length == 0 ) { |
| break; |
| } |
| int32_t lastError = ACE_OS::last_error( ); |
| if( length > 0 && lastError != ETIME && lastError != ETIMEDOUT ) { |
| return CONN_IOERR; |
| } |
| |
| timeSpent += defaultWaitSecs; |
| sendTimeoutSec -= defaultWaitSecs; |
| } |
| if(isPublicApiTimeout) |
| {//it should go in millis |
| timeSpent = timeSpent /1000; |
| } |
| else |
| {//it should go in seconds |
| timeSpent = timeSpent / (1000 * 1000); |
| } |
| return ( length == 0 ? CONN_NOERR : CONN_TIMEOUT ); |
| } |
| |
| char* TcrConnection::sendRequest(const char* buffer, int32_t len, |
| size_t* recvLen, uint32_t sendTimeoutSec, uint32_t receiveTimeoutSec, int32 request) |
| { |
| LOGDEBUG("TcrConnection::sendRequest"); |
| uint32_t timeSpent = 0; |
| |
| send( timeSpent, buffer, len, sendTimeoutSec); |
| |
| if ( timeSpent >= receiveTimeoutSec ) |
| throwException(TimeoutException( |
| "TcrConnection::send: connection timed out")); |
| |
| receiveTimeoutSec -= timeSpent; |
| ConnErrType opErr = CONN_NOERR; |
| return readMessage(recvLen, receiveTimeoutSec, true, &opErr, false, request); |
| } |
| |
| void TcrConnection::sendRequestForChunkedResponse(const TcrMessage& request, |
| int32_t len, TcrMessage& reply, uint32_t sendTimeoutSec, |
| uint32_t receiveTimeoutSec) |
| { |
| int32_t msgType = request.getMessageType(); |
| //ACE_OS::memcpy(&msgType, buffer, 4); |
| //msgType = ntohl(msgType); |
| |
| /*receiveTimeoutSec = (msgType == TcrMessage::QUERY || |
| msgType == TcrMessage::QUERY_WITH_PARAMETERS || |
| msgType == TcrMessage::EXECUTECQ_WITH_IR_MSG_TYPE || |
| msgType == TcrMessage::GETDURABLECQS_MSG_TYPE || |
| msgType == TcrMessage::EXECUTE_FUNCTION || |
| msgType == TcrMessage::EXECUTE_REGION_FUNCTION || |
| msgType == TcrMessage::EXECUTE_REGION_FUNCTION_SINGLE_HOP) |
| ? reply.getTimeout() : receiveTimeoutSec; |
| |
| //send + recieve should be part of API timeout |
| sendTimeoutSec = (msgType == TcrMessage::QUERY || |
| msgType == TcrMessage::QUERY_WITH_PARAMETERS || |
| msgType == TcrMessage::EXECUTECQ_WITH_IR_MSG_TYPE || |
| msgType == TcrMessage::GETDURABLECQS_MSG_TYPE || |
| msgType == TcrMessage::EXECUTE_FUNCTION || |
| msgType == TcrMessage::EXECUTE_REGION_FUNCTION || |
| msgType == TcrMessage::EXECUTE_REGION_FUNCTION_SINGLE_HOP) |
| ? reply.getTimeout() : sendTimeoutSec; |
| */ |
| switch(msgType) |
| { |
| case TcrMessage::QUERY: |
| case TcrMessage::QUERY_WITH_PARAMETERS: |
| case TcrMessage::EXECUTECQ_WITH_IR_MSG_TYPE: |
| case TcrMessage::GETDURABLECQS_MSG_TYPE: |
| case TcrMessage::EXECUTE_FUNCTION: |
| case TcrMessage::EXECUTE_REGION_FUNCTION: |
| case TcrMessage::EXECUTE_REGION_FUNCTION_SINGLE_HOP: |
| { |
| receiveTimeoutSec = reply.getTimeout(); |
| sendTimeoutSec = reply.getTimeout(); |
| break; |
| } |
| default: |
| break; |
| } |
| /*if((msgType == TcrMessage::QUERY || |
| msgType == TcrMessage::QUERY_WITH_PARAMETERS || |
| msgType == TcrMessage::EXECUTECQ_WITH_IR_MSG_TYPE || |
| msgType == TcrMessage::GETDURABLECQS_MSG_TYPE || |
| msgType == TcrMessage::EXECUTE_FUNCTION || |
| msgType == TcrMessage::EXECUTE_REGION_FUNCTION)) |
| { |
| receiveTimeoutSec = reply.getTimeout(); |
| sendTimeoutSec = reply.getTimeout(); |
| }*/ |
| |
| //send(buffer, len, sendTimeoutSec); |
| uint32_t timeSpent = 0; |
| send( timeSpent, request.getMsgData(), len, sendTimeoutSec, true, msgType); |
| |
| if ( timeSpent >= receiveTimeoutSec ) |
| throwException(TimeoutException( |
| "TcrConnection::send: connection timed out")); |
| |
| receiveTimeoutSec -= timeSpent; |
| |
| // to help in decoding the reply based on what was the request type |
| reply.setMessageTypeRequest(msgType); |
| //no need of it now, this will not come here |
| if(msgType == TcrMessage::EXECUTE_REGION_FUNCTION_SINGLE_HOP ) |
| { |
| ChunkedFunctionExecutionResponse* resultCollector = static_cast<ChunkedFunctionExecutionResponse*>(reply.getChunkedResultHandler()); |
| if(resultCollector->getResult()==false) |
| { |
| LOGDEBUG("TcrConnection::sendRequestForChunkedResponse: function execution, no response desired"); |
| return; |
| } |
| } |
| readMessageChunked(reply, |
| receiveTimeoutSec , true); |
| } |
| |
| void TcrConnection::send( const char* buffer, int len, |
| uint32_t sendTimeoutSec, bool checkConnected ) |
| { |
| uint32_t dummy = 0; |
| send(dummy, buffer, len, sendTimeoutSec, checkConnected); |
| } |
| |
| void TcrConnection::send( uint32_t& timeSpent, const char* buffer, int len, |
| uint32_t sendTimeoutSec, bool checkConnected,int32_t notPublicApiWithTimeout ) |
| { |
| GF_DEV_ASSERT( m_conn != NULL ); |
| |
| //LOGINFO("TcrConnection::send: [%p] sending request to endpoint %s;", |
| //: this, m_endpoint); |
| |
| LOGDEBUG("TcrConnection::send: [%p] sending request to endpoint %s; bytes: %s", |
| this, m_endpoint, Utils::convertBytesToString(buffer, len)->asChar()); |
| |
| ConnErrType error = sendData( timeSpent, buffer, len, sendTimeoutSec, checkConnected, notPublicApiWithTimeout ); |
| |
| LOGFINER("TcrConnection::send: completed send request to endpoint %s " |
| "with error: %d", m_endpoint, error); |
| |
| if ( error != CONN_NOERR ) { |
| if ( error == CONN_TIMEOUT ) { |
| throwException(TimeoutException( |
| "TcrConnection::send: connection timed out")); |
| } else { |
| throwException(GemfireIOException( |
| "TcrConnection::send: connection failure")); |
| } |
| } |
| } |
| |
| char* TcrConnection::receive(size_t* recvLen, ConnErrType* opErr,uint32_t receiveTimeoutSec) |
| { |
| GF_DEV_ASSERT( m_conn != NULL ); |
| |
| return readMessage(recvLen, receiveTimeoutSec, false, opErr, true); |
| } |
| |
| char* TcrConnection::readMessage(size_t* recvLen, uint32_t receiveTimeoutSec, |
| bool doHeaderTimeoutRetries, ConnErrType* opErr, bool isNotificationMessage, int32 request) |
| { |
| char msg_header[HEADER_LENGTH]; |
| int32_t msgType, msgLen; |
| ConnErrType error; |
| |
| uint32_t headerTimeout = receiveTimeoutSec; |
| if ( doHeaderTimeoutRetries && receiveTimeoutSec == DEFAULT_READ_TIMEOUT_SECS ) { |
| headerTimeout = DEFAULT_READ_TIMEOUT_SECS * DEFAULT_TIMEOUT_RETRIES; |
| } |
| |
| LOGDEBUG( "TcrConnection::readMessage: receiving reply from endpoint %s", |
| m_endpoint ); |
| |
| error = receiveData( msg_header, HEADER_LENGTH, headerTimeout, true, isNotificationMessage ); |
| LOGDEBUG("TcrConnection::readMessage after recieve data"); |
| if ( error != CONN_NOERR ) { |
| // [sumedh] the !isNotificationMessage ensures that notification channel |
| // gets the TimeoutException when no data was received and is ignored by |
| // notification channel; when data has been received then it throws |
| // GemfireIOException that causes the channel to close as required |
| if ( error == CONN_NODATA || |
| ( error == CONN_TIMEOUT && !isNotificationMessage ) ) { |
| if (isNotificationMessage) { |
| // fix #752 - do not throw periodic TimeoutException for subscription channels to avoid frequent stack trace processing. |
| return NULL; |
| } else { |
| throwException(TimeoutException("TcrConnection::readMessage: " |
| "connection timed out while receiving message header")); |
| } |
| } else { |
| if (isNotificationMessage) { |
| *opErr = CONN_IOERR; |
| return NULL; |
| } |
| throwException(GemfireIOException("TcrConnection::readMessage: " |
| "connection failure while receiving message header")); |
| } |
| } |
| |
| LOGDEBUG("TcrConnection::readMessage: received header from endpoint %s; " |
| "bytes: %s", m_endpoint, Utils::convertBytesToString( |
| msg_header, HEADER_LENGTH)->asChar()); |
| |
| DataInput input((uint8_t*)msg_header, HEADER_LENGTH); |
| input.readInt(&msgType); |
| input.readInt(&msgLen); |
| // ARB: check that message length is valid. |
| if (!(msgLen > 0) && request == TcrMessage::GET_CLIENT_PR_METADATA) { |
| char* fullMessage; |
| *recvLen = HEADER_LENGTH + msgLen; |
| GF_NEW( fullMessage, char[HEADER_LENGTH + msgLen] ); |
| ACE_OS::memcpy(fullMessage, msg_header, HEADER_LENGTH); |
| return fullMessage; |
| //exit(0); |
| } |
| //GF_DEV_ASSERT(msgLen > 0); |
| |
| // user has to delete this pointer |
| char* fullMessage; |
| *recvLen = HEADER_LENGTH + msgLen; |
| GF_NEW( fullMessage, char[HEADER_LENGTH + msgLen] ); |
| ACE_OS::memcpy(fullMessage, msg_header, HEADER_LENGTH); |
| |
| uint32_t mesgBodyTimeout = receiveTimeoutSec; |
| if ( isNotificationMessage ) { |
| mesgBodyTimeout = receiveTimeoutSec * DEFAULT_TIMEOUT_RETRIES; |
| } |
| error = receiveData(fullMessage + HEADER_LENGTH, msgLen, mesgBodyTimeout, true, isNotificationMessage); |
| if ( error != CONN_NOERR ) { |
| delete [] fullMessage; |
| // [sumedh] the !isNotificationMessage ensures that notification channel |
| // gets the GemfireIOException and not TimeoutException; |
| // this is required since header has already been read meaning there could |
| // be stale data on socket and so it should close the notification channel |
| // while TimeoutException is normally ignored by notification channel |
| if ( ( error & CONN_TIMEOUT ) && !isNotificationMessage ) { |
| throwException(TimeoutException("TcrConnection::readMessage: " |
| "connection timed out while receiving message body")); |
| } else { |
| if(isNotificationMessage) { |
| *opErr = CONN_IOERR; |
| return NULL; |
| } |
| throwException(GemfireIOException("TcrConnection::readMessage: " |
| "connection failure while receiving message body")); |
| } |
| } |
| |
| LOGDEBUG("TcrConnection::readMessage: received message body from " |
| "endpoint %s; bytes: %s", m_endpoint, Utils::convertBytesToString( |
| fullMessage + HEADER_LENGTH, msgLen)->asChar()); |
| |
| //This is the test case when msg type is GET_CLIENT_PR_METADATA and msgLen is 0. |
| /*if (request == TcrMessage::GET_CLIENT_PR_METADATA) { |
| LOGCONFIG("Amey request == TcrMessage::GET_CLIENT_PR_METADATA"); |
| char* fullMessage2; |
| *recvLen = HEADER_LENGTH; |
| GF_NEW( fullMessage2, char[HEADER_LENGTH ] ); |
| ACE_OS::memcpy(fullMessage2, msg_header, HEADER_LENGTH); |
| return fullMessage2; |
| }*/ |
| |
| return fullMessage; |
| } |
| |
| void TcrConnection::readMessageChunked(TcrMessage &reply, |
| uint32_t receiveTimeoutSec, bool doHeaderTimeoutRetries) |
| { |
| const int HDR_LEN = 5; |
| const int HDR_LEN_12 = 12; |
| uint8_t msg_header[HDR_LEN_12 + HDR_LEN]; |
| ConnErrType error; |
| |
| uint32_t headerTimeout = receiveTimeoutSec; |
| if (doHeaderTimeoutRetries && receiveTimeoutSec == DEFAULT_READ_TIMEOUT_SECS) { |
| headerTimeout = DEFAULT_READ_TIMEOUT_SECS * DEFAULT_TIMEOUT_RETRIES; |
| } |
| |
| LOGFINER("TcrConnection::readMessageChunked: receiving reply from " |
| "endpoint %s", m_endpoint); |
| |
| error = receiveData((char*)msg_header, HDR_LEN_12 + HDR_LEN, headerTimeout, true, false, reply.getMessageTypeRequest()); |
| if (error != CONN_NOERR) { |
| if (error & CONN_TIMEOUT) { |
| throwException(TimeoutException("TcrConnection::readMessageChunked: " |
| "connection timed out while receiving message header")); |
| } |
| else { |
| throwException(GemfireIOException( "TcrConnection::readMessageChunked: " |
| "connection failure while receiving message header")); |
| } |
| } |
| |
| LOGDEBUG("TcrConnection::readMessageChunked: received header from " |
| "endpoint %s; bytes: %s", m_endpoint, Utils::convertBytesToString( |
| msg_header, HDR_LEN_12)->asChar()); |
| |
| DataInput input(msg_header, HDR_LEN_12); |
| int32_t msgType; |
| input.readInt(&msgType); |
| reply.setMessageType(msgType); |
| int32_t txId; |
| int32_t numOfParts; |
| input.readInt(&numOfParts); |
| LOGDEBUG("TcrConnection::readMessageChunked numberof parts = %d ", numOfParts); |
| //input.advanceCursor(4); |
| input.readInt(&txId); |
| reply.setTransId(txId); |
| |
| //bool isLastChunk = false; |
| uint8_t isLastChunk = 0x0; |
| |
| int chunkNum = 0; |
| |
| // Initialize the chunk processing |
| reply.startProcessChunk(m_chunksProcessSema); |
| |
| // [sumedh] indicate an end to chunk processing and wait for processing |
| // to end even if reading the chunks fails in middle |
| struct FinalizeProcessChunk |
| { |
| private: |
| TcrMessage& m_reply; |
| uint16_t m_endpointmemId; |
| public: |
| FinalizeProcessChunk(TcrMessage& reply, uint16_t endpointmemId) : m_reply(reply), m_endpointmemId(endpointmemId) {} |
| ~FinalizeProcessChunk() |
| { |
| // Enqueue a NULL chunk indicating a wait for processing to complete. |
| m_reply.processChunk(NULL,0, m_endpointmemId); |
| } |
| } endProcessChunk(reply, m_endpointObj->getDistributedMemberID()); |
| bool first = true; |
| do { |
| //uint8_t chunk_header[HDR_LEN]; |
| if(!first) |
| { |
| error = receiveData((char*)(msg_header + HDR_LEN_12), HDR_LEN, headerTimeout, true, false, reply.getMessageTypeRequest()); |
| if (error != CONN_NOERR) { |
| if (error & CONN_TIMEOUT) { |
| throwException(TimeoutException("TcrConnection::readMessageChunked: " |
| "connection timed out while receiving chunk header")); |
| } |
| else { |
| throwException(GemfireIOException("TcrConnection::readMessageChunked: " |
| "connection failure while receiving chunk header")); |
| } |
| } |
| } else |
| { |
| first = false; |
| } |
| ++chunkNum; |
| |
| LOGDEBUG("TcrConnection::readMessageChunked: received chunk header %d " |
| "from endpoint %s; bytes: %s", chunkNum, m_endpoint, |
| Utils::convertBytesToString((msg_header + HDR_LEN_12), HDR_LEN)->asChar()); |
| |
| DataInput inp((msg_header + HDR_LEN_12), HDR_LEN); |
| int32_t chunkLen; |
| inp.readInt(&chunkLen); |
| // ARB: check that chunk length is valid. |
| GF_DEV_ASSERT(chunkLen> 0); |
| //inp.readBoolean(&isLastChunk); |
| inp.read(&isLastChunk); |
| |
| uint8_t* chunk_body; |
| GF_NEW(chunk_body, uint8_t[chunkLen]); |
| error = receiveData((char*)chunk_body, chunkLen, receiveTimeoutSec, true, false, reply.getMessageTypeRequest()); |
| if (error != CONN_NOERR) { |
| delete [] chunk_body; |
| if (error & CONN_TIMEOUT) { |
| throwException(TimeoutException("TcrConnection::readMessageChunked: " |
| "connection timed out while receiving chunk body")); |
| } |
| else { |
| throwException(GemfireIOException("TcrConnection::readMessageChunked: " |
| "connection failure while receiving chunk body")); |
| } |
| } |
| |
| LOGDEBUG("TcrConnection::readMessageChunked: received chunk body %d " |
| "from endpoint %s; bytes: %s", chunkNum, m_endpoint, |
| Utils::convertBytesToString(chunk_body, chunkLen)->asChar()); |
| // Process the chunk; the actual processing is done by a separate thread |
| // ThinClientBaseDM::m_chunkProcessor. |
| |
| |
| reply.processChunk(chunk_body, chunkLen, m_endpointObj->getDistributedMemberID(), isLastChunk); |
| } |
| while (!(isLastChunk & 0x1)); |
| |
| LOGFINER("TcrConnection::readMessageChunked: read full reply " |
| "from endpoint %s", m_endpoint); |
| } |
| |
| void TcrConnection::close( ) |
| { |
| //If this is a short lived grid client, don't bother with this close ack message |
| if (DistributedSystem::getSystemProperties()->isGridClient()) { |
| return; |
| } |
| |
| TcrMessage* closeMsg = TcrMessage::getCloseConnMessage( ); |
| try { |
| //LOGINFO("TcrConnection::close DC = %d; netdown = %d endpoint %s", TcrConnectionManager::TEST_DURABLE_CLIENT_CRASH, TcrConnectionManager::isNetDown, m_endpoint); |
| if( !TcrConnectionManager::TEST_DURABLE_CLIENT_CRASH && !TcrConnectionManager::isNetDown ) { |
| send( closeMsg->getMsgData( ), closeMsg->getMsgLength( ), 2, false ); |
| } |
| } catch (Exception & e) { |
| LOGINFO("Close connection message failed with msg: %s", e.getMessage()); |
| } catch (...) { |
| LOGINFO("Close connection message failed"); |
| } |
| } |
| |
| /* adongre |
| * CID 28738: Unsigned compared against 0 (NO_EFFECT) |
| * This less-than-zero comparison of an unsigned value is never true. "msgLength < 0U". |
| * Changing the unint32_t to int32_t |
| */ |
| //CacheableBytesPtr TcrConnection::readHandshakeData(uint32_t msgLength, uint32_t connectTimeout ) |
| CacheableBytesPtr TcrConnection::readHandshakeData(int32_t msgLength, uint32_t connectTimeout ) |
| { |
| ConnErrType error = CONN_NOERR ; |
| if(msgLength < 0) { |
| msgLength = 0; |
| } |
| char* recvMessage; |
| GF_NEW(recvMessage, char[msgLength + 1]); |
| recvMessage[msgLength] = '\0'; |
| if(msgLength == 0) { |
| return CacheableBytes::createNoCopy( (uint8_t*)recvMessage, 1); |
| } |
| if ( ( error = receiveData( recvMessage, msgLength, connectTimeout, |
| false ) ) != CONN_NOERR ) { |
| if ( error & CONN_TIMEOUT ) { |
| GF_SAFE_DELETE_ARRAY(recvMessage); |
| GF_SAFE_DELETE_CON( m_conn ); |
| throwException(TimeoutException("TcrConnection::TcrConnection: " |
| "Timeout in handshake")); |
| } else { |
| GF_SAFE_DELETE_ARRAY(recvMessage); |
| GF_SAFE_DELETE_CON( m_conn ); |
| throwException(GemfireIOException("TcrConnection::TcrConnection: " |
| "Handshake failure")); |
| } |
| } else { |
| return CacheableBytes::createNoCopy( (uint8_t*)recvMessage, msgLength + 1 ); |
| } |
| return NULLPTR; |
| } |
| |
| //read just the bytes without the trailing null terminator |
| /* adongre |
| * CID 28739: Unsigned compared against 0 (NO_EFFECT) |
| * change the input parameter from unint32_t to int32_t |
| * as the comparasion case is valid |
| */ |
| CacheableBytesPtr TcrConnection::readHandshakeRawData(int32_t msgLength, |
| uint32_t connectTimeout ) |
| { |
| ConnErrType error = CONN_NOERR ; |
| if(msgLength < 0) { |
| msgLength = 0; |
| } |
| if(msgLength == 0) { |
| return NULLPTR; |
| } |
| char* recvMessage; |
| GF_NEW(recvMessage, char[msgLength]); |
| if ( ( error = receiveData( recvMessage, msgLength, connectTimeout, |
| false ) ) != CONN_NOERR ) { |
| if ( error & CONN_TIMEOUT ) { |
| GF_SAFE_DELETE_ARRAY(recvMessage); |
| GF_SAFE_DELETE_CON( m_conn ); |
| throwException(TimeoutException("TcrConnection::TcrConnection: " |
| "Timeout in handshake")); |
| } else { |
| GF_SAFE_DELETE_ARRAY(recvMessage); |
| GF_SAFE_DELETE_CON( m_conn ); |
| throwException(GemfireIOException("TcrConnection::TcrConnection: " |
| "Handshake failure")); |
| } |
| // not expected to be reached |
| return NULLPTR; |
| } else { |
| return CacheableBytes::createNoCopy( (uint8_t*)recvMessage, msgLength ); |
| } |
| } |
| |
| //read a byte array |
| CacheableBytesPtr TcrConnection::readHandshakeByteArray(uint32_t connectTimeout) |
| { |
| /*CacheableBytesPtr codeBytes = readHandshakeData(1, connectTimeout); |
| DataInput codeDI(codeBytes->value(), codeBytes->length()); |
| uint8_t code = 0; |
| codeDI.read(&code); |
| uint32_t arraySize = 0; |
| if (code == 0xFF) { |
| return NULLPTR; |
| } else { |
| int32_t tempLen = code; |
| if (tempLen > 252) { // 252 is java's ((byte)-4 && 0xFF) |
| if (code == 0xFE) { |
| CacheableBytesPtr lenBytes = readHandshakeData(2, connectTimeout); |
| DataInput lenDI(lenBytes->value(), lenBytes->length()); |
| uint16_t val; |
| lenDI.readInt(&val); |
| tempLen = val; |
| } else if (code == 0xFD) { |
| CacheableBytesPtr lenBytes = readHandshakeData(4, connectTimeout); |
| DataInput lenDI(lenBytes->value(), lenBytes->length()); |
| uint32_t val; |
| lenDI.readInt(&val); |
| tempLen = val; |
| } else { |
| GF_SAFE_DELETE(m_conn); |
| throwException(IllegalStateException("unexpected array length code")); |
| } |
| } |
| arraySize = tempLen; |
| }*/ |
| |
| uint32_t arraySize = readHandshakeArraySize(connectTimeout); |
| return readHandshakeRawData(arraySize, connectTimeout); |
| } |
| |
| //read a byte array |
| uint32_t TcrConnection::readHandshakeArraySize(uint32_t connectTimeout) |
| { |
| CacheableBytesPtr codeBytes = readHandshakeData(1, connectTimeout); |
| DataInput codeDI(codeBytes->value(), codeBytes->length()); |
| uint8_t code = 0; |
| codeDI.read(&code); |
| uint32_t arraySize = 0; |
| if (code == 0xFF) { |
| return 0; |
| } else { |
| int32_t tempLen = code; |
| if (tempLen > 252) { // 252 is java's ((byte)-4 && 0xFF) |
| if (code == 0xFE) { |
| CacheableBytesPtr lenBytes = readHandshakeData(2, connectTimeout); |
| DataInput lenDI(lenBytes->value(), lenBytes->length()); |
| uint16_t val; |
| lenDI.readInt(&val); |
| tempLen = val; |
| } else if (code == 0xFD) { |
| CacheableBytesPtr lenBytes = readHandshakeData(4, connectTimeout); |
| DataInput lenDI(lenBytes->value(), lenBytes->length()); |
| uint32_t val; |
| lenDI.readInt(&val); |
| tempLen = val; |
| } else { |
| GF_SAFE_DELETE_CON(m_conn); |
| throwException(IllegalStateException("unexpected array length code")); |
| } |
| } |
| arraySize = tempLen; |
| } |
| |
| return arraySize; |
| } |
| |
| void TcrConnection::readHandshakeInstantiatorMsg(uint32_t connectTimeout) |
| { |
| int hashMapSize = readHandshakeArraySize(connectTimeout); |
| for(int i = 0; i < hashMapSize; i++) |
| { |
| readHandShakeBytes(6, connectTimeout);//reading integer and arraylist type |
| int aLen = readHandshakeArraySize(connectTimeout); |
| for(int j = 0; j < aLen; j++) |
| { |
| readHandshakeString(connectTimeout); |
| } |
| } |
| |
| hashMapSize = readHandshakeArraySize(connectTimeout); |
| for(int i = 0; i < hashMapSize; i++) |
| { |
| readHandShakeBytes(5, connectTimeout);//reading integer |
| readHandshakeString(connectTimeout); |
| } |
| |
| //added in 3.6 and 6.6 |
| int hashMapSize2 = readHandshakeArraySize(connectTimeout); |
| for(int i = 0; i < hashMapSize2; i++) |
| { |
| readHandShakeBytes(6, connectTimeout);//reading integer and arraylist type |
| int aLen = readHandshakeArraySize(connectTimeout); |
| for(int j = 0; j < aLen; j++) |
| { |
| readHandshakeString(connectTimeout); |
| } |
| } |
| } |
| void TcrConnection::readHandShakeBytes(int numberOfBytes, uint32_t connectTimeout) |
| { |
| ConnErrType error = CONN_NOERR; |
| uint8_t * recvMessage; |
| GF_NEW(recvMessage, uint8_t[numberOfBytes]); |
| |
| if ( ( error = receiveData( (char*)recvMessage, numberOfBytes, connectTimeout, |
| false ) ) != CONN_NOERR ) { |
| if ( error & CONN_TIMEOUT ) { |
| GF_SAFE_DELETE_ARRAY(recvMessage); |
| GF_SAFE_DELETE_CON( m_conn ); |
| throwException(TimeoutException("TcrConnection::TcrConnection: " |
| "Timeout in handshake")); |
| } else { |
| GF_SAFE_DELETE_ARRAY(recvMessage); |
| GF_SAFE_DELETE_CON( m_conn ); |
| throwException(GemfireIOException("TcrConnection::TcrConnection: " |
| "Handshake failure")); |
| } |
| } |
| |
| GF_SAFE_DELETE_ARRAY(recvMessage); |
| } |
| |
| int32_t TcrConnection::readHandShakeInt(uint32_t connectTimeout) |
| { |
| ConnErrType error = CONN_NOERR; |
| uint8_t * recvMessage; |
| GF_NEW(recvMessage, uint8_t[4]); |
| |
| if ( ( error = receiveData( (char*)recvMessage, 4, connectTimeout, |
| false ) ) != CONN_NOERR ) { |
| if ( error & CONN_TIMEOUT ) { |
| GF_SAFE_DELETE_ARRAY(recvMessage); |
| GF_SAFE_DELETE_CON( m_conn ); |
| throwException(TimeoutException("TcrConnection::TcrConnection: " |
| "Timeout in handshake")); |
| } else { |
| GF_SAFE_DELETE_ARRAY(recvMessage); |
| GF_SAFE_DELETE_CON( m_conn ); |
| throwException(GemfireIOException("TcrConnection::TcrConnection: " |
| "Handshake failure")); |
| } |
| } |
| |
| DataInput di(recvMessage, 4); |
| int32_t val; |
| di.readInt(&val); |
| |
| GF_SAFE_DELETE_ARRAY(recvMessage); |
| |
| return val; |
| } |
| CacheableStringPtr TcrConnection::readHandshakeString(uint32_t connectTimeout) |
| { |
| ConnErrType error = CONN_NOERR ; |
| |
| char cstypeid; |
| if (receiveData(&cstypeid, 1, connectTimeout, false) != CONN_NOERR) { |
| GF_SAFE_DELETE_CON( m_conn ); |
| if ( error & CONN_TIMEOUT ) { |
| LOGFINE("Timeout receiving string typeid"); |
| throwException(TimeoutException( "TcrConnection::TcrConnection: " |
| "Timeout in handshake reading string type ID")); |
| } else { |
| LOGFINE("IO error receiving string typeid"); |
| throwException(GemfireIOException( "TcrConnection::TcrConnection: " |
| "Handshake failure reading string type ID")); |
| } |
| } |
| |
| LOGDEBUG("Received string typeid as %d", cstypeid); |
| |
| uint32_t length = 0; |
| switch ((int8_t)cstypeid) |
| { |
| case GemfireTypeIds::CacheableNullString: { |
| return NULLPTR; |
| break; |
| } |
| case GF_STRING: { |
| uint16_t shortLen = 0; |
| CacheableBytesPtr lenBytes = readHandshakeData(2, connectTimeout); |
| DataInput lenDI(lenBytes->value(), lenBytes->length()); |
| lenDI.readInt(&shortLen); |
| length = shortLen; |
| break; |
| } |
| default: { |
| GF_SAFE_DELETE_CON(m_conn); |
| throwException(GemfireIOException("TcrConnection::TcrConnection: " |
| "Handshake failure: Unexpected string type ID")); |
| } |
| } |
| |
| LOGDEBUG(" Received string len %d", length); |
| |
| if (length == 0) { |
| return NULLPTR; |
| } |
| |
| char* recvMessage; |
| GF_NEW(recvMessage, char[length + 1]); |
| recvMessage[length] = '\0'; |
| |
| if ( ( error = receiveData( recvMessage, length, connectTimeout, |
| false ) ) != CONN_NOERR ) { |
| if ( error & CONN_TIMEOUT ) { |
| GF_SAFE_DELETE_ARRAY(recvMessage); |
| GF_SAFE_DELETE_CON( m_conn ); |
| LOGFINE("Timeout receiving string data"); |
| throwException(TimeoutException("TcrConnection::TcrConnection: " |
| "Timeout in handshake reading string bytes")); |
| } else { |
| GF_SAFE_DELETE_ARRAY(recvMessage); |
| GF_SAFE_DELETE_CON( m_conn ); |
| LOGFINE("IO error receiving string data"); |
| throwException(GemfireIOException("TcrConnection::TcrConnection: " |
| "Handshake failure reading string bytes")); |
| } |
| // not expected to be reached |
| return NULLPTR; |
| } else { |
| LOGDEBUG(" Received string data [%s]", recvMessage); |
| CacheableStringPtr retval = CacheableString::createNoCopy( |
| recvMessage, length); |
| return retval; |
| } |
| } |
| bool TcrConnection::hasExpired(int expiryTime) |
| { |
| if (expiryTime == -1) { |
| return false; |
| } |
| |
| ACE_Time_Value _expiryTime(expiryTime/1000, (expiryTime%1000)*1000); |
| |
| if ( ACE_OS::gettimeofday() - m_creationTime > _expiryTime ) { |
| return true; |
| } |
| else { |
| return false; |
| } |
| } |
| |
| bool TcrConnection::isIdle(int idleTime) |
| { |
| if (idleTime == -1) { |
| return false; |
| } |
| |
| ACE_Time_Value _idleTime(idleTime/1000, (idleTime%1000)*1000); |
| |
| if ( ACE_OS::gettimeofday() - m_lastAccessed > _idleTime ) { |
| return true; |
| } |
| else { |
| return false; |
| } |
| } |
| |
| void TcrConnection::touch() |
| { |
| m_lastAccessed = ACE_OS::gettimeofday(); |
| } |
| |
| ACE_Time_Value TcrConnection::getLastAccessed() |
| { |
| return m_lastAccessed; |
| } |
| |
| uint8_t TcrConnection::getOverrides(SystemProperties * props) |
| { |
| const char * conflate = props->conflateEvents(); |
| uint8_t conflateByte = 0; |
| if (conflate != NULL ) { |
| if ( ACE_OS::strcasecmp(conflate, "true") == 0 ) { |
| conflateByte = 1; |
| } else if ( ACE_OS::strcasecmp(conflate, "false") == 0 ) { |
| conflateByte = 2; |
| } |
| } |
| /* |
| const char * removeUnresponsive = props->removeUnresponsiveClientOverride(); |
| uint8_t removeByte = 0; |
| if (removeUnresponsive != NULL ) { |
| if ( ACE_OS::strcasecmp(removeUnresponsive, "true") == 0 ) { |
| removeByte = 1; |
| } else if ( ACE_OS::strcasecmp(removeUnresponsive, "false") == 0 ) { |
| removeByte = 2; |
| } |
| } |
| const char * notify = props->notifyBySubscriptionOverride(); |
| uint8_t notifyByte = 0; |
| if (notify != NULL ) { |
| if ( ACE_OS::strcasecmp(notify, "true") == 0 ) { |
| notifyByte = 1; |
| } else if ( ACE_OS::strcasecmp(notify, "false") == 0 ) { |
| notifyByte = 2; |
| } |
| } |
| return (((notifyByte << 2) | removeByte) << 2) | conflateByte; |
| */ |
| return conflateByte; |
| } |
| |
| void TcrConnection::updateCreationTime() |
| { |
| m_creationTime = ACE_OS::gettimeofday(); |
| touch(); |
| } |
| |
| TcrConnection::~TcrConnection() |
| { |
| LOGDEBUG("Tcrconnection destructor %p . conn ref to endopint %d", this, m_endpointObj->getConnRefCounter()); |
| m_endpointObj->addConnRefCounter(-1); |
| if ( m_conn != NULL ) { |
| LOGDEBUG("closing the connection"); |
| m_conn->close( ); |
| GF_SAFE_DELETE_CON( m_conn ); |
| } |
| |
| if (m_dh != NULL) { |
| m_dh->clearDhKeys(); |
| GF_SAFE_DELETE( m_dh ); |
| } |
| } |
| |
| bool TcrConnection::setAndGetBeingUsed(volatile bool isBeingUsed, bool forTransaction) |
| { |
| uint32_t currentValue = 0; |
| uint32_t retVal = m_isUsed; |
| |
| if(!forTransaction) |
| { |
| if(isBeingUsed) |
| { |
| if(m_isUsed == 1 || m_isUsed == 2) |
| return false; |
| retVal = HostAsm::atomicCompareAndExchange(m_isUsed, 1, currentValue); |
| if(retVal == currentValue) |
| return true; |
| return false; |
| } |
| else |
| { |
| m_isUsed = 0; |
| return true; |
| } |
| } |
| else |
| { |
| if(isBeingUsed) |
| { |
| if(m_isUsed == 1)//already used |
| return false; |
| if(m_isUsed == 2)//transaction thread has set, reused it |
| return true; |
| retVal = HostAsm::atomicCompareAndExchange(m_isUsed, 2/*for transaction*/, currentValue); |
| if(retVal == currentValue) |
| return true; |
| return false; |
| } |
| else |
| { |
| //m_isUsed = 0;//this will done by releasing the connection by transaction at the end of transaction |
| return true; |
| } |
| } |
| } |