blob: 729bf82fd582c816f90bfa77294caa1f52b6b669 [file] [log] [blame]
/*=========================================================================
* Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* one or more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
#ifndef __TCR_MESSAGE_HPP__
#define __TCR_MESSAGE_HPP__
#include "../gfcpp_globals.hpp"
#include "AtomicInc.hpp"
#include "../Cacheable.hpp"
#include "../CacheableKey.hpp"
#include "../CacheableString.hpp"
#include "../UserData.hpp"
#include "../DataOutput.hpp"
#include "../DataInput.hpp"
#include "../ExceptionTypes.hpp"
#include "InterestResultPolicy.hpp"
#include "EventId.hpp"
#include "EventIdMap.hpp"
#include "../CacheableBuiltins.hpp"
#include "TcrChunkedContext.hpp"
#include "../VectorT.hpp"
#include "GemfireTypeIdsImpl.hpp"
#include "BucketServerLocation.hpp"
#include "FixedPartitionAttributesImpl.hpp"
#include "VersionTag.hpp"
#include "VersionedCacheableObjectPartList.hpp"
#include <string>
#include <map>
#include <vector>
namespace gemfire
{
class TcrMessage;
class ThinClientRegion;
class ThinClientBaseDM;
class TcrMessageHelper;
class TcrConnection;
class TcrMessage
{
private:
inline static void writeInt( uint8_t* buffer, uint16_t value );
inline static void writeInt( uint8_t* buffer,uint32_t value );
inline static void readInt(uint8_t* buffer, uint16_t *value );
inline static void readInt(uint8_t* buffer, uint32_t *value );
public:
typedef enum {
/* Server couldn't read message; handle it like a server side
exception that needs retries */
NOT_PUBLIC_API_WITH_TIMEOUT = -2,
INVALID = -1,
REQUEST = 0,
RESPONSE /* 1 */,
EXCEPTION /* 2 */,
REQUEST_DATA_ERROR /* 3 */,
DATA_NOT_FOUND_ERROR /* 4 Not in use */,
PING /* 5 */,
REPLY /* 6 */,
PUT /* 7 */,
PUT_DATA_ERROR /* 8 */,
DESTROY /* 9 */,
DESTROY_DATA_ERROR /* 10 */,
DESTROY_REGION /* 11 */,
DESTROY_REGION_DATA_ERROR /* 12 */,
CLIENT_NOTIFICATION /* 13 */,
UPDATE_CLIENT_NOTIFICATION /* 14 */,
LOCAL_INVALIDATE /* 15 */,
LOCAL_DESTROY /* 16 */,
LOCAL_DESTROY_REGION /* 17 */,
CLOSE_CONNECTION /* 18 */,
PROCESS_BATCH /* 19 */,
REGISTER_INTEREST /* 20 */,
REGISTER_INTEREST_DATA_ERROR /* 21 */,
UNREGISTER_INTEREST /* 22 */,
UNREGISTER_INTEREST_DATA_ERROR /* 23 */,
REGISTER_INTEREST_LIST /* 24 */,
UNREGISTER_INTEREST_LIST /* 25 */,
UNKNOWN_MESSAGE_TYPE_ERROR /* 26 */,
LOCAL_CREATE /* 27 */,
LOCAL_UPDATE /* 28 */,
CREATE_REGION /* 29 */,
CREATE_REGION_DATA_ERROR /* 30 */,
MAKE_PRIMARY /* 31 */,
RESPONSE_FROM_PRIMARY /* 32 */,
RESPONSE_FROM_SECONDARY /* 33 */,
QUERY /* 34 */,
QUERY_DATA_ERROR /* 35 */,
CLEAR_REGION /* 36 */,
CLEAR_REGION_DATA_ERROR /* 37 */,
CONTAINS_KEY /* 38 */,
CONTAINS_KEY_DATA_ERROR /* 39 */,
KEY_SET /* 40 */,
KEY_SET_DATA_ERROR /* 41 */,
EXECUTECQ_MSG_TYPE /* 42 */,
EXECUTECQ_WITH_IR_MSG_TYPE /*43 */,
STOPCQ_MSG_TYPE /*44*/,
CLOSECQ_MSG_TYPE /*45 */,
CLOSECLIENTCQS_MSG_TYPE /*46*/,
CQDATAERROR_MSG_TYPE /*47 */,
GETCQSTATS_MSG_TYPE /*48 */,
MONITORCQ_MSG_TYPE /*49 */,
CQ_EXCEPTION_TYPE /*50 */,
REGISTER_INSTANTIATORS = 51 /* 51 */,
PERIODIC_ACK = 52 /* 52 */,
CLIENT_READY /* 53 */,
CLIENT_MARKER /* 54 */,
INVALIDATE_REGION /* 55 */,
PUTALL /* 56 */,
GET_ALL_DATA_ERROR = 58 /* 58 */,
EXECUTE_REGION_FUNCTION = 59 /* 59 */,
EXECUTE_REGION_FUNCTION_RESULT /* 60 */,
EXECUTE_REGION_FUNCTION_ERROR /* 61 */,
EXECUTE_FUNCTION /* 62 */,
EXECUTE_FUNCTION_RESULT /* 63 */,
EXECUTE_FUNCTION_ERROR /* 64 */,
CLIENT_REGISTER_INTEREST = 65 /* 65 */,
CLIENT_UNREGISTER_INTEREST = 66,
REGISTER_DATASERIALIZERS = 67,
REQUEST_EVENT_VALUE = 68,
REQUEST_EVENT_VALUE_ERROR = 69, /*69*/
PUT_DELTA_ERROR = 70, /*70*/
GET_CLIENT_PR_METADATA = 71, /*71*/
RESPONSE_CLIENT_PR_METADATA = 72, /*72*/
GET_CLIENT_PARTITION_ATTRIBUTES = 73, /*73*/
RESPONSE_CLIENT_PARTITION_ATTRIBUTES =74, /*74*/
GET_CLIENT_PR_METADATA_ERROR = 75, /*75*/
GET_CLIENT_PARTITION_ATTRIBUTES_ERROR = 76, /*76*/
USER_CREDENTIAL_MESSAGE = 77,
REMOVE_USER_AUTH = 78,
EXECUTE_REGION_FUNCTION_SINGLE_HOP = 79,
QUERY_WITH_PARAMETERS = 80,
SIZE = 81,
SIZE_ERROR = 82,
INVALIDATE = 83,
INVALIDATE_ERROR = 84,
COMMIT = 85,
COMMIT_ERROR = 86,
ROLLBACK = 87,
TX_FAILOVER = 88,
GET_ENTRY = 89,
TX_SYNCHRONIZATION = 90,
GET_FUNCTION_ATTRIBUTES = 91,
GET_PDX_TYPE_BY_ID = 92,
GET_PDX_ID_FOR_TYPE = 93,
ADD_PDX_TYPE = 94,
ADD_PDX_ENUM = 96,
GET_PDX_ID_FOR_ENUM = 97,
GET_PDX_ENUM_BY_ID = 98,
SERVER_TO_CLIENT_PING = 99,
//GATEWAY_RECEIVER_COMMAND = 99,
GET_ALL_70 = 100,
TOMBSTONE_OPERATION = 103,
GETDURABLECQS_MSG_TYPE = 105,
GET_DURABLE_CQS_DATA_ERROR = 106,
GET_ALL_WITH_CALLBACK = 107,
PUT_ALL_WITH_CALLBACK = 108,
REMOVE_ALL = 109
} MsgType;
static bool init();
static void cleanup();
static bool isKeepAlive()
{
return *m_keepalive;
}
static bool isUserInitiativeOps(const TcrMessage& msg)
{
int32_t msgType = msg.getMessageType();
if(!msg.isMetaRegion() && !(msgType == TcrMessage::PING
|| msgType == TcrMessage::PERIODIC_ACK
|| msgType == TcrMessage::MAKE_PRIMARY
|| msgType == TcrMessage::CLOSE_CONNECTION
|| msgType == TcrMessage::CLIENT_READY
|| msgType == TcrMessage::INVALID
|| msgType == TcrMessage::MONITORCQ_MSG_TYPE
|| msgType == TcrMessage::GETCQSTATS_MSG_TYPE
|| msgType == TcrMessage::REQUEST_EVENT_VALUE
|| msgType == TcrMessage::GET_CLIENT_PR_METADATA
|| msgType == TcrMessage::GET_CLIENT_PARTITION_ATTRIBUTES
|| msgType == TcrMessage::GET_PDX_ID_FOR_TYPE
|| msgType == TcrMessage::GET_PDX_TYPE_BY_ID
|| msgType == TcrMessage::ADD_PDX_TYPE
|| msgType == TcrMessage::SIZE
|| msgType == TcrMessage::TX_FAILOVER
|| msgType == TcrMessage::GET_ENTRY
|| msgType == TcrMessage::TX_SYNCHRONIZATION
|| msgType == TcrMessage::GET_FUNCTION_ATTRIBUTES
|| msgType == TcrMessage::ADD_PDX_ENUM
|| msgType == TcrMessage::GET_PDX_ENUM_BY_ID
|| msgType == TcrMessage::GET_PDX_ID_FOR_ENUM
|| msgType == TcrMessage::COMMIT
|| msgType == TcrMessage::ROLLBACK
)) {
return true;
}
return false;
}
static VersionTagPtr readVersionTagPart(DataInput& input, uint16_t endpointMemId);
/* constructors */
TcrMessage(TcrMessage::MsgType msgType, const Region* region, const UserDataPtr& aCallbackArgument, int messageResponsetimeout = -1, ThinClientBaseDM *connectionDM = NULL);
TcrMessage(TcrMessage::MsgType msgType, const std::string & regionName, int messageResponsetimeout, ThinClientBaseDM *connectionDM);
TcrMessage(TcrMessage::MsgType msgType, const std::string & regionName, const UserDataPtr& aCallbackArgument, CacheableVectorPtr paramList, int messageResponsetimeout, ThinClientBaseDM *connectionDM );
TcrMessage(TcrMessage::MsgType msgType, const Region* region, const CacheableKeyPtr& key, const UserDataPtr& aCallbackArgument, ThinClientBaseDM *connectionDM = NULL);
TcrMessage(TcrMessage::MsgType msgType, const Region* region, const CacheableKeyPtr& key, const CacheablePtr& value, const UserDataPtr& aCallbackArgument,
bool isDelta = false, ThinClientBaseDM *connectionDM = NULL, bool isMetaRegion = false, bool fullValueAfterDeltaFail = false, const char* regionName = NULL);
TcrMessage(bool decodeAll = true, ThinClientBaseDM *connectionDM = NULL);
TcrMessage( const EventIdMapEntryList& entries ); // for periodic notify ack
//PUTALL
TcrMessage(TcrMessage::MsgType msgType, const Region* region, const HashMapOfCacheable& map, int messageResponsetimeout = -1, ThinClientBaseDM *connectionDM = NULL,
const UserDataPtr& aCallbackArgument = NULLPTR);
//removeall
TcrMessage(TcrMessage::MsgType msgType, const Region* region, const VectorOfCacheableKey& keys, const UserDataPtr& aCallbackArgument, ThinClientBaseDM *connectionDM = NULL);
TcrMessage( const std::string& funcName,
const CacheablePtr& args,
uint8_t getResult,
ThinClientBaseDM *connectionDM, int32_t timeout );
TcrMessage(TcrMessage::MsgType msgType,
const std::string & funcName,
const Region* region,
const CacheablePtr& args,
CacheableVectorPtr routingObj,
uint8_t getResult,
CacheableHashSetPtr failedNodes, int32_t timeout,
ThinClientBaseDM *connectionDM = NULL, int8_t reExecute = 0);
// EXECUTE_REGION_FUNCTION_SINGLE_HOP
TcrMessage(TcrMessage::MsgType msgType,
const std::string & funcName,
const Region* region,
const CacheablePtr& args,
CacheableHashSetPtr routingObj,
uint8_t getResult,
CacheableHashSetPtr failedNodes, bool allBuckets, int32_t timeout,
ThinClientBaseDM *connectionDM = NULL);
TcrMessage( EventIdPtr eventId );
TcrMessage(TcrMessage::MsgType msgType, PropertiesPtr creds, ThinClientBaseDM *connectionDM = NULL);
TcrMessage(TcrMessage::MsgType msgType, ThinClientBaseDM *connectionDM);
TcrMessage(TcrMessage::MsgType msgType, int ordinal, int txid, int status);
void setData(const char *bytearray, int32_t len, uint16_t memId);
void startProcessChunk(ACE_Semaphore& finalizeSema);
// NULL chunk means that this is the last chunk
void processChunk(const uint8_t* chunk, int32_t chunkLen, uint16_t endpointmemId, const uint8_t isLastChunkAndisSecurityHeader = 0x00);
/* for RESPONSE of REGISTER_INTEREST_LIST */
/* constructor for creating REGISTER_INTEREST_LIST */
TcrMessage(TcrMessage::MsgType msgType, const Region* region,
const VectorOfCacheableKey& keys, bool isDurable = false, bool isCachingEnabled = false, bool receiveValues = true, InterestResultPolicy interestPolicy = InterestResultPolicy::NONE, ThinClientBaseDM *connectionDM = NULL);
/* For creating a region on the java server */
/* Note through this you can only create a sub region on the cache server */
/* also for creating REGISTER_INTEREST regex request */
TcrMessage( TcrMessage::MsgType msgType, const std::string& str1,
const std::string& str2, InterestResultPolicy interestPolicy = InterestResultPolicy::NONE, bool isDurable = false, bool isCachingEnabled = false, bool receiveValues = true, ThinClientBaseDM *connectionDM = NULL);
// constructor for CLIENT_READY message.
TcrMessage(TcrMessage::MsgType msgType);
// constructor for MAKE_PRIMARY message.
TcrMessage(TcrMessage::MsgType msgType, bool processedMarker);
// Constructor for UPDATE_CLIENT_NOTIFICATION message
TcrMessage( int32_t port );
// constructor for GET_ALL message
TcrMessage(const Region* region,
const VectorOfCacheableKey* keys, ThinClientBaseDM *connectionDM = NULL, const UserDataPtr& aCallbackArgument = NULLPTR);
void InitializeGetallMsg(const UserDataPtr& aCallbackArgument);
/* constructor for cq execute */
TcrMessage( TcrMessage::MsgType msgType, const std::string& str1,
const std::string& str2, int state, bool isDurable, ThinClientBaseDM *connectionDM = NULL);
//for multiuser cache close
TcrMessage(TcrMessage::MsgType msgType, bool keepAlive, ThinClientBaseDM *connectionDM );
// Constructor for GetClientPRMetaDataOp & GetClientPartitionAttributesOp message types for PR Single Hop.
TcrMessage( TcrMessage::MsgType msgType, const char* regionName );
// Constructor for new Remove operation, that takes key, value & callbackarg.
TcrMessage(TcrMessage::MsgType msgType, const Region* region, const CacheableKeyPtr& key,
const CacheablePtr& value, const UserDataPtr& aCallbackArgument, ThinClientBaseDM *connectionDM = NULL);
//constructor for GetPDXIdForType
TcrMessage( TcrMessage::MsgType msgType, const CacheablePtr& pdxType, ThinClientBaseDM *connectionDM , int32_t pdxTypeId = 0);
//constructor for GetPDXTypeById
TcrMessage( TcrMessage::MsgType msgType, int32_t typeId, ThinClientBaseDM *connectionDM );
TcrMessage(TcrMessage::MsgType msgType, const Region* region, const CacheableKeyPtr& key, const UserDataPtr& aCallbackArgument, bool isContainsKey, ThinClientBaseDM *connectionDM);
// constructor for GET_FUNCTION_ATTRIBUTES and KEY_SET
TcrMessage(TcrMessage::MsgType msgType, const std::string & funcName, ThinClientBaseDM *connectionDM = NULL);
// Updates the early ack byte of the message to reflect that it is a retry op
void updateHeaderForRetry();
inline const VectorOfCacheableKey* getKeys() const
{
return m_keyList;
}
inline const std::string& getRegex( ) const
{
return m_regex;
}
inline InterestResultPolicy getInterestResultPolicy( ) const
{
if(m_interestPolicy == 2)
return InterestResultPolicy::KEYS_VALUES;
else if(m_interestPolicy == 1)
return InterestResultPolicy::KEYS;
else
return InterestResultPolicy::NONE;
}
const char * getPoolName();
/**
* Whether the request is meant to be
* sent to PR primary node for single hop.
*/
inline bool forPrimary() const
{
return
m_msgType == TcrMessage::PUT ||
m_msgType == TcrMessage::DESTROY ||
m_msgType == TcrMessage::EXECUTE_REGION_FUNCTION;
}
inline void initCqMap() {
m_cqs = new std::map<std::string, int>();
}
inline bool forSingleHop() const
{
return
m_msgType == TcrMessage::PUT ||
m_msgType == TcrMessage::DESTROY ||
m_msgType == TcrMessage::REQUEST ||
m_msgType == TcrMessage::GET_ALL_70||
m_msgType == TcrMessage::GET_ALL_WITH_CALLBACK ||
m_msgType == TcrMessage::EXECUTE_REGION_FUNCTION ||
m_msgType == TcrMessage::PUTALL ||
m_msgType == TcrMessage::PUT_ALL_WITH_CALLBACK;
}
inline bool forTransaction() const
{
return m_txId != -1;
}
/*
inline void getSingleHopFlags(bool& forSingleHop, bool& forPrimary) const
{
if (m_msgType == TcrMessage::PUT ||
m_msgType == TcrMessage::DESTROY ||
m_msgType == TcrMessage::REQUEST) {
forSingleHop = true;
if (m_msgType == TcrMessage::REQUEST) {
forPrimary = false;
} else {
forPrimary = true;
}
} else {
forSingleHop = false;
forPrimary = false;
}
}
*/
/* destroy the connection */
~TcrMessage();
const std::string & getRegionName() const;
Region * getRegion() const;
int32_t getMessageType() const;
void setMessageType(int32_t msgType);
void setMessageTypeRequest(int32_t msgType); // the msgType of the request that was made if this is a reply
int32_t getMessageTypeRequest() const;
CacheableKeyPtr getKey() const;
const CacheableKeyPtr& getKeyRef() const;
CacheablePtr getValue() const;
const CacheablePtr& getValueRef() const;
CacheablePtr getCallbackArgument() const;
const CacheablePtr& getCallbackArgumentRef() const;
const std::map<std::string, int>* getCqs() const;
bool getBoolValue() const {return m_boolValue;};
inline const char* getException( )
{
exceptionMessage = (m_value == NULLPTR ? CacheableString::create("(null)") : m_value->toString());
return exceptionMessage->asChar();
}
const char *getMsgData() const;
const char *getMsgHeader() const;
const char *getMsgBody() const;
uint32_t getMsgLength() const;
uint32_t getMsgBodyLength() const;
EventIdPtr getEventId() const;
int32_t getTransId() const;
void setTransId( int32_t txId );
uint32_t getTimeout() const;
void setTimeout(uint32_t timeout);
/* we need a static method to generate ping */
/* The caller should not delete the message since it is global. */
static TcrMessage* getPingMessage();
static TcrMessage * getAllEPDisMess();
/* we need a static method to generate close connection message */
/* The caller should not delete the message since it is global. */
static TcrMessage* getCloseConnMessage();
static void setKeepAlive(bool keepalive);
bool isDurable( ) const { return m_isDurable; }
bool receiveValues( ) const { return m_receiveValues; }
bool hasCqPart() const {return m_hasCqsPart;}
uint32_t getMessageTypeForCq() const { return m_msgTypeForCq; }
bool isInterestListPassed() const {return m_isInterestListPassed;}
bool shouldIgnore() const { return m_shouldIgnore; }
int8 getMetaDataVersion() const { return m_metaDataVersion; }
uint32_t getEntryNotFound() const { return m_entryNotFound; }
int8 getserverGroupVersion() const { return m_serverGroupVersion; }
std::vector<int8>* getFunctionAttributes() {
return m_functionAttributes; }
// set the DM for chunked response messages
void setDM(ThinClientBaseDM* dm)
{
m_tcdm = dm;
}
ThinClientBaseDM* getDM()
{
return m_tcdm;
}
// set the chunked response handler
void setChunkedResultHandler( TcrChunkedResult* chunkedResult)
{
this->m_isLastChunkAndisSecurityHeader = 0x0;
m_chunkedResult = chunkedResult;
}
TcrChunkedResult* getChunkedResultHandler()
{
return m_chunkedResult;
}
void setVersionedObjectPartList(VersionedCacheableObjectPartListPtr versionObjPartListptr){
m_versionObjPartListptr = versionObjPartListptr;
}
VersionedCacheableObjectPartListPtr getVersionedObjectPartList(){
return m_versionObjPartListptr;
}
DataInput* getDelta( ) { return m_delta; }
// ARB: getDeltaBytes( ) is called *only* by CqService, returns a CacheableBytes that
// takes ownership of delta bytes.
CacheableBytesPtr getDeltaBytes( ) {
if ( m_deltaBytes == NULL ) {
return NULLPTR;
}
CacheableBytesPtr retVal( CacheableBytes::createNoCopy( m_deltaBytes, m_deltaBytesLen ) );
m_deltaBytes = NULL;
return retVal;
}
bool hasDelta( ) { return ( m_delta != NULL ); }
void addSecurityPart(int64_t connectionId, int64_t unique_id, TcrConnection* conn);
void addSecurityPart(int64_t connectionId, TcrConnection* conn);
int64_t getConnectionId(TcrConnection* conn);
int64_t getUniqueId(TcrConnection* conn);
void createUserCredentialMessage(TcrConnection* conn);
void readSecureObjectPart(DataInput& input, bool defaultString =false, bool isChunk = false, uint8_t isLastChunkWithSecurity = 0);
void readUniqueIDObjectPart(DataInput& input);
void setMetaRegion(bool isMetaRegion)
{
m_isMetaRegion = isMetaRegion;
}
bool isMetaRegion() const
{
return m_isMetaRegion;
}
int32_t getNumBuckets() const
{
return m_bucketCount;
}
CacheableStringPtr getColocatedWith() const
{
return m_colocatedWith;
}
CacheableStringPtr getPartitionResolver() const
{
return m_partitionResolverName;
}
std::vector<std::vector<BucketServerLocationPtr> >* getMetadata()
{
return m_metadata;
}
std::vector<FixedPartitionAttributesImplPtr>* getFpaSet()
{
return m_fpaSet;
}
CacheableHashSetPtr getFailedNode( ) {
return m_failedNode;
}
bool isCallBackArguement() const {
return m_isCallBackArguement;
}
void setCallBackArguement(bool aCallBackArguement) {
m_isCallBackArguement = aCallBackArguement;
}
void setBucketServerLocation(BucketServerLocationPtr serverLocation)
{
m_bucketServerLocation = serverLocation;
}
void setVersionTag(VersionTagPtr versionTag)
{
m_versionTag = versionTag;
}
VersionTagPtr getVersionTag()
{
return m_versionTag;
}
uint8_t hasResult() const {
return m_hasResult;
}
CacheableHashMapPtr getTombstoneVersions() const {
return m_tombstoneVersions;
}
CacheableHashSetPtr getTombstoneKeys () const {
return m_tombstoneKeys;
}
bool isFEAnotherHop();
private:
void handleSpecialFECase();
bool m_feAnotherHop;
void writeBytesOnly(const SerializablePtr& se);
SerializablePtr readCacheableBytes(DataInput& input, int lenObj);
SerializablePtr readCacheableString(DataInput& input, int lenObj);
static AtomicInc m_transactionId;
static TcrMessage* m_pingMsg;
static TcrMessage* m_closeConnMsg;
static TcrMessage* m_allEPDisconnected;
static uint8_t * m_keepalive;
const static int m_flag_empty;
const static int m_flag_concurrency_checks;
CacheableBytesPtr m_connectionIDBytes;
bool isSecurityOn;
uint8_t m_isLastChunkAndisSecurityHeader;
bool m_isSecurityHeaderAdded;
PropertiesPtr m_creds;
int32_t m_securityHeaderLength;
bool m_isMetaRegion;
CacheableStringPtr exceptionMessage;
// Disallow copy constructor and assignment operator.
TcrMessage(const TcrMessage &);
TcrMessage & operator = (const TcrMessage &);
// some private methods to handle things internally.
void setPingMessage();
void setCloseConnMessage();
void handleByteArrayResponse(const char * bytearray, int32_t len, uint16_t endpointMemId);
void readObjectPart( DataInput& input, bool defaultString = false );
void readFailedNodePart( DataInput& input, bool defaultString = false );
void readCallbackObjectPart( DataInput& input, bool defaultString = false );
void readKeyPart( DataInput& input );
void readBooleanPartAsObject( DataInput& input, bool* boolVal );
void readIntPart(DataInput& input, uint32_t* intValue );
void readLongPart(DataInput& input, uint64_t* intValue );
bool readExceptionPart( DataInput& input, uint8_t isLastChunk, bool skipFirstPart=true );
void readVersionTag(DataInput& input, uint16_t endpointMemId);
void readOldValue(DataInput& input);
void readPrMetaData(DataInput& input);
void writeObjectPart(const SerializablePtr& se, bool isDelta = false, bool callToData = false, const VectorOfCacheableKey* getAllKeyList = NULL);
void writeHeader(uint32_t msgType, uint32_t numOfParts);
void writeRegionPart(const std::string & regionName);
void writeStringPart(const std::string & regionName);
void writeEventIdPart( int reserveSize = 0, bool fullValueAfterDeltaFail = false );
void writeMessageLength();
void writeInterestResultPolicyPart(InterestResultPolicy policy);
void writeIntPart( int32_t intValue );
void writeBytePart(uint8_t byteValue);
void writeByteAndTimeOutPart(uint8_t byteValue, int32_t timeout);
void chunkSecurityHeader(int skipParts, const uint8_t* bytes, int32_t len, uint8_t isLastChunkAndSecurityHeader);
void readEventIdPart(DataInput& input, bool skip = false, int32_t parts = 1); // skip num parts then read eventid
void skipParts( DataInput& input, int32_t numParts = 1 );
void readStringPart(DataInput& input, uint32_t* len, char** str );
void readCqsPart(DataInput& input);
void readHashMapForGCVersions( gemfire::DataInput& input,
CacheableHashMapPtr& value );
void readHashSetForGCVersions( gemfire::DataInput& input,
CacheableHashSetPtr& value );
DSMemberForVersionStampPtr readDSMember( gemfire::DataInput& input );
DataOutput * m_request;
int32_t m_msgType;
int32_t m_msgLength;
int32_t m_msgTypeRequest; // the msgType of the request if this TcrMessage is a reply msg
int32_t m_txId;
bool m_decodeAll; // used only when decoding reply message, if false, decode header only
// the associated region that is handling processing of chunked responses
ThinClientBaseDM* m_tcdm;
TcrChunkedResult* m_chunkedResult;
const VectorOfCacheableKey* m_keyList;
CacheableKeyPtr m_key;
CacheablePtr m_value;
CacheableHashSetPtr m_failedNode;
CacheablePtr m_callbackArgument;
VersionTagPtr m_versionTag;
EventIdPtr m_eventid;
std::string m_regionName;
const Region* m_region;
std::string m_regex;
char m_interestPolicy;
uint32_t m_timeout;
bool m_isDurable;
bool m_receiveValues;
bool m_hasCqsPart;
bool m_isInterestListPassed;
bool m_shouldIgnore;
int8 m_metaDataVersion;
int8 m_serverGroupVersion;
std::vector<BucketServerLocationPtr> m_bucketServerLocations;
std::vector<std::vector<BucketServerLocationPtr> >* m_metadata;
int32_t m_bucketCount;
CacheableStringPtr m_colocatedWith;
CacheableStringPtr m_partitionResolverName;
CacheableVectorPtr m_vectorPtr;
uint32_t m_numCqPart;
uint32_t m_msgTypeForCq; // new part since 7.0 for cq event message type.
std::map<std::string, int>* m_cqs;
int32_t m_messageResponseTimeout;
bool m_boolValue;
DataInput* m_delta;
uint8_t* m_deltaBytes;
int32_t m_deltaBytesLen;
bool m_isCallBackArguement;
BucketServerLocationPtr m_bucketServerLocation;
uint32_t m_entryNotFound;
std::vector<FixedPartitionAttributesImplPtr>* m_fpaSet;
std::vector<int8>* m_functionAttributes;
uint8_t m_hasResult;
CacheableHashMapPtr m_tombstoneVersions;
CacheableHashSetPtr m_tombstoneKeys;
/*Added this member for PutALL versioning changes*/
VersionedCacheableObjectPartListPtr m_versionObjPartListptr;
friend class TcrMessageHelper;
};
/**
* Helper class to invoke some internal methods of TcrMessage. Add any
* methods that response processor methods require to access here.
*/
class TcrMessageHelper
{
public:
/**
* result types returned by readChunkPartHeader
*/
enum ChunkObjectType
{
OBJECT = 0,
EXCEPTION = 1,
NULL_OBJECT = 2
};
/**
* Tries to read an exception part and returns true if the exception
* was successfully read.
*/
inline static bool readExceptionPart(TcrMessage& msg, DataInput& input, uint8_t isLastChunk)
{
return msg.readExceptionPart(input, isLastChunk);
}
inline static void skipParts(TcrMessage& msg, DataInput& input, int32_t numParts = 1)
{
msg.skipParts(input, numParts);
}
/**
* Reads header of a chunk part. Returns true if header was successfully
* read and false if it is a chunk exception part.
* Throws a MessageException with relevant message if an unknown
* message type is encountered in the header.
*/
inline static ChunkObjectType readChunkPartHeader(TcrMessage& msg,
DataInput& input, uint8_t expectedFirstType, int32_t expectedPartType,
const char* methodName, uint32_t& partLen, uint8_t isLastChunk)
{
input.readInt(&partLen);
bool isObj;
input.readBoolean(&isObj);
if (partLen == 0) {
// special null object is case for scalar query result
return NULL_OBJECT;
}
else if (!isObj) {
// otherwise we're currently always expecting an object
char exMsg[256];
ACE_OS::snprintf(exMsg, 255, "TcrMessageHelper::readChunkPartHeader: "
"%s: part is not object", methodName);
LOGDEBUG("%s ", exMsg);
//throw MessageException(exMsg);
return EXCEPTION;
}
uint8_t partType;
input.read(&partType);
int32_t compId = partType;
// [sumedh] ugly hack to check for exception chunk
if (partType == GemfireTypeIdsImpl::JavaSerializable) {
input.reset();
if (TcrMessageHelper::readExceptionPart(msg, input, isLastChunk )) {
msg.setMessageType(TcrMessage::EXCEPTION);
return EXCEPTION;
}
else {
char exMsg[256];
ACE_OS::snprintf(exMsg, 255,
"TcrMessageHelper::readChunkPartHeader: %s: cannot handle "
"java serializable object from server", methodName);
throw MessageException(exMsg);
}
}
else if (partType == GemfireTypeIds::NullObj) {
// special null object is case for scalar query result
return NULL_OBJECT;
}
if (expectedFirstType > 0) {
if (partType != expectedFirstType) {
char exMsg[256];
ACE_OS::snprintf(exMsg, 255, "TcrMessageHelper::readChunkPartHeader: "
"%s: got unhandled object class = %d", methodName, partType);
throw MessageException(exMsg);
}
//This is for GETALL
if (expectedFirstType == GemfireTypeIdsImpl::FixedIDShort) {
int16_t shortId;
input.readInt(&shortId);
compId = shortId;
} // This is for QUERY or REGISTER INTEREST.
else if (expectedFirstType == GemfireTypeIdsImpl::FixedIDByte || expectedFirstType == 0) {
input.read(&partType);
compId = partType;
}
}
if (compId != expectedPartType) {
char exMsg[256];
ACE_OS::snprintf(exMsg, 255, "TcrMessageHelper::readChunkPartHeader: "
"%s: got unhandled object type = %d", methodName, compId);
throw MessageException(exMsg);
}
return OBJECT;
}
inline static int8_t readChunkPartHeader(TcrMessage& msg,
DataInput& input, const char* methodName, uint32_t& partLen, uint8_t isLastChunk)
{
input.readInt(&partLen);
bool isObj;
input.readBoolean(&isObj);
if (partLen == 0) {
// special null object is case for scalar query result
return (int8_t)NULL_OBJECT;
}
else if (!isObj) {
// otherwise we're currently always expecting an object
char exMsg[256];
ACE_OS::snprintf(exMsg, 255, "TcrMessageHelper::readChunkPartHeader: "
"%s: part is not object", methodName);
throw MessageException(exMsg);
}
int8_t partType;
input.read(&partType);
// [sumedh] ugly hack to check for exception chunk
if (partType == GemfireTypeIdsImpl::JavaSerializable) {
input.reset();
if (TcrMessageHelper::readExceptionPart(msg, input, isLastChunk)) {
msg.setMessageType(TcrMessage::EXCEPTION);
return (int8_t)EXCEPTION;
}
else {
char exMsg[256];
ACE_OS::snprintf(exMsg, 255,
"TcrMessageHelper::readChunkPartHeader: %s: cannot handle "
"java serializable object from server", methodName);
throw MessageException(exMsg);
}
}
else if (partType == GemfireTypeIds::NullObj) {
// special null object is case for scalar query result
return (int8_t)NULL_OBJECT;
}
return partType;
}
};
}
#endif // __TCR_MESSAGE_HPP__