blob: 6042e46cf37bebfbd89767374de87aa53d26604b [file] [log] [blame]
/*=========================================================================
* Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* one or more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
#ifndef __TCR_CONNECTION_HPP__
#define __TCR_CONNECTION_HPP__
#include <ace/Semaphore.h>
#include "../gfcpp_globals.hpp"
#include "../ExceptionTypes.hpp"
#include "Connector.hpp"
#include "Set.hpp"
#include "TcrMessage.hpp"
#include "../CacheableBuiltins.hpp"
#include "DiffieHellman.hpp"
#define DEFAULT_TIMEOUT_RETRIES 12
#define PRIMARY_SERVER_TO_CLIENT 101
#define SECONDARY_SERVER_TO_CLIENT 102
#define SUCCESSFUL_SERVER_TO_CLIENT 105
#define UNSUCCESSFUL_SERVER_TO_CLIENT 106
#define CLIENT_TO_SERVER 100
#define REPLY_OK 59
#define REPLY_OK_CS43 58
#define REPLY_REFUSED 60
#define REPLY_INVALID 61
#define REPLY_SSL_ENABLED 21
#define REPLY_AUTHENTICATION_REQUIRED 62
#define REPLY_AUTHENTICATION_FAILED 63
#define REPLY_DUPLICATE_DURABLE_CLIENT 64
#define SECURITY_CREDENTIALS_NONE 0
#define SECURITY_CREDENTIALS_NORMAL 1
#define SECURITY_CREDENTIALS_DHENCRYPT 2
#define SECURITY_MULTIUSER_NOTIFICATIONCHANNEL 3
/** Closes and Deletes connection only if it exists */
#define GF_SAFE_DELETE_CON(x) \
{ \
x->close();\
delete x; \
x = NULL; \
}
namespace gemfire {
enum ConnErrType
{
CONN_NOERR = 0x0,
CONN_NODATA = 0x1,
CONN_TIMEOUT = 0x3,
CONN_IOERR = 0x4,
CONN_OTHERERR = 0x8
};
enum ServerQueueStatus
{
NON_REDUNDANT_SERVER = 0,
REDUNDANT_SERVER = 1,
PRIMARY_SERVER = 2
};
class TcrEndpoint;
class SystemProperties;
class ThinClientPoolDM;
class CPPCACHE_EXPORT TcrConnection
{
public:
/** Create one connection, endpoint is in format of hostname:portno
* It will do handshake with j-server. There're 2 types of handshakes:
* 1) handshake for request
* send following bytes:
* CLIENT_TO_SERVER
* REPLY_OK
* 2 bytes for the length of idenfifier
* a string with "hostname:processId" as identifier
*
* if send succeeds, handshake succeeds. Otherwise, construction
* fails.
*
* 2) handshake for client notification
* send following bytes:
* SERVER_TO_CLIENT
* 1 (4 bytes, we can hard-code)
* 12345 (4 bytes, we can hard-code)
*
* So the total bytes to send are 9
* read one byte from server, it should be CLIENT_TO_SERVER
* Otherwise, construction fails.
* @param ports List of local ports for connections to endpoint
* @param numPorts Size of ports list
*/
bool InitTcrConnection( TcrEndpoint *endpointObj, const char * endpoint, Set< uint16_t >& ports,
bool isClientNotification = false,
bool isSecondary = false,
uint32_t connectTimeout = DEFAULT_CONNECT_TIMEOUT );
TcrConnection(volatile const bool& isConnected):
m_dh(NULL), m_endpoint(NULL), m_endpointObj(NULL),
m_connected(isConnected), m_conn(NULL),
m_chunksProcessSema(0), m_isUsed(0),
/* adongre
*/
connectionId(0),m_hasServerQueue(NON_REDUNDANT_SERVER),
m_isBeingUsed (false),m_port(0),m_queueSize(0), m_poolDM(NULL)
{}
/* destroy the connection */
~TcrConnection( );
/**
* send a synchronized request to server.
*
* It will send the buffer, then wait to receive 17 bytes and save in msg_header.
* msg_header[0] is message type.
* msg_header[1],msg_header[2],msg_header[3],msg_header[4] will be a 4 bytes integer,
* let's say, msgLen, which specifies the length of next read. byteReads some number of
* call read again for msgLen bytes, and save the bytes into msg_body.
* concatenate the msg_header and msg_body into buffer, msg. The msg should be a '0' ended
* string. i.e. If the msg_header plus msg_body has 100 chars, msg should be a 101 char array
* to contain the '0' in the end. We need it to get length of the msg.
* Return the msg.
*
* @param buffer the buffer to send
* @param len length of the data to send
* @param sendTimeoutSec write timeout in sec
* @param recvLen output parameter for length of the received message
* @param receiveTimeoutSec read timeout in sec
* @return byte arrary of response. '0' ended.
* @exception GemfireIOException if an I/O error occurs (socket failure).
* @exception TimeoutException if timeout happens at any of the 3 socket operation: 1 write, 2 read
*/
char * sendRequest(const char* buffer, int32_t len, size_t* recvLen,
uint32_t sendTimeoutSec = DEFAULT_WRITE_TIMEOUT,
uint32_t receiveTimeoutSec = DEFAULT_READ_TIMEOUT_SECS, int32 request = -1);
/**
* send a synchronized request to server for REGISTER_INTEREST_LIST.
*
* @param buffer the buffer to send
* len length of the data to send
* message vector, which will return chunked TcrMessage.
* sendTimeoutSec write timeout in sec
* receiveTimeoutSec read timeout in sec
* @exception GemfireIOException if an I/O error occurs (socket failure).
* @exception TimeoutException if timeout happens at any of the 3 socket operation: 1 write, 2 read
*/
void sendRequestForChunkedResponse( const TcrMessage& request, int32_t len,
TcrMessage& message, uint32_t sendTimeoutSec = DEFAULT_WRITE_TIMEOUT,
uint32_t receiveTimeoutSec = DEFAULT_READ_TIMEOUT_SECS );
/**
* send an asynchronized request to server. No response is expected.
* we need to use it to send CLOSE_CONNECTION msg
*
* @param buffer the buffer to send
* len length of the data to send
* sendTimeoutSec write timeout in sec
* @return no return. Because it either succeeds, or throw exception.
* @exception GemfireIOException if an I/O error occurs (socket failure).
* @exception TimeoutException if timeout happens at any of the 3 socket operation: 1 write, 2 read
*/
void send( const char* buffer, int len,
uint32_t sendTimeoutSec = DEFAULT_WRITE_TIMEOUT,
bool checkConnected = true );
void send( uint32_t& timeSpent, const char* buffer, int len,
uint32_t sendTimeoutSec = DEFAULT_WRITE_TIMEOUT,
bool checkConnected = true, int32_t notPublicApiWithTimeout= -2/*NOT_PUBLIC_API_WITH_TIMEOUT*/ );
/**
* This method is for receiving client notification. It will read 2 times as reading reply in sendRequest()
*
* @param recvLen output parameter for length of the received message
* @param receiveTimeoutSec read timeout in sec
* @return byte arrary of response. '0' ended.
* @exception GemfireIOException if an I/O error occurs (socket failure).
* @exception TimeoutException if timeout happens at any of the 3 socket operation: 1 write, 2 read
*/
char* receive(size_t* recvLen, ConnErrType* opErr,
uint32_t receiveTimeoutSec = DEFAULT_READ_TIMEOUT_SECS);
// ARB: readMessage is now public
/**
* This method reads a message from the socket connection and returns the byte array of response.
* @param recvLen output parameter for length of the received message
* @param receiveTimeoutSec read timeout in seconds
* @param doHeaderTimeoutRetries retry when header receive times out
* @return byte array of response. '0' ended.
* @exception GemfireIOException if an I/O error occurs (socket failure).
* @exception TimeoutException if timeout happens during read
*/
char* readMessage(size_t* recvLen, uint32_t receiveTimeoutSec,
bool doHeaderTimeoutRetries, ConnErrType* opErr, bool isNotificationMessage = false, int32 request = -1 );
/**
* This method reads an interest list response message from the socket connection and sets the reply message
* parameter.
* @param reply response message
* @param receiveTimeoutSec read timeout in sec
* @param doHeaderTimeoutRetries retry when header receive times out
* @exception GemfireIOException if an I/O error occurs (socket failure).
* @exception TimeoutException if timeout happens during read
*/
void readMessageChunked( TcrMessage &reply, uint32_t receiveTimeoutSec,
bool doHeaderTimeoutRetries );
/**
* Send close connection message to the server.
*/
void close( );
// ARB: Durable clients: return true if server has HA queue.
ServerQueueStatus inline getServerQueueStatus(int32_t& queueSize) {
queueSize = m_queueSize;
return m_hasServerQueue;
}
uint16 inline getPort( ) { return m_port; }
TcrEndpoint* getEndpointObject()const { return m_endpointObj; }
bool isBeingUsed() { return m_isBeingUsed; }
bool setAndGetBeingUsed(volatile bool isBeingUsed, bool forTransaction);// { m_isBeingUsed = isBeingUsed ;}
// helpers for pool connection manager
void touch();
bool hasExpired(int expiryTime);
bool isIdle(int idleTime);
ACE_Time_Value getLastAccessed();
void updateCreationTime();
int64_t getConnectionId()
{
LOGDEBUG("TcrConnection::getConnectionId() = %d ",connectionId);
return connectionId;
}
void setConnectionId(int64_t id)
{
LOGDEBUG("Tcrconnection:setConnectionId() = %d ", id);
connectionId = id;
}
CacheableBytesPtr encryptBytes(CacheableBytesPtr data)
{
if(m_dh != NULL)
return m_dh->encrypt(data);
else
return data;
}
CacheableBytesPtr decryptBytes(CacheableBytesPtr data)
{
if(m_dh != NULL)
return m_dh->decrypt(data);
else
return data;
}
private:
int64_t connectionId;
DiffieHellman *m_dh;
/**
* To read Intantiator message(which meant for java client), here we are ignoring it
*/
void readHandshakeInstantiatorMsg(uint32_t connectTimeout);
/**
* Packs the override settings bits into bytes - currently a single byte for
* conflation, remove-unresponsive-client and notify-by-subscription.
*/
uint8_t getOverrides(SystemProperties * props);
/**
* To read the from stream
*/
int32_t readHandShakeInt(uint32_t connectTimeout);
/*
* To read the arraysize
*/
uint32_t readHandshakeArraySize(uint32_t connectTimeout);
/*
* This function reads "numberOfBytes" and ignores it.
*/
void readHandShakeBytes(int numberOfBytes, uint32_t connectTimeout);
/** Create a normal or SSL connection */
Connector * createConnection(const char * ipaddr, uint32_t waitSeconds =
DEFAULT_CONNECT_TIMEOUT, int32_t maxBuffSizePool = 0);
/**
* Reads bytes from socket and handles error conditions in case of Handshake.
*/
/* adongre
* CID 28738: Unsigned compared against 0 (NO_EFFECT)
* This less-than-zero comparison of an unsigned value is never true. "msgLength < 0U".
*/
CacheableBytesPtr readHandshakeData( int32_t msgLength,
uint32_t connectTimeout );
/**
* Reads raw bytes (without appending NULL terminator) from socket and handles error conditions in case of Handshake.
*/
/* adongre
* CID 28739: Unsigned compared against 0 (NO_EFFECT)
* change the input parameter from unint32_t to int32_t
* as the comparasion case is valid
*/
//CacheableBytesPtr readHandshakeRawData( uint32_t msgLength, uint32_t connectTimeout );
CacheableBytesPtr readHandshakeRawData( int32_t msgLength, uint32_t connectTimeout );
/**
* Reads a string from socket and handles error conditions in case of Handshake.
*/
CacheableStringPtr readHandshakeString(uint32_t connectTimeout);
/**
* Reads a byte array (using initial length) from socket and handles error conditions in case of Handshake.
*/
CacheableBytesPtr readHandshakeByteArray(uint32_t connectTimeout);
/**
* Send data to the connection till sendTimeoutSec
*/
ConnErrType sendData( const char* buffer, int32_t length,
uint32_t sendTimeoutSec, bool checkConnected = true );
ConnErrType sendData( uint32_t& timeSpent,const char* buffer, int32_t length,
uint32_t sendTimeoutSec, bool checkConnected = true, int32_t notPublicApiWithTimeout = -2/*NOT_PUBLIC_API_WITH_TIMEOUT*/ );
/**
* Read data from the connection till receiveTimeoutSec
*/
ConnErrType receiveData( char* buffer, int32_t length,
uint32_t receiveTimeoutSec, bool checkConnected = true, bool isNotificationMessage = false, int32_t notPublicApiWithTimeout= -2/*NOT_PUBLIC_API_WITH_TIMEOUT*/ );
const char * m_endpoint;
TcrEndpoint *m_endpointObj;
volatile const bool& m_connected;
Connector *m_conn;
ServerQueueStatus m_hasServerQueue;
int32_t m_queueSize;
uint16 m_port;
// semaphore to synchronize with the chunked response processing thread
ACE_Semaphore m_chunksProcessSema;
ACE_Time_Value m_creationTime;
ACE_Time_Value m_lastAccessed;
// Disallow copy constructor and assignment operator.
TcrConnection(const TcrConnection &);
TcrConnection & operator = (const TcrConnection &);
volatile bool m_isBeingUsed;
volatile uint32_t m_isUsed;
ThinClientPoolDM * m_poolDM;
};
}
#endif // __TCR_CONNECTION_HPP__