blob: 34effb028fb39237fedee7ed886027498f724589 [file] [log] [blame]
/*=========================================================================
* Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* one or more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
#include "TcrConnection.hpp"
#include "../DistributedSystem.hpp"
#include "../SystemProperties.hpp"
#include "Connector.hpp"
#include "TcpSslConn.hpp"
#include "ClientProxyMembershipID.hpp"
#include "ThinClientPoolHADM.hpp"
#include <memory.h>
#include <ace/INET_Addr.h>
#include <ace/OS.h>
#include "TcrEndpoint.hpp"
#include "GemfireTypeIdsImpl.hpp"
#include "TcrConnectionManager.hpp"
#include "DistributedSystemImpl.hpp"
#include "Version.hpp"
#include "DiffieHellman.hpp"
#include "Utils.hpp" // for RandGen for server challenge
#include "ThinClientRegion.hpp"
using namespace gemfire;
const int HEADER_LENGTH = 17;
const int MAXBUFSIZE = 65536;
const int BODYLENPOS = 4;
const int64_t INITIAL_CONNECTION_ID = 26739;
#define throwException(ex) { LOGFINEST("%s: %s", ex.getName(), ex.getMessage()); throw ex; }
bool TcrConnection::InitTcrConnection(TcrEndpoint* endpointObj, const char* endpoint,
Set< uint16_t >& ports,
bool isClientNotification, bool isSecondary, uint32_t connectTimeout)
{
m_conn = NULL;
m_endpointObj = endpointObj;
m_poolDM = dynamic_cast<ThinClientPoolDM*> (m_endpointObj->getPoolHADM());
// add to the connection reference counter of the endpoint
m_endpointObj->addConnRefCounter(1);
//m_connected = isConnected;
m_hasServerQueue = NON_REDUNDANT_SERVER;
m_queueSize = 0;
m_dh = NULL;
//m_chunksProcessSema = 0;
m_creationTime = ACE_OS::gettimeofday();
connectionId = INITIAL_CONNECTION_ID;
m_lastAccessed = ACE_OS::gettimeofday();
LOGDEBUG("Tcrconnection const isSecondary = %d and isClientNotification = %d, this = %p, conn ref to endopint %d", isSecondary, isClientNotification, this, m_endpointObj->getConnRefCounter());
bool isPool = false;
m_isBeingUsed = false;
GF_DEV_ASSERT(endpoint != NULL);
m_endpoint = endpoint;
// Precondition:
// 1. isSecondary ==> isClientNotification
GF_DEV_ASSERT(!isSecondary || isClientNotification);
DistributedSystemPtr dsys = DistributedSystem::getInstance();
// Create TcpConn object which manages a socket connection with the endpoint.
if( endpointObj && endpointObj->getPoolHADM())
{
m_conn = createConnection(m_endpoint, connectTimeout,
static_cast <int32_t> (endpointObj->getPoolHADM()->getSocketBufferSize()));
isPool = true;
}
else
{
m_conn = createConnection(m_endpoint, connectTimeout, 0);
}
GF_DEV_ASSERT(m_conn != NULL);
DataOutput handShakeMsg;
bool isNotificationChannel = false;
// Send byte Acceptor.CLIENT_TO_SERVER = (byte) 100;
// Send byte Acceptor.SERVER_TO_CLIENT = (byte) 101;
if ( isClientNotification ) {
isNotificationChannel = true;
if ( isSecondary ) {
handShakeMsg.write( (int8_t)SECONDARY_SERVER_TO_CLIENT );
} else {
handShakeMsg.write( (int8_t)PRIMARY_SERVER_TO_CLIENT );
}
} else {
handShakeMsg.write( (int8_t)CLIENT_TO_SERVER );
}
// added for versioned client
int8_t versionOrdinal = Version::getOrdinal();
handShakeMsg.write(versionOrdinal);
LOGFINE("Client version ordinal is %d", versionOrdinal);
handShakeMsg.write((int8_t)REPLY_OK);
// Send byte REPLY_OK = (byte)58;
if (!isClientNotification) {
m_port = m_conn->getPort( );
ports.insert( m_port );
} else {
// add the local ports to message
Set< uint16_t >::Iterator iter = ports.iterator( );
handShakeMsg.writeInt( (int32_t)ports.size( ) );
while ( iter.hasNext( ) ) {
handShakeMsg.writeInt( (int32_t)iter.next( ) );
}
}
// ARB: Writing handshake readtimeout value for CSVER_51+.
if (!isClientNotification) {
// SW: The timeout has been artificially raised to the highest
// permissible value for bug #232 for now.
// VJR: minus 10 sec because the GFE 5.7 gridDev branch adds a
// 5 sec buffer which was causing an int overflow.
handShakeMsg.writeInt( (int32_t)0x7fffffff - 10000 );
}
// Write header for byte FixedID since GFE 5.7
handShakeMsg.write((int8_t)GemfireTypeIdsImpl::FixedIDByte);
//Writing byte for ClientProxyMembershipID class id=38 as registered on the java server.
handShakeMsg.write((int8_t)GemfireTypeIdsImpl::ClientProxyMembershipId);
if( endpointObj->getPoolHADM() ){
ClientProxyMembershipID* memId = endpointObj->getPoolHADM()->getMembershipId();
uint32_t memIdBufferLength;
const char *memIdBuffer = memId->getDSMemberId(memIdBufferLength);
handShakeMsg.writeBytes((int8_t *)memIdBuffer, memIdBufferLength);
}
else {
ACE_TCHAR hostName[256];
ACE_OS::hostname( hostName, sizeof(hostName)-1);
ACE_INET_Addr driver("",hostName,"tcp");
uint32_t hostAddr = driver.get_ip_address();
uint16_t hostPort = 0;
//Add 3 durable Subcription properties to ClientProxyMembershipID
SystemProperties* sysProp = DistributedSystem::getSystemProperties();
const char *durableId = (sysProp != NULL)? sysProp->durableClientId() : NULL;
const uint32_t durableTimeOut = (sysProp != NULL)? sysProp->durableTimeout() :0;
// Write ClientProxyMembershipID serialized object.
uint32_t memIdBufferLength;
ClientProxyMembershipID memId( hostName, hostAddr, hostPort, durableId, durableTimeOut );
const char *memIdBuffer = memId.getDSMemberId(memIdBufferLength);
handShakeMsg.writeBytes((int8_t *)memIdBuffer, memIdBufferLength);
}
handShakeMsg.writeInt((int32_t)1);
bool isDhOn = false;
bool requireServerAuth = false;
PropertiesPtr credentials;
CacheableBytesPtr serverChallenge;
SystemProperties *tmpSystemProperties = DistributedSystem::getSystemProperties();
//Write overrides (just conflation for now)
handShakeMsg.write(getOverrides(tmpSystemProperties));
bool tmpIsSecurityOn = tmpSystemProperties->isSecurityOn();
isDhOn = tmpSystemProperties->isDhOn() ;
if (m_endpointObj)
{
tmpIsSecurityOn = tmpSystemProperties->isSecurityOn() || this->m_endpointObj->isMultiUserMode();
CacheableStringPtr dhalgo = tmpSystemProperties->getSecurityProperties()->find("security-client-dhalgo");
LOGDEBUG("TcrConnection this->m_endpointObj->isMultiUserMode() = %d ", this->m_endpointObj->isMultiUserMode());
if (this->m_endpointObj->isMultiUserMode())
{
if(dhalgo != NULLPTR && dhalgo->length() >0)
isDhOn = true;
}
}
LOGDEBUG("TcrConnection algo name %s tmpIsSecurityOn = %d isDhOn = %d isNotificationChannel = %d ", tmpSystemProperties->securityClientDhAlgo(), tmpIsSecurityOn, isDhOn, isNotificationChannel);
bool doIneedToSendCreds = true;
if (isNotificationChannel && m_endpointObj && this->m_endpointObj->isMultiUserMode())
{
isDhOn = false;
tmpIsSecurityOn = false;
doIneedToSendCreds = false;
}
if(isNotificationChannel && !doIneedToSendCreds)
{
handShakeMsg.write((uint8_t)SECURITY_MULTIUSER_NOTIFICATIONCHANNEL);
}
else if (isDhOn) {
m_dh = new DiffieHellman();
m_dh->initDhKeys(tmpSystemProperties->getSecurityProperties());
handShakeMsg.write((uint8_t)SECURITY_CREDENTIALS_DHENCRYPT);
} else if (tmpIsSecurityOn) {
handShakeMsg.write((uint8_t)SECURITY_CREDENTIALS_NORMAL);
} else {
handShakeMsg.write((uint8_t)SECURITY_CREDENTIALS_NONE);
}
if (tmpIsSecurityOn) {
try {
LOGFINER("TcrConnection: about to invoke authloader");
PropertiesPtr tmpSecurityProperties = tmpSystemProperties->getSecurityProperties();
if (tmpSecurityProperties == NULLPTR) {
LOGWARN("TcrConnection: security properties not found.");
}
//AuthInitializePtr authInitialize = tmpSystemProperties->getAuthLoader();
//HItesh:only for backward connection
if (isClientNotification) {
AuthInitializePtr authInitialize = DistributedSystem::m_impl->getAuthLoader();
if (authInitialize != NULLPTR) {
LOGFINER("TcrConnection: acquired handle to authLoader, "
"invoking getCredentials");
/* adongre
* CID 28898: Copy into fixed size buffer (STRING_OVERFLOW)
* You might overrun the 100 byte fixed-size string "tmpEndpoint" by copying "this->m_endpoint" without checking the length.
* Note: This defect has an elevated risk because the source argument is a parameter of the current function.
*/
//char tmpEndpoint[100] = { '\0' } ;
//strcpy(tmpEndpoint, m_endpoint);
PropertiesPtr tmpAuthIniSecurityProperties =
authInitialize->getCredentials( tmpSecurityProperties, /*tmpEndpoint*/ m_endpoint );
LOGFINER("TcrConnection: after getCredentials ");
credentials = tmpAuthIniSecurityProperties;
}
}
if (isDhOn) {
CacheableStringPtr ksPath = tmpSecurityProperties->find("security-client-kspath");
requireServerAuth = (ksPath != NULLPTR && ksPath->length() > 0);
handShakeMsg.writeBoolean(requireServerAuth);
LOGFINE("HandShake: Server authentication using RSA signature %s required", requireServerAuth?"is":"not");
// Send the symmetric key algorithm name string
handShakeMsg.write((int8_t)GemfireTypeIds::CacheableString);
handShakeMsg.writeASCII(tmpSystemProperties->securityClientDhAlgo());
// Send the client's DH public key to the server
// CacheableBytesPtr dhPubKey = DiffieHellman::getPublicKey();
CacheableBytesPtr dhPubKey = m_dh->getPublicKey();
LOGDEBUG("DH pubkey send len is %d", dhPubKey->length());
dhPubKey->toData(handShakeMsg);
if (requireServerAuth) {
char serverChallengeBytes[64] = {0};
RandGen getrand;
for (int pos = 0; pos < 64; pos++) {
serverChallengeBytes[pos] = getrand(255);
}
serverChallenge = CacheableBytes::create((const uint8_t*)serverChallengeBytes, 64);
serverChallenge->toData(handShakeMsg);
}
} else { // if isDhOn
if (isClientNotification)//hitesh:only for backward connection
credentials->toData(handShakeMsg);
} // else isDhOn
}
catch ( const AuthenticationRequiredException& ) {
LOGDEBUG("AuthenticationRequiredException got");
throw;
}
catch ( const AuthenticationFailedException& ) {
LOGDEBUG("AuthenticationFailedException got");
throw;
}
catch ( const Exception& ex ) {
LOGWARN( "TcrConnection: failed to acquire handle to authLoader: [%s] %s",
ex.getName( ), ex.getMessage( ) );
throwException(AuthenticationFailedException("TcrConnection: failed "
"to load authInit library: ", ex.getMessage()));
}
}
uint32_t msgLengh;
char *data = (char*)handShakeMsg.getBuffer(&msgLengh);
LOGFINE( "Attempting handshake with endpoint %s for %s%s connection", endpoint,
isClientNotification ? (isSecondary ? "secondary " : "primary ") : "",
isClientNotification ? "subscription" : "client");
ConnErrType error = sendData( data, msgLengh, connectTimeout, false );
if( error == CONN_NOERR ) {
CacheableBytesPtr acceptanceCode = readHandshakeData(1, connectTimeout);
LOGDEBUG(" Handshake: Got Accept Code %d", acceptanceCode[0]);
/* adongre */
if ( acceptanceCode[0] == REPLY_SSL_ENABLED && ! tmpSystemProperties->sslEnabled() ) {
LOGERROR("SSL is enabled on server, enable SSL in client as well");
AuthenticationRequiredException ex("SSL is enabled on server, enable SSL in client as well");
GF_SAFE_DELETE_CON( m_conn );
throwException(ex);
}
// if diffie-hellman based credential encryption is enabled
if (isDhOn && acceptanceCode[0] == REPLY_OK) {
// read the server's DH public key
CacheableBytesPtr pubKeyBytes = readHandshakeByteArray(connectTimeout);
LOGDEBUG(" Handshake: Got pubKeySize %d", pubKeyBytes->length());
// set the server's public key on client's DH side
//DiffieHellman::setPublicKeyOther(pubKeyBytes);
m_dh->setPublicKeyOther(pubKeyBytes);
// Note: SK Algo is set in DistributedSystem::connect()
//DiffieHellman::computeSharedSecret();
m_dh->computeSharedSecret();
if (requireServerAuth) {
//Read Subject Name
CacheableStringPtr subjectName = readHandshakeString(connectTimeout);
LOGDEBUG("Got subject %s", subjectName->asChar());
// read the server's signature bytes
CacheableBytesPtr responseBytes = readHandshakeByteArray(connectTimeout);
LOGDEBUG("Handshake: Got response size %d", responseBytes->length());
LOGDEBUG("Handshake: Got serverChallenge size %d", serverChallenge->length());
if (!m_dh->verify(subjectName, serverChallenge, responseBytes)) {
throwException(AuthenticationFailedException(
"Handshake: failed to verify server challenge response"));
}
LOGFINE("HandShake: Verified server challenge response");
}
// read the challenge bytes from the server
CacheableBytesPtr challengeBytes = readHandshakeByteArray(connectTimeout);
LOGDEBUG("Handshake: Got challengeSize %d", challengeBytes->length());
// encrypt the credentials and challenge bytes
DataOutput cleartext;
if (isClientNotification)//hitesh:only for backward connection
credentials->toData(cleartext);
challengeBytes->toData(cleartext);
CacheableBytesPtr ciphertext = m_dh->encrypt(cleartext.getBuffer(), cleartext.getBufferLength());
DataOutput sendCreds;
ciphertext->toData(sendCreds);
uint32_t credLen;
char * credData = (char*) sendCreds.getBuffer(&credLen);
// send the encrypted bytes and check the response
error = sendData(credData, credLen, connectTimeout, false);
if (error == CONN_NOERR) {
acceptanceCode = readHandshakeData(1, connectTimeout);
LOGDEBUG("Handshake: Got acceptanceCode Finally %d", acceptanceCode[0]);
} else {
int32_t lastError = ACE_OS::last_error( );
LOGERROR( "Handshake failed, errno: %d, server may not be running", lastError );
GF_SAFE_DELETE_CON( m_conn );
if ( error & CONN_TIMEOUT ) {
throwException(TimeoutException("TcrConnection::TcrConnection: "
"connection timed out during diffie-hellman handshake"));
} else {
throwException(GemfireIOException("TcrConnection::TcrConnection: "
"Handshake failure during diffie-hellman"));
}
}
}
CacheableBytesPtr serverQueueStatus = readHandshakeData(1, connectTimeout);
// ARB: TESTING: Durable clients - set server queue status.
// 0 - Non-Redundant , 1- Redundant , 2- Primary
if (serverQueueStatus[0] == 1 ) {
m_hasServerQueue = REDUNDANT_SERVER;
}
else if (serverQueueStatus[0] == 2 ) {
m_hasServerQueue = PRIMARY_SERVER;
}
else {
m_hasServerQueue = NON_REDUNDANT_SERVER;
}
CacheableBytesPtr queueSizeMsg = readHandshakeData( 4, connectTimeout );
DataInput dI(queueSizeMsg->value(), queueSizeMsg->length());
int32_t queueSize = 0;
dI.readInt( &queueSize );
m_queueSize = queueSize > 0 ? queueSize: 0;
m_endpointObj->setServerQueueStatus( m_hasServerQueue,m_queueSize );
////////////////////////// Set Pool Specific Q Size only when ////////////////////////////////
////////////////////////// 1. SereverQStatus = Primary or ////////////////////////////////
////////////////////////// 2. SereverQStatus = Non-Redundant and ////////////////////////////////
////////////////////////// 3. Only when handshake is for subscription ////////////////////////////////
if (m_poolDM != NULL) {
if ((m_hasServerQueue == PRIMARY_SERVER || m_hasServerQueue == NON_REDUNDANT_SERVER) && isClientNotification)
{
m_poolDM->setPrimaryServerQueueSize(queueSize);
}
}
if(!isClientNotification ) {
// Read and ignore the DistributedMember object
CacheableBytesPtr arrayLenHeader = readHandshakeData( 1, connectTimeout );
int32_t recvMsgLen = (int32_t) arrayLenHeader[0];
// now check for array length headers - since GFE 5.7
if ((int8_t)arrayLenHeader[0] == -2) {
CacheableBytesPtr recvMsgLenBytes = readHandshakeData( 2, connectTimeout );
DataInput dI2(recvMsgLenBytes->value(), recvMsgLenBytes->length());
int16_t recvMsgLenShort = 0;
dI2.readInt( &recvMsgLenShort );
recvMsgLen = recvMsgLenShort;
}
else if ((int8_t)arrayLenHeader[0] == -3) {
CacheableBytesPtr recvMsgLenBytes = readHandshakeData( 4, connectTimeout );
DataInput dI2(recvMsgLenBytes->value(), recvMsgLenBytes->length());
dI2.readInt( &recvMsgLen );
}
CacheableBytesPtr recvMessage = readHandshakeData( recvMsgLen, connectTimeout );
// If the distributed member has not been set yet, set it.
if (getEndpointObject()->getDistributedMemberID() == 0)
{
LOGDEBUG("Deserializing distributed member Id");
DataInput diForClient(recvMessage->value(), recvMessage->length());
ClientProxyMembershipIDPtr member;
diForClient.readObject(member);
uint16_t memId = CacheImpl::getMemberListForVersionStamp()->add((DSMemberForVersionStampPtr)member);
getEndpointObject()->setDistributedMemberID(memId);
LOGDEBUG("Deserialized distributed member Id %d", memId);
}
}
CacheableBytesPtr recvMsgLenBytes = readHandshakeData( 2, connectTimeout );
DataInput dI3(recvMsgLenBytes->value(), recvMsgLenBytes->length());
uint16_t recvMsgLen2 = 0;
dI3.readInt( &recvMsgLen2 );
CacheableBytesPtr recvMessage = readHandshakeData( recvMsgLen2, connectTimeout );
if(!isClientNotification ) {
CacheableBytesPtr deltaEnabledMsg = readHandshakeData( 1, connectTimeout );
DataInput di( deltaEnabledMsg->value( ), 1 );
bool isDeltaEnabledOnServer;
di.readBoolean( &isDeltaEnabledOnServer );
ThinClientBaseDM::setDeltaEnabledOnServer( isDeltaEnabledOnServer );
}
switch(acceptanceCode[0]) {
case REPLY_OK:
case SUCCESSFUL_SERVER_TO_CLIENT:
LOGFINER( "Handshake reply: %u,%u,%u", acceptanceCode[0], serverQueueStatus[0], recvMsgLen2 );
if(isClientNotification)
readHandshakeInstantiatorMsg(connectTimeout);
break;
case REPLY_AUTHENTICATION_FAILED: {
AuthenticationFailedException ex( (char*)recvMessage->value( ) );
GF_SAFE_DELETE_CON( m_conn );
throwException(ex);
// not expected to be reached
break;
}
case REPLY_AUTHENTICATION_REQUIRED: {
AuthenticationRequiredException ex( (char*)recvMessage->value( ) );
GF_SAFE_DELETE_CON( m_conn );
throwException(ex);
// not expected to be reached
break;
}
case REPLY_DUPLICATE_DURABLE_CLIENT: {
DuplicateDurableClientException ex( (char*)recvMessage->value( ) );
GF_SAFE_DELETE_CON( m_conn );
throwException(ex);
// not expected to be reached
break;
}
case REPLY_REFUSED:
case REPLY_INVALID:
case UNSUCCESSFUL_SERVER_TO_CLIENT: {
LOGERROR( "Handshake rejected by server[%s]: %s",
m_endpointObj->name( ).c_str( ), (char*)recvMessage->value( ) );
CacheServerException ex( "TcrConnection::TcrConnection: "
"Handshake rejected by server: ", (char*)recvMessage->value( ) );
GF_SAFE_DELETE_CON( m_conn );
throw ex;
}
default: {
LOGERROR("Unknown error[%d] received from server [%s] in handshake: "
"%s", acceptanceCode[0], m_endpointObj->name().c_str(),
recvMessage->value());
MessageException ex( "TcrConnection::TcrConnection: Unknown error"
" received from server in handshake: ", (char*)recvMessage->value( ) );
GF_SAFE_DELETE_CON( m_conn );
throw ex;
}
}
} else {
int32_t lastError = ACE_OS::last_error( );
LOGFINE( "Handshake failed, errno: %d: %s",
lastError,ACE_OS::strerror(lastError) );
GF_SAFE_DELETE_CON( m_conn );
if ( error & CONN_TIMEOUT ) {
throw TimeoutException( "TcrConnection::TcrConnection: "
"connection timed out during handshake" );
} else {
throw GemfireIOException( "TcrConnection::TcrConnection: "
"Handshake failure" );
}
}
//TODO:hitesh we can authenticate endpoint here if pool is not in multiuser mode.
//for backward connection we send credentials to server in handshake itself.
//for forward connection we need to send credentail to server
//---if pool in not in multiuser node
//---or old endpoint case.
if (this->m_endpointObj && !isNotificationChannel && tmpIsSecurityOn && (!isPool || !this->m_endpointObj->isMultiUserMode()))
{
// this->m_endpointObj->authenticateEndpoint(this);
return true;
}
return false;
}
Connector *
TcrConnection::createConnection(const char * endpoint, uint32_t connectTimeout, int32_t maxBuffSizePool)
{
Connector * socket = NULL;
if (DistributedSystem::getSystemProperties()->sslEnabled()) {
socket = new TcpSslConn(endpoint, connectTimeout, maxBuffSizePool);
} else {
socket = new TcpConn(endpoint, connectTimeout, maxBuffSizePool);
}
//as socket.init() calls throws exception...
m_conn = socket;
socket->init();
return socket;
}
/* The timeout behaviour for different methods is as follows:
* receive():
* Header: default timeout
* Body: default timeout
* sendRequest()/sendRequestForChunkedResponse():
* default timeout during send; for receive:
* Header: default timeout * default timeout retries to handle large payload
* if a timeout other than default timeout is specified then
* that is used instead
* Body: default timeout
*/
inline ConnErrType TcrConnection::receiveData( char* buffer,
int32_t length, uint32_t receiveTimeoutSec, bool checkConnected, bool isNotificationMessage, int32_t notPublicApiWithTimeout )
{
GF_DEV_ASSERT(buffer != NULL);
GF_DEV_ASSERT(m_conn != NULL);
bool isPublicApiTimeout = false;
//if gfcpp property unit set then sendTimeoutSec will be in millisecond
//otherwise it will be in second
if(DistributedSystem::getSystemProperties()->readTimeoutUnitInMillis())
{
LOGFINER("recieveData %d %d ", receiveTimeoutSec, notPublicApiWithTimeout);
if (notPublicApiWithTimeout == TcrMessage::QUERY ||
notPublicApiWithTimeout == TcrMessage::QUERY_WITH_PARAMETERS ||
notPublicApiWithTimeout == TcrMessage::EXECUTECQ_WITH_IR_MSG_TYPE ||
/*notPublicApiWithTimeout == TcrMessage::GETDURABLECQS_MSG_TYPE || this is not public yet*/
notPublicApiWithTimeout == TcrMessage::EXECUTE_FUNCTION ||
notPublicApiWithTimeout == TcrMessage::EXECUTE_REGION_FUNCTION ||
notPublicApiWithTimeout == TcrMessage::EXECUTE_REGION_FUNCTION_SINGLE_HOP )
{
//then app has set timeout in millis, change it to microSeconds
receiveTimeoutSec = receiveTimeoutSec * 1000;
isPublicApiTimeout = true;
LOGDEBUG("recieveData2 %d ", receiveTimeoutSec);
}
else
{
receiveTimeoutSec = receiveTimeoutSec * 1000 * 1000;
}
}else
{//it is set as seconds and change it to microsecond
receiveTimeoutSec = receiveTimeoutSec * 1000 * 1000;
}
uint32_t defaultWaitSecs = isNotificationMessage ? 1*1000*1000 : 2*1000*1000;
//uint32_t defaultMicroSecs = (sendTimeoutSec % (1000*1000))
if(defaultWaitSecs > receiveTimeoutSec)
defaultWaitSecs = receiveTimeoutSec;
int32_t startLen = length;
while ( length > 0 && receiveTimeoutSec > 0 ) {
if ( checkConnected && !m_connected ) {
return CONN_IOERR;
}
if ( receiveTimeoutSec < defaultWaitSecs ) {
defaultWaitSecs = receiveTimeoutSec;
}
int32_t readBytes = m_conn->receive( buffer, length, defaultWaitSecs, 0 );
int32_t lastError = ACE_OS::last_error( );
length -= readBytes;
if( length > 0 && lastError != ETIME && lastError != ETIMEDOUT ) {
return CONN_IOERR;
}
buffer += readBytes;
/*
Update pools byteRecieved stat here.
readMessageChunked, readMessage, readHandshakeData,
readHandshakeRawData, readHandShakeBytes, readHandShakeInt,
readHandshakeString, all call TcrConnection::receiveData.
*/
LOGDEBUG("TcrConnection::receiveData length = %d defaultWaitSecs = %d" , length, defaultWaitSecs);
if (m_poolDM != NULL) {
LOGDEBUG("TcrConnection::receiveData readBytes = %d", readBytes);
m_poolDM->getStats().incReceivedBytes(static_cast<int64>(readBytes));
}
receiveTimeoutSec -= defaultWaitSecs;
if((length == startLen) && isNotificationMessage) { //no data read
break;
}
}
// ARB: Postconditions for checking bounds.
GF_DEV_ASSERT( startLen >= length );
GF_DEV_ASSERT( length >= 0 );
return ( length == 0 ? CONN_NOERR :
( length == startLen ? CONN_NODATA : CONN_TIMEOUT ) );
}
inline ConnErrType TcrConnection::sendData( const char* buffer,
int32_t length, uint32_t sendTimeoutSec, bool checkConnected )
{
uint32_t dummy = 0;
return sendData(dummy, buffer, length, sendTimeoutSec, checkConnected);
}
inline ConnErrType TcrConnection::sendData( uint32_t& timeSpent, const char* buffer,
int32_t length, uint32_t sendTimeoutSec, bool checkConnected, int32_t notPublicApiWithTimeout )
{
GF_DEV_ASSERT(buffer != NULL);
GF_DEV_ASSERT(m_conn != NULL);
bool isPublicApiTimeout = false;
//if gfcpp property unit set then sendTimeoutSec will be in millisecond
//otherwise it will be in second
if(DistributedSystem::getSystemProperties()->readTimeoutUnitInMillis())
{
LOGFINER("sendData %d %d", sendTimeoutSec,notPublicApiWithTimeout);
if (notPublicApiWithTimeout == TcrMessage::QUERY ||
notPublicApiWithTimeout == TcrMessage::QUERY_WITH_PARAMETERS ||
notPublicApiWithTimeout == TcrMessage::EXECUTECQ_WITH_IR_MSG_TYPE ||
/*notPublicApiWithTimeout == TcrMessage::GETDURABLECQS_MSG_TYPE || this is not public yet*/
notPublicApiWithTimeout == TcrMessage::EXECUTE_FUNCTION ||
notPublicApiWithTimeout == TcrMessage::EXECUTE_REGION_FUNCTION ||
notPublicApiWithTimeout == TcrMessage::EXECUTE_REGION_FUNCTION_SINGLE_HOP)
{
//then app has set timeout in millis, change it to microSeconds
sendTimeoutSec = sendTimeoutSec * 1000;
isPublicApiTimeout = true;
LOGDEBUG("sendData2 %d ", sendTimeoutSec);
}
else
{
sendTimeoutSec = sendTimeoutSec * 1000;
}
}else
{//it is set as seconds and change it to microsecond
sendTimeoutSec = sendTimeoutSec * 1000 * 1000;
}
uint32_t defaultWaitSecs = 2*1000*1000; //2 second
//uint32_t defaultMicroSecs = (sendTimeoutSec % (1000*1000))
if(defaultWaitSecs > sendTimeoutSec)
defaultWaitSecs = sendTimeoutSec;
LOGDEBUG("before send len %d sendTimeoutSec = %d checkConnected = %d m_connected %d", length, sendTimeoutSec, checkConnected, m_connected);
while ( length > 0 && sendTimeoutSec > 0 ) {
if ( checkConnected && !m_connected ) {
return CONN_IOERR;
}
LOGDEBUG("before send ");
if ( sendTimeoutSec < defaultWaitSecs ) {
defaultWaitSecs = sendTimeoutSec;
}
int32_t sentBytes = m_conn->send( buffer, length, defaultWaitSecs, 0 );
length -= sentBytes;
buffer += sentBytes;
//we don't want to decrement the remaining time for the last iteration
if( length == 0 ) {
break;
}
int32_t lastError = ACE_OS::last_error( );
if( length > 0 && lastError != ETIME && lastError != ETIMEDOUT ) {
return CONN_IOERR;
}
timeSpent += defaultWaitSecs;
sendTimeoutSec -= defaultWaitSecs;
}
if(isPublicApiTimeout)
{//it should go in millis
timeSpent = timeSpent /1000;
}
else
{//it should go in seconds
timeSpent = timeSpent / (1000 * 1000);
}
return ( length == 0 ? CONN_NOERR : CONN_TIMEOUT );
}
char* TcrConnection::sendRequest(const char* buffer, int32_t len,
size_t* recvLen, uint32_t sendTimeoutSec, uint32_t receiveTimeoutSec, int32 request)
{
LOGDEBUG("TcrConnection::sendRequest");
uint32_t timeSpent = 0;
send( timeSpent, buffer, len, sendTimeoutSec);
if ( timeSpent >= receiveTimeoutSec )
throwException(TimeoutException(
"TcrConnection::send: connection timed out"));
receiveTimeoutSec -= timeSpent;
ConnErrType opErr = CONN_NOERR;
return readMessage(recvLen, receiveTimeoutSec, true, &opErr, false, request);
}
void TcrConnection::sendRequestForChunkedResponse(const TcrMessage& request,
int32_t len, TcrMessage& reply, uint32_t sendTimeoutSec,
uint32_t receiveTimeoutSec)
{
int32_t msgType = request.getMessageType();
//ACE_OS::memcpy(&msgType, buffer, 4);
//msgType = ntohl(msgType);
/*receiveTimeoutSec = (msgType == TcrMessage::QUERY ||
msgType == TcrMessage::QUERY_WITH_PARAMETERS ||
msgType == TcrMessage::EXECUTECQ_WITH_IR_MSG_TYPE ||
msgType == TcrMessage::GETDURABLECQS_MSG_TYPE ||
msgType == TcrMessage::EXECUTE_FUNCTION ||
msgType == TcrMessage::EXECUTE_REGION_FUNCTION ||
msgType == TcrMessage::EXECUTE_REGION_FUNCTION_SINGLE_HOP)
? reply.getTimeout() : receiveTimeoutSec;
//send + recieve should be part of API timeout
sendTimeoutSec = (msgType == TcrMessage::QUERY ||
msgType == TcrMessage::QUERY_WITH_PARAMETERS ||
msgType == TcrMessage::EXECUTECQ_WITH_IR_MSG_TYPE ||
msgType == TcrMessage::GETDURABLECQS_MSG_TYPE ||
msgType == TcrMessage::EXECUTE_FUNCTION ||
msgType == TcrMessage::EXECUTE_REGION_FUNCTION ||
msgType == TcrMessage::EXECUTE_REGION_FUNCTION_SINGLE_HOP)
? reply.getTimeout() : sendTimeoutSec;
*/
switch(msgType)
{
case TcrMessage::QUERY:
case TcrMessage::QUERY_WITH_PARAMETERS:
case TcrMessage::EXECUTECQ_WITH_IR_MSG_TYPE:
case TcrMessage::GETDURABLECQS_MSG_TYPE:
case TcrMessage::EXECUTE_FUNCTION:
case TcrMessage::EXECUTE_REGION_FUNCTION:
case TcrMessage::EXECUTE_REGION_FUNCTION_SINGLE_HOP:
{
receiveTimeoutSec = reply.getTimeout();
sendTimeoutSec = reply.getTimeout();
break;
}
default:
break;
}
/*if((msgType == TcrMessage::QUERY ||
msgType == TcrMessage::QUERY_WITH_PARAMETERS ||
msgType == TcrMessage::EXECUTECQ_WITH_IR_MSG_TYPE ||
msgType == TcrMessage::GETDURABLECQS_MSG_TYPE ||
msgType == TcrMessage::EXECUTE_FUNCTION ||
msgType == TcrMessage::EXECUTE_REGION_FUNCTION))
{
receiveTimeoutSec = reply.getTimeout();
sendTimeoutSec = reply.getTimeout();
}*/
//send(buffer, len, sendTimeoutSec);
uint32_t timeSpent = 0;
send( timeSpent, request.getMsgData(), len, sendTimeoutSec, true, msgType);
if ( timeSpent >= receiveTimeoutSec )
throwException(TimeoutException(
"TcrConnection::send: connection timed out"));
receiveTimeoutSec -= timeSpent;
// to help in decoding the reply based on what was the request type
reply.setMessageTypeRequest(msgType);
//no need of it now, this will not come here
if(msgType == TcrMessage::EXECUTE_REGION_FUNCTION_SINGLE_HOP )
{
ChunkedFunctionExecutionResponse* resultCollector = static_cast<ChunkedFunctionExecutionResponse*>(reply.getChunkedResultHandler());
if(resultCollector->getResult()==false)
{
LOGDEBUG("TcrConnection::sendRequestForChunkedResponse: function execution, no response desired");
return;
}
}
readMessageChunked(reply,
receiveTimeoutSec , true);
}
void TcrConnection::send( const char* buffer, int len,
uint32_t sendTimeoutSec, bool checkConnected )
{
uint32_t dummy = 0;
send(dummy, buffer, len, sendTimeoutSec, checkConnected);
}
void TcrConnection::send( uint32_t& timeSpent, const char* buffer, int len,
uint32_t sendTimeoutSec, bool checkConnected,int32_t notPublicApiWithTimeout )
{
GF_DEV_ASSERT( m_conn != NULL );
//LOGINFO("TcrConnection::send: [%p] sending request to endpoint %s;",
//: this, m_endpoint);
LOGDEBUG("TcrConnection::send: [%p] sending request to endpoint %s; bytes: %s",
this, m_endpoint, Utils::convertBytesToString(buffer, len)->asChar());
ConnErrType error = sendData( timeSpent, buffer, len, sendTimeoutSec, checkConnected, notPublicApiWithTimeout );
LOGFINER("TcrConnection::send: completed send request to endpoint %s "
"with error: %d", m_endpoint, error);
if ( error != CONN_NOERR ) {
if ( error == CONN_TIMEOUT ) {
throwException(TimeoutException(
"TcrConnection::send: connection timed out"));
} else {
throwException(GemfireIOException(
"TcrConnection::send: connection failure"));
}
}
}
char* TcrConnection::receive(size_t* recvLen, ConnErrType* opErr,uint32_t receiveTimeoutSec)
{
GF_DEV_ASSERT( m_conn != NULL );
return readMessage(recvLen, receiveTimeoutSec, false, opErr, true);
}
char* TcrConnection::readMessage(size_t* recvLen, uint32_t receiveTimeoutSec,
bool doHeaderTimeoutRetries, ConnErrType* opErr, bool isNotificationMessage, int32 request)
{
char msg_header[HEADER_LENGTH];
int32_t msgType, msgLen;
ConnErrType error;
uint32_t headerTimeout = receiveTimeoutSec;
if ( doHeaderTimeoutRetries && receiveTimeoutSec == DEFAULT_READ_TIMEOUT_SECS ) {
headerTimeout = DEFAULT_READ_TIMEOUT_SECS * DEFAULT_TIMEOUT_RETRIES;
}
LOGDEBUG( "TcrConnection::readMessage: receiving reply from endpoint %s",
m_endpoint );
error = receiveData( msg_header, HEADER_LENGTH, headerTimeout, true, isNotificationMessage );
LOGDEBUG("TcrConnection::readMessage after recieve data");
if ( error != CONN_NOERR ) {
// [sumedh] the !isNotificationMessage ensures that notification channel
// gets the TimeoutException when no data was received and is ignored by
// notification channel; when data has been received then it throws
// GemfireIOException that causes the channel to close as required
if ( error == CONN_NODATA ||
( error == CONN_TIMEOUT && !isNotificationMessage ) ) {
if (isNotificationMessage) {
// fix #752 - do not throw periodic TimeoutException for subscription channels to avoid frequent stack trace processing.
return NULL;
} else {
throwException(TimeoutException("TcrConnection::readMessage: "
"connection timed out while receiving message header"));
}
} else {
if (isNotificationMessage) {
*opErr = CONN_IOERR;
return NULL;
}
throwException(GemfireIOException("TcrConnection::readMessage: "
"connection failure while receiving message header"));
}
}
LOGDEBUG("TcrConnection::readMessage: received header from endpoint %s; "
"bytes: %s", m_endpoint, Utils::convertBytesToString(
msg_header, HEADER_LENGTH)->asChar());
DataInput input((uint8_t*)msg_header, HEADER_LENGTH);
input.readInt(&msgType);
input.readInt(&msgLen);
// ARB: check that message length is valid.
if (!(msgLen > 0) && request == TcrMessage::GET_CLIENT_PR_METADATA) {
char* fullMessage;
*recvLen = HEADER_LENGTH + msgLen;
GF_NEW( fullMessage, char[HEADER_LENGTH + msgLen] );
ACE_OS::memcpy(fullMessage, msg_header, HEADER_LENGTH);
return fullMessage;
//exit(0);
}
//GF_DEV_ASSERT(msgLen > 0);
// user has to delete this pointer
char* fullMessage;
*recvLen = HEADER_LENGTH + msgLen;
GF_NEW( fullMessage, char[HEADER_LENGTH + msgLen] );
ACE_OS::memcpy(fullMessage, msg_header, HEADER_LENGTH);
uint32_t mesgBodyTimeout = receiveTimeoutSec;
if ( isNotificationMessage ) {
mesgBodyTimeout = receiveTimeoutSec * DEFAULT_TIMEOUT_RETRIES;
}
error = receiveData(fullMessage + HEADER_LENGTH, msgLen, mesgBodyTimeout, true, isNotificationMessage);
if ( error != CONN_NOERR ) {
delete [] fullMessage;
// [sumedh] the !isNotificationMessage ensures that notification channel
// gets the GemfireIOException and not TimeoutException;
// this is required since header has already been read meaning there could
// be stale data on socket and so it should close the notification channel
// while TimeoutException is normally ignored by notification channel
if ( ( error & CONN_TIMEOUT ) && !isNotificationMessage ) {
throwException(TimeoutException("TcrConnection::readMessage: "
"connection timed out while receiving message body"));
} else {
if(isNotificationMessage) {
*opErr = CONN_IOERR;
return NULL;
}
throwException(GemfireIOException("TcrConnection::readMessage: "
"connection failure while receiving message body"));
}
}
LOGDEBUG("TcrConnection::readMessage: received message body from "
"endpoint %s; bytes: %s", m_endpoint, Utils::convertBytesToString(
fullMessage + HEADER_LENGTH, msgLen)->asChar());
//This is the test case when msg type is GET_CLIENT_PR_METADATA and msgLen is 0.
/*if (request == TcrMessage::GET_CLIENT_PR_METADATA) {
LOGCONFIG("Amey request == TcrMessage::GET_CLIENT_PR_METADATA");
char* fullMessage2;
*recvLen = HEADER_LENGTH;
GF_NEW( fullMessage2, char[HEADER_LENGTH ] );
ACE_OS::memcpy(fullMessage2, msg_header, HEADER_LENGTH);
return fullMessage2;
}*/
return fullMessage;
}
void TcrConnection::readMessageChunked(TcrMessage &reply,
uint32_t receiveTimeoutSec, bool doHeaderTimeoutRetries)
{
const int HDR_LEN = 5;
const int HDR_LEN_12 = 12;
uint8_t msg_header[HDR_LEN_12 + HDR_LEN];
ConnErrType error;
uint32_t headerTimeout = receiveTimeoutSec;
if (doHeaderTimeoutRetries && receiveTimeoutSec == DEFAULT_READ_TIMEOUT_SECS) {
headerTimeout = DEFAULT_READ_TIMEOUT_SECS * DEFAULT_TIMEOUT_RETRIES;
}
LOGFINER("TcrConnection::readMessageChunked: receiving reply from "
"endpoint %s", m_endpoint);
error = receiveData((char*)msg_header, HDR_LEN_12 + HDR_LEN, headerTimeout, true, false, reply.getMessageTypeRequest());
if (error != CONN_NOERR) {
if (error & CONN_TIMEOUT) {
throwException(TimeoutException("TcrConnection::readMessageChunked: "
"connection timed out while receiving message header"));
}
else {
throwException(GemfireIOException( "TcrConnection::readMessageChunked: "
"connection failure while receiving message header"));
}
}
LOGDEBUG("TcrConnection::readMessageChunked: received header from "
"endpoint %s; bytes: %s", m_endpoint, Utils::convertBytesToString(
msg_header, HDR_LEN_12)->asChar());
DataInput input(msg_header, HDR_LEN_12);
int32_t msgType;
input.readInt(&msgType);
reply.setMessageType(msgType);
int32_t txId;
int32_t numOfParts;
input.readInt(&numOfParts);
LOGDEBUG("TcrConnection::readMessageChunked numberof parts = %d ", numOfParts);
//input.advanceCursor(4);
input.readInt(&txId);
reply.setTransId(txId);
//bool isLastChunk = false;
uint8_t isLastChunk = 0x0;
int chunkNum = 0;
// Initialize the chunk processing
reply.startProcessChunk(m_chunksProcessSema);
// [sumedh] indicate an end to chunk processing and wait for processing
// to end even if reading the chunks fails in middle
struct FinalizeProcessChunk
{
private:
TcrMessage& m_reply;
uint16_t m_endpointmemId;
public:
FinalizeProcessChunk(TcrMessage& reply, uint16_t endpointmemId) : m_reply(reply), m_endpointmemId(endpointmemId) {}
~FinalizeProcessChunk()
{
// Enqueue a NULL chunk indicating a wait for processing to complete.
m_reply.processChunk(NULL,0, m_endpointmemId);
}
} endProcessChunk(reply, m_endpointObj->getDistributedMemberID());
bool first = true;
do {
//uint8_t chunk_header[HDR_LEN];
if(!first)
{
error = receiveData((char*)(msg_header + HDR_LEN_12), HDR_LEN, headerTimeout, true, false, reply.getMessageTypeRequest());
if (error != CONN_NOERR) {
if (error & CONN_TIMEOUT) {
throwException(TimeoutException("TcrConnection::readMessageChunked: "
"connection timed out while receiving chunk header"));
}
else {
throwException(GemfireIOException("TcrConnection::readMessageChunked: "
"connection failure while receiving chunk header"));
}
}
} else
{
first = false;
}
++chunkNum;
LOGDEBUG("TcrConnection::readMessageChunked: received chunk header %d "
"from endpoint %s; bytes: %s", chunkNum, m_endpoint,
Utils::convertBytesToString((msg_header + HDR_LEN_12), HDR_LEN)->asChar());
DataInput inp((msg_header + HDR_LEN_12), HDR_LEN);
int32_t chunkLen;
inp.readInt(&chunkLen);
// ARB: check that chunk length is valid.
GF_DEV_ASSERT(chunkLen> 0);
//inp.readBoolean(&isLastChunk);
inp.read(&isLastChunk);
uint8_t* chunk_body;
GF_NEW(chunk_body, uint8_t[chunkLen]);
error = receiveData((char*)chunk_body, chunkLen, receiveTimeoutSec, true, false, reply.getMessageTypeRequest());
if (error != CONN_NOERR) {
delete [] chunk_body;
if (error & CONN_TIMEOUT) {
throwException(TimeoutException("TcrConnection::readMessageChunked: "
"connection timed out while receiving chunk body"));
}
else {
throwException(GemfireIOException("TcrConnection::readMessageChunked: "
"connection failure while receiving chunk body"));
}
}
LOGDEBUG("TcrConnection::readMessageChunked: received chunk body %d "
"from endpoint %s; bytes: %s", chunkNum, m_endpoint,
Utils::convertBytesToString(chunk_body, chunkLen)->asChar());
// Process the chunk; the actual processing is done by a separate thread
// ThinClientBaseDM::m_chunkProcessor.
reply.processChunk(chunk_body, chunkLen, m_endpointObj->getDistributedMemberID(), isLastChunk);
}
while (!(isLastChunk & 0x1));
LOGFINER("TcrConnection::readMessageChunked: read full reply "
"from endpoint %s", m_endpoint);
}
void TcrConnection::close( )
{
//If this is a short lived grid client, don't bother with this close ack message
if (DistributedSystem::getSystemProperties()->isGridClient()) {
return;
}
TcrMessage* closeMsg = TcrMessage::getCloseConnMessage( );
try {
//LOGINFO("TcrConnection::close DC = %d; netdown = %d endpoint %s", TcrConnectionManager::TEST_DURABLE_CLIENT_CRASH, TcrConnectionManager::isNetDown, m_endpoint);
if( !TcrConnectionManager::TEST_DURABLE_CLIENT_CRASH && !TcrConnectionManager::isNetDown ) {
send( closeMsg->getMsgData( ), closeMsg->getMsgLength( ), 2, false );
}
} catch (Exception & e) {
LOGINFO("Close connection message failed with msg: %s", e.getMessage());
} catch (...) {
LOGINFO("Close connection message failed");
}
}
/* adongre
* CID 28738: Unsigned compared against 0 (NO_EFFECT)
* This less-than-zero comparison of an unsigned value is never true. "msgLength < 0U".
* Changing the unint32_t to int32_t
*/
//CacheableBytesPtr TcrConnection::readHandshakeData(uint32_t msgLength, uint32_t connectTimeout )
CacheableBytesPtr TcrConnection::readHandshakeData(int32_t msgLength, uint32_t connectTimeout )
{
ConnErrType error = CONN_NOERR ;
if(msgLength < 0) {
msgLength = 0;
}
char* recvMessage;
GF_NEW(recvMessage, char[msgLength + 1]);
recvMessage[msgLength] = '\0';
if(msgLength == 0) {
return CacheableBytes::createNoCopy( (uint8_t*)recvMessage, 1);
}
if ( ( error = receiveData( recvMessage, msgLength, connectTimeout,
false ) ) != CONN_NOERR ) {
if ( error & CONN_TIMEOUT ) {
GF_SAFE_DELETE_ARRAY(recvMessage);
GF_SAFE_DELETE_CON( m_conn );
throwException(TimeoutException("TcrConnection::TcrConnection: "
"Timeout in handshake"));
} else {
GF_SAFE_DELETE_ARRAY(recvMessage);
GF_SAFE_DELETE_CON( m_conn );
throwException(GemfireIOException("TcrConnection::TcrConnection: "
"Handshake failure"));
}
} else {
return CacheableBytes::createNoCopy( (uint8_t*)recvMessage, msgLength + 1 );
}
return NULLPTR;
}
//read just the bytes without the trailing null terminator
/* adongre
* CID 28739: Unsigned compared against 0 (NO_EFFECT)
* change the input parameter from unint32_t to int32_t
* as the comparasion case is valid
*/
CacheableBytesPtr TcrConnection::readHandshakeRawData(int32_t msgLength,
uint32_t connectTimeout )
{
ConnErrType error = CONN_NOERR ;
if(msgLength < 0) {
msgLength = 0;
}
if(msgLength == 0) {
return NULLPTR;
}
char* recvMessage;
GF_NEW(recvMessage, char[msgLength]);
if ( ( error = receiveData( recvMessage, msgLength, connectTimeout,
false ) ) != CONN_NOERR ) {
if ( error & CONN_TIMEOUT ) {
GF_SAFE_DELETE_ARRAY(recvMessage);
GF_SAFE_DELETE_CON( m_conn );
throwException(TimeoutException("TcrConnection::TcrConnection: "
"Timeout in handshake"));
} else {
GF_SAFE_DELETE_ARRAY(recvMessage);
GF_SAFE_DELETE_CON( m_conn );
throwException(GemfireIOException("TcrConnection::TcrConnection: "
"Handshake failure"));
}
// not expected to be reached
return NULLPTR;
} else {
return CacheableBytes::createNoCopy( (uint8_t*)recvMessage, msgLength );
}
}
//read a byte array
CacheableBytesPtr TcrConnection::readHandshakeByteArray(uint32_t connectTimeout)
{
/*CacheableBytesPtr codeBytes = readHandshakeData(1, connectTimeout);
DataInput codeDI(codeBytes->value(), codeBytes->length());
uint8_t code = 0;
codeDI.read(&code);
uint32_t arraySize = 0;
if (code == 0xFF) {
return NULLPTR;
} else {
int32_t tempLen = code;
if (tempLen > 252) { // 252 is java's ((byte)-4 && 0xFF)
if (code == 0xFE) {
CacheableBytesPtr lenBytes = readHandshakeData(2, connectTimeout);
DataInput lenDI(lenBytes->value(), lenBytes->length());
uint16_t val;
lenDI.readInt(&val);
tempLen = val;
} else if (code == 0xFD) {
CacheableBytesPtr lenBytes = readHandshakeData(4, connectTimeout);
DataInput lenDI(lenBytes->value(), lenBytes->length());
uint32_t val;
lenDI.readInt(&val);
tempLen = val;
} else {
GF_SAFE_DELETE(m_conn);
throwException(IllegalStateException("unexpected array length code"));
}
}
arraySize = tempLen;
}*/
uint32_t arraySize = readHandshakeArraySize(connectTimeout);
return readHandshakeRawData(arraySize, connectTimeout);
}
//read a byte array
uint32_t TcrConnection::readHandshakeArraySize(uint32_t connectTimeout)
{
CacheableBytesPtr codeBytes = readHandshakeData(1, connectTimeout);
DataInput codeDI(codeBytes->value(), codeBytes->length());
uint8_t code = 0;
codeDI.read(&code);
uint32_t arraySize = 0;
if (code == 0xFF) {
return 0;
} else {
int32_t tempLen = code;
if (tempLen > 252) { // 252 is java's ((byte)-4 && 0xFF)
if (code == 0xFE) {
CacheableBytesPtr lenBytes = readHandshakeData(2, connectTimeout);
DataInput lenDI(lenBytes->value(), lenBytes->length());
uint16_t val;
lenDI.readInt(&val);
tempLen = val;
} else if (code == 0xFD) {
CacheableBytesPtr lenBytes = readHandshakeData(4, connectTimeout);
DataInput lenDI(lenBytes->value(), lenBytes->length());
uint32_t val;
lenDI.readInt(&val);
tempLen = val;
} else {
GF_SAFE_DELETE_CON(m_conn);
throwException(IllegalStateException("unexpected array length code"));
}
}
arraySize = tempLen;
}
return arraySize;
}
void TcrConnection::readHandshakeInstantiatorMsg(uint32_t connectTimeout)
{
int hashMapSize = readHandshakeArraySize(connectTimeout);
for(int i = 0; i < hashMapSize; i++)
{
readHandShakeBytes(6, connectTimeout);//reading integer and arraylist type
int aLen = readHandshakeArraySize(connectTimeout);
for(int j = 0; j < aLen; j++)
{
readHandshakeString(connectTimeout);
}
}
hashMapSize = readHandshakeArraySize(connectTimeout);
for(int i = 0; i < hashMapSize; i++)
{
readHandShakeBytes(5, connectTimeout);//reading integer
readHandshakeString(connectTimeout);
}
//added in 3.6 and 6.6
int hashMapSize2 = readHandshakeArraySize(connectTimeout);
for(int i = 0; i < hashMapSize2; i++)
{
readHandShakeBytes(6, connectTimeout);//reading integer and arraylist type
int aLen = readHandshakeArraySize(connectTimeout);
for(int j = 0; j < aLen; j++)
{
readHandshakeString(connectTimeout);
}
}
}
void TcrConnection::readHandShakeBytes(int numberOfBytes, uint32_t connectTimeout)
{
ConnErrType error = CONN_NOERR;
uint8_t * recvMessage;
GF_NEW(recvMessage, uint8_t[numberOfBytes]);
if ( ( error = receiveData( (char*)recvMessage, numberOfBytes, connectTimeout,
false ) ) != CONN_NOERR ) {
if ( error & CONN_TIMEOUT ) {
GF_SAFE_DELETE_ARRAY(recvMessage);
GF_SAFE_DELETE_CON( m_conn );
throwException(TimeoutException("TcrConnection::TcrConnection: "
"Timeout in handshake"));
} else {
GF_SAFE_DELETE_ARRAY(recvMessage);
GF_SAFE_DELETE_CON( m_conn );
throwException(GemfireIOException("TcrConnection::TcrConnection: "
"Handshake failure"));
}
}
GF_SAFE_DELETE_ARRAY(recvMessage);
}
int32_t TcrConnection::readHandShakeInt(uint32_t connectTimeout)
{
ConnErrType error = CONN_NOERR;
uint8_t * recvMessage;
GF_NEW(recvMessage, uint8_t[4]);
if ( ( error = receiveData( (char*)recvMessage, 4, connectTimeout,
false ) ) != CONN_NOERR ) {
if ( error & CONN_TIMEOUT ) {
GF_SAFE_DELETE_ARRAY(recvMessage);
GF_SAFE_DELETE_CON( m_conn );
throwException(TimeoutException("TcrConnection::TcrConnection: "
"Timeout in handshake"));
} else {
GF_SAFE_DELETE_ARRAY(recvMessage);
GF_SAFE_DELETE_CON( m_conn );
throwException(GemfireIOException("TcrConnection::TcrConnection: "
"Handshake failure"));
}
}
DataInput di(recvMessage, 4);
int32_t val;
di.readInt(&val);
GF_SAFE_DELETE_ARRAY(recvMessage);
return val;
}
CacheableStringPtr TcrConnection::readHandshakeString(uint32_t connectTimeout)
{
ConnErrType error = CONN_NOERR ;
char cstypeid;
if (receiveData(&cstypeid, 1, connectTimeout, false) != CONN_NOERR) {
GF_SAFE_DELETE_CON( m_conn );
if ( error & CONN_TIMEOUT ) {
LOGFINE("Timeout receiving string typeid");
throwException(TimeoutException( "TcrConnection::TcrConnection: "
"Timeout in handshake reading string type ID"));
} else {
LOGFINE("IO error receiving string typeid");
throwException(GemfireIOException( "TcrConnection::TcrConnection: "
"Handshake failure reading string type ID"));
}
}
LOGDEBUG("Received string typeid as %d", cstypeid);
uint32_t length = 0;
switch ((int8_t)cstypeid)
{
case GemfireTypeIds::CacheableNullString: {
return NULLPTR;
break;
}
case GF_STRING: {
uint16_t shortLen = 0;
CacheableBytesPtr lenBytes = readHandshakeData(2, connectTimeout);
DataInput lenDI(lenBytes->value(), lenBytes->length());
lenDI.readInt(&shortLen);
length = shortLen;
break;
}
default: {
GF_SAFE_DELETE_CON(m_conn);
throwException(GemfireIOException("TcrConnection::TcrConnection: "
"Handshake failure: Unexpected string type ID"));
}
}
LOGDEBUG(" Received string len %d", length);
if (length == 0) {
return NULLPTR;
}
char* recvMessage;
GF_NEW(recvMessage, char[length + 1]);
recvMessage[length] = '\0';
if ( ( error = receiveData( recvMessage, length, connectTimeout,
false ) ) != CONN_NOERR ) {
if ( error & CONN_TIMEOUT ) {
GF_SAFE_DELETE_ARRAY(recvMessage);
GF_SAFE_DELETE_CON( m_conn );
LOGFINE("Timeout receiving string data");
throwException(TimeoutException("TcrConnection::TcrConnection: "
"Timeout in handshake reading string bytes"));
} else {
GF_SAFE_DELETE_ARRAY(recvMessage);
GF_SAFE_DELETE_CON( m_conn );
LOGFINE("IO error receiving string data");
throwException(GemfireIOException("TcrConnection::TcrConnection: "
"Handshake failure reading string bytes"));
}
// not expected to be reached
return NULLPTR;
} else {
LOGDEBUG(" Received string data [%s]", recvMessage);
CacheableStringPtr retval = CacheableString::createNoCopy(
recvMessage, length);
return retval;
}
}
bool TcrConnection::hasExpired(int expiryTime)
{
if (expiryTime == -1) {
return false;
}
ACE_Time_Value _expiryTime(expiryTime/1000, (expiryTime%1000)*1000);
if ( ACE_OS::gettimeofday() - m_creationTime > _expiryTime ) {
return true;
}
else {
return false;
}
}
bool TcrConnection::isIdle(int idleTime)
{
if (idleTime == -1) {
return false;
}
ACE_Time_Value _idleTime(idleTime/1000, (idleTime%1000)*1000);
if ( ACE_OS::gettimeofday() - m_lastAccessed > _idleTime ) {
return true;
}
else {
return false;
}
}
void TcrConnection::touch()
{
m_lastAccessed = ACE_OS::gettimeofday();
}
ACE_Time_Value TcrConnection::getLastAccessed()
{
return m_lastAccessed;
}
uint8_t TcrConnection::getOverrides(SystemProperties * props)
{
const char * conflate = props->conflateEvents();
uint8_t conflateByte = 0;
if (conflate != NULL ) {
if ( ACE_OS::strcasecmp(conflate, "true") == 0 ) {
conflateByte = 1;
} else if ( ACE_OS::strcasecmp(conflate, "false") == 0 ) {
conflateByte = 2;
}
}
/*
const char * removeUnresponsive = props->removeUnresponsiveClientOverride();
uint8_t removeByte = 0;
if (removeUnresponsive != NULL ) {
if ( ACE_OS::strcasecmp(removeUnresponsive, "true") == 0 ) {
removeByte = 1;
} else if ( ACE_OS::strcasecmp(removeUnresponsive, "false") == 0 ) {
removeByte = 2;
}
}
const char * notify = props->notifyBySubscriptionOverride();
uint8_t notifyByte = 0;
if (notify != NULL ) {
if ( ACE_OS::strcasecmp(notify, "true") == 0 ) {
notifyByte = 1;
} else if ( ACE_OS::strcasecmp(notify, "false") == 0 ) {
notifyByte = 2;
}
}
return (((notifyByte << 2) | removeByte) << 2) | conflateByte;
*/
return conflateByte;
}
void TcrConnection::updateCreationTime()
{
m_creationTime = ACE_OS::gettimeofday();
touch();
}
TcrConnection::~TcrConnection()
{
LOGDEBUG("Tcrconnection destructor %p . conn ref to endopint %d", this, m_endpointObj->getConnRefCounter());
m_endpointObj->addConnRefCounter(-1);
if ( m_conn != NULL ) {
LOGDEBUG("closing the connection");
m_conn->close( );
GF_SAFE_DELETE_CON( m_conn );
}
if (m_dh != NULL) {
m_dh->clearDhKeys();
GF_SAFE_DELETE( m_dh );
}
}
bool TcrConnection::setAndGetBeingUsed(volatile bool isBeingUsed, bool forTransaction)
{
uint32_t currentValue = 0;
uint32_t retVal = m_isUsed;
if(!forTransaction)
{
if(isBeingUsed)
{
if(m_isUsed == 1 || m_isUsed == 2)
return false;
retVal = HostAsm::atomicCompareAndExchange(m_isUsed, 1, currentValue);
if(retVal == currentValue)
return true;
return false;
}
else
{
m_isUsed = 0;
return true;
}
}
else
{
if(isBeingUsed)
{
if(m_isUsed == 1)//already used
return false;
if(m_isUsed == 2)//transaction thread has set, reused it
return true;
retVal = HostAsm::atomicCompareAndExchange(m_isUsed, 2/*for transaction*/, currentValue);
if(retVal == currentValue)
return true;
return false;
}
else
{
//m_isUsed = 0;//this will done by releasing the connection by transaction at the end of transaction
return true;
}
}
}