blob: 71fd6d7540dab70bf072c4ead1936abe44d0e2f0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "TcrMessage.hpp"
#include <gfcpp/Assert.hpp>
#include <gfcpp/CacheableBuiltins.hpp>
#include <gfcpp/DistributedSystem.hpp>
#include <gfcpp/SystemProperties.hpp>
#include "TcrConnection.hpp"
#include "AutoDelete.hpp"
#include "TcrChunkedContext.hpp"
#include <gfcpp/CacheableObjectArray.hpp>
#include "ThinClientRegion.hpp"
#include "ThinClientBaseDM.hpp"
#include "StackTrace.hpp"
#include "TcrConnection.hpp"
#include "ThinClientPoolDM.hpp"
#include "TSSTXStateWrapper.hpp"
#include "TXState.hpp"
#include "DiskStoreId.hpp"
#include "DiskVersionTag.hpp"
using namespace apache::geode::client;
static const uint32_t REGULAR_EXPRESSION =
1; // come from Java InterestType.REGULAR_EXPRESSION
namespace {
uint32_t g_headerLen = 17;
} // namespace
// AtomicInc TcrMessage::m_transactionId = 0;
TcrMessagePing* TcrMessage::m_pingMsg = NULL;
TcrMessage* TcrMessage::m_closeConnMsg = NULL;
TcrMessage* TcrMessage::m_allEPDisconnected = NULL;
uint8_t* TcrMessage::m_keepalive = NULL;
const int TcrMessage::m_flag_empty = 0x01;
const int TcrMessage::m_flag_concurrency_checks = 0x02;
bool TcrMessage::init() {
bool ret = true;
if (m_pingMsg == NULL) {
try {
m_pingMsg = new TcrMessagePing(true);
m_closeConnMsg = new TcrMessageCloseConnection(true);
} catch (std::exception& ex) {
ret = false;
LOGERROR(ex.what());
} catch (Exception& ex) {
ret = false;
LOGERROR(ex.getMessage());
} catch (...) {
ret = false;
LOGERROR("unknown exception");
}
}
if (m_allEPDisconnected == NULL) {
m_allEPDisconnected = new TcrMessageReply(true, NULL);
}
return ret;
}
void TcrMessage::cleanup() {
GF_SAFE_DELETE(m_pingMsg);
GF_SAFE_DELETE(m_closeConnMsg);
}
/* we need a static method to generate ping */
TcrMessagePing* TcrMessage::getPingMessage() { return m_pingMsg; }
TcrMessage* TcrMessage::getAllEPDisMess() { return m_allEPDisconnected; }
TcrMessage* TcrMessage::getCloseConnMessage() { return m_closeConnMsg; }
void TcrMessage::setKeepAlive(bool keepalive) {
if (TcrMessage::m_keepalive != NULL) {
*TcrMessage::m_keepalive = keepalive ? 1 : 0;
}
}
void TcrMessage::writeInterestResultPolicyPart(InterestResultPolicy policy) {
m_request->writeInt((int32_t)3); // size
m_request->write(static_cast<int8_t>(1)); // isObject
m_request->write(static_cast<int8_t>(GeodeTypeIdsImpl::FixedIDByte));
m_request->write(static_cast<int8_t>(GeodeTypeIdsImpl::InterestResultPolicy));
m_request->write(static_cast<int8_t>(policy.getOrdinal()));
}
void TcrMessage::writeIntPart(int32_t intValue) {
m_request->writeInt((int32_t)4);
m_request->write(static_cast<int8_t>(0));
m_request->writeInt(intValue);
}
void TcrMessage::writeBytePart(uint8_t byteValue) {
m_request->writeInt((int32_t)1);
m_request->write(static_cast<int8_t>(0));
m_request->write(byteValue);
}
void TcrMessage::writeByteAndTimeOutPart(uint8_t byteValue, int32_t timeout) {
m_request->writeInt((int32_t)5); // 1 (byte) + 4 (timeout)
m_request->write(static_cast<int8_t>(0));
m_request->write(byteValue);
m_request->writeInt(timeout);
}
void TcrMessage::readBooleanPartAsObject(DataInput& input, bool* boolVal) {
int32_t lenObj;
input.readInt(&lenObj);
bool isObj;
input.readBoolean(&isObj);
if (lenObj > 0) {
if (isObj) {
// CacheableBooleanPtr cip;
// input.readObject(cip);
//*boolVal = cip->value();
bool bVal = input.readNativeBool();
*boolVal = bVal;
}
}
/*
int32_t lenObj;
input.readInt( &lenObj );
if(lenObj!=2)
throw Exception("Invalid boolean length, should have been 2");
bool isObj;
input.readBoolean( &isObj );
if(!isObj)
throw Exception("boolean is not object");
char tmp[2];
input.readBytesOnly((int8_t*)tmp, (int32_t)2);
*boolVal = tmp[1]== 0? false : true;
*/
}
void TcrMessage::readOldValue(DataInput& input) {
int32_t lenObj;
int8_t isObj;
input.readInt(&lenObj);
input.read(&isObj);
CacheablePtr value;
input.readObject(value); // we are not using this value currently
}
void TcrMessage::readPrMetaData(DataInput& input) {
int32_t lenObj;
int8_t isObj;
input.readInt(&lenObj);
input.read(&isObj);
input.read(&isObj); // read refresh meta data byte
m_metaDataVersion = isObj;
if (lenObj == 2) {
input.read(&m_serverGroupVersion);
LOGDEBUG("Single-hop m_serverGroupVersion in message reply is %d",
m_serverGroupVersion);
}
}
VersionTagPtr TcrMessage::readVersionTagPart(DataInput& input,
uint16_t endpointMemId) {
int8_t isObj;
input.read(&isObj);
VersionTagPtr versionTag;
if (isObj == GeodeTypeIds::NullObj) return versionTag;
if (isObj == GeodeTypeIdsImpl::FixedIDByte) {
versionTag = new VersionTag();
int8_t fixedId;
input.read(&fixedId);
if (fixedId == GeodeTypeIdsImpl::VersionTag) {
versionTag->fromData(input);
versionTag->replaceNullMemberId(endpointMemId);
return versionTag;
}
} else if (isObj == GeodeTypeIdsImpl::FixedIDShort) {
int16_t fixedId;
input.readInt(&fixedId);
if (fixedId == GeodeTypeIdsImpl::DiskVersionTag) {
DiskVersionTag* disk = new DiskVersionTag();
disk->fromData(input);
versionTag = disk;
return versionTag;
}
}
return versionTag;
}
void TcrMessage::readVersionTag(DataInput& input, uint16_t endpointMemId) {
int32_t lenObj;
int8_t isObj;
input.readInt(&lenObj);
input.read(&isObj);
if (lenObj == 0) return;
VersionTagPtr versionTag(
TcrMessage::readVersionTagPart(input, endpointMemId));
this->setVersionTag(versionTag);
}
void TcrMessage::readIntPart(DataInput& input, uint32_t* intValue) {
uint32_t intLen;
input.readInt(&intLen);
if (intLen != 4) throw Exception("int length should have been 4");
int8_t isObj;
input.read(&isObj);
if (isObj) throw Exception("Integer is not an object");
input.readInt(intValue);
}
void TcrMessage::readLongPart(DataInput& input, uint64_t* intValue) {
uint32_t longLen;
input.readInt(&longLen);
if (longLen != 8) throw Exception("long length should have been 8");
int8_t isObj;
input.read(&isObj);
if (isObj) throw Exception("Long is not an object");
input.readInt(intValue);
}
void TcrMessage::readStringPart(DataInput& input, uint32_t* len, char** str) {
char* ts;
int32_t sl;
input.readInt(&sl);
ts = new char[sl];
int8_t isObj;
input.read(&isObj);
if (isObj) throw Exception("String is not an object");
input.readBytesOnly(reinterpret_cast<int8_t*>(ts), sl);
*len = sl;
*str = ts;
}
void TcrMessage::readCqsPart(DataInput& input) {
m_cqs->clear();
readIntPart(input, &m_numCqPart);
for (uint32_t cqCnt = 0; cqCnt < m_numCqPart;) {
char* cqName;
uint32_t len;
readStringPart(input, &len, &cqName);
std::string cq(cqName, len);
delete[] cqName;
cqCnt++;
int32_t cqOp;
readIntPart(input, reinterpret_cast<uint32_t*>(&cqOp));
cqCnt++;
(*m_cqs)[cq] = cqOp;
// LOGINFO("read cqName[%s],cqOp[%d]", cq.c_str(), cqOp);
}
// LOGDEBUG("mapsize = %d", m_cqs.size());
}
inline void TcrMessage::readCallbackObjectPart(DataInput& input,
bool defaultString) {
int32_t lenObj;
input.readInt(&lenObj);
bool isObj;
input.readBoolean(&isObj);
if (lenObj > 0) {
if (isObj) {
input.readObject(m_callbackArgument);
} else {
if (defaultString) {
// TODO:
// m_callbackArgument = CacheableString::create(
// (char*)input.currentBufferPosition( ), lenObj );
m_callbackArgument = readCacheableString(input, lenObj);
} else {
// TODO::
// m_callbackArgument = CacheableBytes::create(
// input.currentBufferPosition( ), lenObj );
m_callbackArgument = readCacheableBytes(input, lenObj);
}
// input.advanceCursor( lenObj );
}
}
}
inline void TcrMessage::readObjectPart(DataInput& input, bool defaultString) {
int32_t lenObj;
input.readInt(&lenObj);
// bool isObj;
// input.readBoolean( &isObj );
int8_t isObj;
input.read(&isObj);
if (lenObj > 0) {
if (isObj == 1) {
input.readObject(m_value);
} else {
if (defaultString) {
// m_value = CacheableString::create(
// (char*)input.currentBufferPosition( ), lenObj );
m_value = readCacheableString(input, lenObj);
} else {
// m_value = CacheableBytes::create(
// input.currentBufferPosition( ), lenObj );
m_value = readCacheableBytes(input, lenObj);
}
// input.advanceCursor( lenObj );
}
} else if (lenObj == 0 && isObj == 2) { // EMPTY BYTE ARRAY
m_value = CacheableBytes::create();
} else if (isObj == 0) {
m_value = NULLPTR;
}
}
void TcrMessage::readSecureObjectPart(DataInput& input, bool defaultString,
bool isChunk,
uint8_t isLastChunkWithSecurity) {
LOGDEBUG(
"TcrMessage::readSecureObjectPart isChunk = %d isLastChunkWithSecurity = "
"%d",
isChunk, isLastChunkWithSecurity);
if (isChunk) {
if (!(isLastChunkWithSecurity & 0x2)) {
return;
}
}
int32_t lenObj;
input.readInt(&lenObj);
bool isObj;
input.readBoolean(&isObj);
LOGDEBUG(
"TcrMessage::readSecureObjectPart lenObj = %d isObj = %d, "
"m_msgTypeRequest = %d defaultString = %d ",
lenObj, isObj, m_msgTypeRequest, defaultString);
if (lenObj > 0) {
if (isObj) {
// TODO: ??
input.readObject(m_value);
} else {
if (defaultString) {
// TODO: ??
// m_value = CacheableString::create(
// (char*)input.currentBufferPosition( ), lenObj );
m_value = readCacheableString(input, lenObj);
} else {
LOGDEBUG("reading connectionid");
// TODO: this will execute always
// input.rea.readInt(&connectionId);
// m_connectionIDBytes =
// CacheableBytes::create(input.currentBufferPosition(), lenObj);
// m_connectionIDBytes = readCacheableBytes(input, lenObj);
m_connectionIDBytes =
CacheableBytes::create(input.currentBufferPosition(), lenObj);
input.advanceCursor(lenObj);
}
}
}
if (input.getBytesRemaining() != 0) {
LOGERROR("readSecureObjectPart: we not read all bytes. Messagetype:%d",
m_msgType);
throw IllegalStateException("didn't read all bytes");
}
}
void TcrMessage::readUniqueIDObjectPart(DataInput& input) {
LOGDEBUG("TcrMessage::readUniqueIDObjectPart");
int32_t lenObj;
input.readInt(&lenObj);
bool isObj;
input.readBoolean(&isObj);
LOGDEBUG("TcrMessage::readUniqueIDObjectPart lenObj = %d isObj = %d", lenObj,
isObj);
if (lenObj > 0) {
m_value = CacheableBytes::create(input.currentBufferPosition(), lenObj);
input.advanceCursor(lenObj);
}
}
int64_t TcrMessage::getConnectionId(TcrConnection* conn) {
if (m_connectionIDBytes != NULLPTR) {
CacheableBytesPtr tmp = conn->decryptBytes(m_connectionIDBytes);
DataInput di(tmp->value(), tmp->length());
int64_t connid;
di.readInt(&connid);
return connid;
} else {
LOGWARN("Returning 0 as internal connection ID msgtype = %d ", m_msgType);
return 0;
}
}
int64_t TcrMessage::getUniqueId(TcrConnection* conn) {
if (m_value != NULLPTR) {
CacheableBytesPtr encryptBytes =
static_cast<CacheableBytes*>(m_value.ptr());
CacheableBytesPtr tmp = conn->decryptBytes(encryptBytes);
DataInput di(tmp->value(), tmp->length());
int64_t uniqueid;
di.readInt(&uniqueid);
return uniqueid;
}
return 0;
}
inline void TcrMessage::readFailedNodePart(DataInput& input,
bool defaultString) {
int32_t lenObj;
input.readInt(&lenObj);
bool isObj;
input.readBoolean(&isObj);
m_failedNode = CacheableHashSet::create();
int8_t typeId = 0;
input.read(&typeId);
// input.readDirectObject(m_failedNode, typeId);
m_failedNode->fromData(input);
LOGDEBUG("readFailedNodePart m_failedNode size = %d ", m_failedNode->size());
}
inline void TcrMessage::readKeyPart(DataInput& input) {
int32_t lenObj;
input.readInt(&lenObj);
bool isObj;
input.readBoolean(&isObj);
if (lenObj > 0) {
if (isObj) {
input.readObject(m_key);
} else {
CacheableKeyPtr ckPtr(dynamic_cast<CacheableKey*>(
readCacheableString(input, lenObj).ptr()));
m_key = ckPtr;
/* // check whether unicode or ASCII string (bug #356)
uint16_t decodedLen = DataInput::getDecodedLength(
input.currentBufferPosition(), lenObj);
if (decodedLen == lenObj) {
// ASCII string
m_key = CacheableString::create((char*) input.currentBufferPosition(),
lenObj);
input.advanceCursor(lenObj);
}
else {
wchar_t* wideStr;
input.readUTFNoLen(&wideStr, decodedLen);
m_key = CacheableString::createNoCopy(wideStr, decodedLen);
}*/
}
}
}
inline void TcrMessage::writeInt(uint8_t* buffer, uint16_t value) {
*(buffer++) = static_cast<uint8_t>(value >> 8);
*(buffer++) = static_cast<uint8_t>(value);
}
inline void TcrMessage::writeInt(uint8_t* buffer, uint32_t value) {
*(buffer++) = static_cast<uint8_t>(value >> 24);
*(buffer++) = static_cast<uint8_t>(value >> 16);
*(buffer++) = static_cast<uint8_t>(value >> 8);
*(buffer++) = static_cast<uint8_t>(value);
}
SerializablePtr TcrMessage::readCacheableString(DataInput& input, int lenObj) {
SerializablePtr sPtr;
// check whether unicode or ASCII string (bug #356)
unsigned int decodedLen =
DataInput::getDecodedLength(input.currentBufferPosition(), lenObj);
if (decodedLen == static_cast<unsigned int>(lenObj)) {
if (decodedLen <= 0xffff) {
input.rewindCursor(2);
writeInt(const_cast<uint8_t*>(input.currentBufferPosition()),
static_cast<uint16_t>(lenObj));
input.readDirectObject(
sPtr, static_cast<int8_t>(
apache::geode::client::GeodeTypeIds::CacheableASCIIString));
} else {
input.rewindCursor(4);
writeInt(const_cast<uint8_t*>(input.currentBufferPosition()),
static_cast<uint32_t>(lenObj));
input.readDirectObject(
sPtr,
static_cast<int8_t>(
apache::geode::client::GeodeTypeIds::CacheableASCIIStringHuge));
}
} else {
if (decodedLen <= 0xffff) {
input.rewindCursor(2);
writeInt(const_cast<uint8_t*>(input.currentBufferPosition()),
static_cast<uint16_t>(lenObj));
input.readDirectObject(
sPtr, static_cast<int8_t>(
apache::geode::client::GeodeTypeIds::CacheableString));
} else {
input.rewindCursor(4);
writeInt(const_cast<uint8_t*>(input.currentBufferPosition()),
static_cast<uint32_t>(lenObj));
input.readDirectObject(
sPtr, static_cast<int8_t>(
apache::geode::client::GeodeTypeIds::CacheableStringHuge));
}
}
return sPtr;
}
SerializablePtr TcrMessage::readCacheableBytes(DataInput& input, int lenObj) {
SerializablePtr sPtr;
if (lenObj <= 252) { // 252 is java's ((byte)-4 && 0xFF)
input.rewindCursor(1);
uint8_t* buffer = const_cast<uint8_t*>(input.currentBufferPosition());
buffer[0] = static_cast<uint8_t>(lenObj);
} else if (lenObj <= 0xFFFF) {
input.rewindCursor(3);
uint8_t* buffer = const_cast<uint8_t*>(input.currentBufferPosition());
buffer[0] = static_cast<uint8_t>(-2);
writeInt(buffer + 1, static_cast<uint16_t>(lenObj));
} else {
input.rewindCursor(5);
uint8_t* buffer = const_cast<uint8_t*>(input.currentBufferPosition());
buffer[0] = static_cast<uint8_t>(-3);
writeInt(buffer + 1, static_cast<uint32_t>(lenObj));
}
input.readDirectObject(
sPtr,
static_cast<int8_t>(apache::geode::client::GeodeTypeIds::CacheableBytes));
return sPtr;
}
bool TcrMessage::readExceptionPart(DataInput& input, uint8_t isLastChunk,
bool skipFirstPart) {
// Reading exception message sent from java cache server.
// Read the first part which is serialized java exception and ignore it.
// Then read the second part which is string and use it to construct the
// exception.
isLastChunk = (isLastChunk >> 5);
LOGDEBUG("TcrMessage::readExceptionPart: isLastChunk = %d", isLastChunk);
if (skipFirstPart == true) {
skipParts(input, 1);
isLastChunk--;
}
if (/*input.getBytesRemaining() > 0 */ isLastChunk > 0) {
// Do a best effort read for exception since it is possible
// that this is invoked for non-exception chunk message which has
// only one part.
isLastChunk--;
readObjectPart(input, true);
// if (input.getBytesRemaining() > 0 && m_msgTypeRequest ==
// TcrMessage::EXECUTE_REGION_FUNCTION && m_msgType !=
// TcrMessage::EXCEPTION) {
if (isLastChunk > 0) {
readFailedNodePart(input, true);
return true; // 3 parts
} else {
return true; // 2 parts
}
}
return false;
}
void TcrMessage::writeObjectPart(const SerializablePtr& se, bool isDelta,
bool callToData,
const VectorOfCacheableKey* getAllKeyList) {
// no NULL check since for some messages NULL object may be valid
uint32_t size = 0;
m_request->writeInt(
static_cast<int32_t>(size)); // write a dummy size of 4 bytes.
// check if the type is a CacheableBytes
int8_t isObject = 1;
if (se != NULLPTR && se->typeId() == GeodeTypeIds::CacheableBytes) {
// for an emty byte array write EMPTY_BYTEARRAY_CODE(2) to is object
try {
int byteArrLength = -1;
if (instanceOf<CacheableBytesPtr>(se)) {
CacheableBytesPtr cacheableBytes = dynCast<CacheableBytesPtr>(se);
byteArrLength = cacheableBytes->length();
} else {
std::string classname(Utils::getCacheableKeyString(se)->asChar());
if (classname.find("apache::geode::client::ManagedCacheableKey") !=
std::string::npos) {
byteArrLength = se->objectSize();
}
}
if (byteArrLength == 0) {
isObject = 2;
m_request->write(isObject);
return;
}
} catch (const apache::geode::client::Exception& ex) {
LOGDEBUG("Exception in writing EMPTY_BYTE_ARRAY : %s", ex.getMessage());
}
isObject = 0;
}
if (isDelta) {
m_request->write(static_cast<int8_t>(0));
} else {
m_request->write(isObject);
}
uint32_t sizeBeforeWritingObj = m_request->getBufferLength();
if (isDelta) {
DeltaPtr deltaPtr = dynCast<DeltaPtr>(se);
deltaPtr->toDelta(*m_request);
} else if (isObject) {
if (!callToData) {
if (getAllKeyList != NULL) {
int8_t typeId = GeodeTypeIds::CacheableObjectArray;
m_request->write(typeId);
int32_t len = getAllKeyList->size();
m_request->writeArrayLen(len);
m_request->write(static_cast<int8_t>(GeodeTypeIdsImpl::Class));
m_request->write(
static_cast<int8_t>(GeodeTypeIds::CacheableASCIIString));
m_request->writeASCII("java.lang.Object");
for (int32_t index = 0; index < getAllKeyList->size(); ++index) {
m_request->writeObject(getAllKeyList->operator[](index));
}
} else {
m_request->writeObject(se, isDelta);
}
} else {
se->toData(*m_request);
}
} else {
// TODO::
// CacheableBytes* rawByteArray = static_cast<CacheableBytes*>(se.ptr());
// m_request->writeBytesOnly(rawByteArray->value(), rawByteArray->length());
writeBytesOnly(se);
}
uint32_t sizeAfterWritingObj = m_request->getBufferLength();
uint32_t sizeOfSerializedObj = sizeAfterWritingObj - sizeBeforeWritingObj;
m_request->rewindCursor(sizeOfSerializedObj + 1 + 4); //
m_request->writeInt(static_cast<int32_t>(sizeOfSerializedObj));
m_request->advanceCursor(sizeOfSerializedObj + 1);
}
void TcrMessage::readInt(uint8_t* buffer, uint16_t* value) {
uint16_t tmp = *(buffer++);
tmp = (tmp << 8) | *(buffer);
*value = tmp;
}
void TcrMessage::readInt(uint8_t* buffer, uint32_t* value) {
uint32_t tmp = *(buffer++);
tmp = (tmp << 8) | *(buffer++);
tmp = (tmp << 8) | *(buffer++);
tmp = (tmp << 8) | *(buffer++);
*value = tmp;
}
void TcrMessage::writeBytesOnly(const SerializablePtr& se) {
uint32_t cBufferLength = m_request->getBufferLength();
uint8_t* startBytes = NULL;
m_request->writeObject(se);
uint8_t* cursor =
const_cast<uint8_t*>(m_request->getBuffer()) + cBufferLength;
int pos = 1; // one byte for typeid
uint8_t code;
code = cursor[pos++];
if (code == 0xFF) {
m_request->rewindCursor(2);
} else {
int32_t result = code;
if (result > 252) { // 252 is java's ((byte)-4 && 0xFF)
if (code == 0xFE) {
uint16_t val;
readInt(cursor + pos, &val);
startBytes = cursor + 4;
result = val;
m_request->rewindCursor(4);
} else if (code == 0xFD) {
uint32_t val;
readInt(cursor + pos, &val);
startBytes = cursor + 6;
result = val;
m_request->rewindCursor(6);
}
} else {
startBytes = cursor + 2;
m_request->rewindCursor(2);
}
for (int i = 0; i < result; i++) cursor[i] = startBytes[i];
}
}
void TcrMessage::writeHeader(uint32_t msgType, uint32_t numOfParts) {
m_request->setPoolName(getPoolName());
int8_t earlyAck = 0x0;
LOGDEBUG("TcrMessage::writeHeader m_isMetaRegion = %d", m_isMetaRegion);
if (m_tcdm != NULL) {
if ((isSecurityOn =
(m_tcdm->isSecurityOn() &&
TcrMessage::isUserInitiativeOps(*this) && !m_isMetaRegion))) {
earlyAck |= 0x2;
}
}
LOGDEBUG("TcrMessage::writeHeader earlyAck = %d", earlyAck);
m_request->writeInt(static_cast<int32_t>(msgType));
m_request->writeInt((int32_t)0); // write a dummy message len('0' here). At
// the end write the length at the (buffer +
// 4) offset.
m_request->writeInt(static_cast<int32_t>(numOfParts));
TXState* txState = TSSTXStateWrapper::s_geodeTSSTXState->getTXState();
if (txState == NULL) {
m_txId = -1;
} else {
m_txId = txState->getTransactionId()->getId();
}
m_request->writeInt(m_txId);
// updateHeaderForRetry assumes that 16 bytes are written before earlyAck
// byte. In case,
// write header definition changes, updateHeaderForRetry should change
// accordingly.
m_request->write(earlyAck);
}
// Updates the early ack byte of the message to reflect that it is a retry op
// This function assumes that 16 bytes are written before earlyAck byte. In
// case,
// write header definition changes, this function should change accordingly.
void TcrMessage::updateHeaderForRetry() {
uint8_t earlyAck = m_request->getValueAtPos(16);
// set the isRetryBit
m_request->updateValueAtPos(16, earlyAck | 0x4);
}
void TcrMessage::writeRegionPart(const std::string& regionName) {
int32_t len = static_cast<int32_t>(regionName.length());
m_request->writeInt(len);
m_request->write(static_cast<int8_t>(0)); // isObject = 0
m_request->writeBytesOnly((int8_t*)regionName.c_str(), len);
}
void TcrMessage::writeStringPart(const std::string& regionName) {
m_request->writeFullUTF(regionName.c_str());
}
void TcrMessage::writeEventIdPart(int reserveSize,
bool fullValueAfterDeltaFail) {
EventId eid(true, reserveSize, fullValueAfterDeltaFail); // set true so we
// auto-gen next
// per-thread
// sequence number
// Write EventId threadid and seqno.
eid.writeIdsData(*m_request);
}
void TcrMessage::writeMessageLength() {
uint32_t totalLen = m_request->getBufferLength();
uint32_t msgLen = totalLen - g_headerLen;
m_request->rewindCursor(
totalLen -
4); // msg len is written after the msg type which is of 4 bytes ...
m_request->writeInt(static_cast<int32_t>(msgLen));
m_request->advanceCursor(totalLen - 8); // after writing 4 bytes for msg len
// you are already 8 bytes ahead from
// the beginning.
}
void TcrMessage::startProcessChunk(ACE_Semaphore& finalizeSema) {
if (m_msgTypeRequest == TcrMessage::EXECUTECQ_MSG_TYPE ||
m_msgTypeRequest == TcrMessage::STOPCQ_MSG_TYPE ||
m_msgTypeRequest == TcrMessage::CLOSECQ_MSG_TYPE ||
m_msgTypeRequest == TcrMessage::CLOSECLIENTCQS_MSG_TYPE ||
m_msgTypeRequest == TcrMessage::GETCQSTATS_MSG_TYPE ||
m_msgTypeRequest == TcrMessage::MONITORCQ_MSG_TYPE) {
return;
}
if (m_chunkedResult == NULL) {
throw FatalInternalException(
"TcrMessage::startProcessChunk: null "
"result processor!");
}
switch (m_msgTypeRequest) {
case TcrMessage::REGISTER_INTEREST:
case TcrMessage::REGISTER_INTEREST_LIST:
case TcrMessage::QUERY:
case TcrMessage::QUERY_WITH_PARAMETERS:
case TcrMessage::EXECUTE_FUNCTION:
case TcrMessage::EXECUTE_REGION_FUNCTION:
case TcrMessage::EXECUTE_REGION_FUNCTION_SINGLE_HOP:
case TcrMessage::EXECUTECQ_WITH_IR_MSG_TYPE:
case TcrMessage::GETDURABLECQS_MSG_TYPE:
case TcrMessage::KEY_SET:
case TcrMessage::GET_ALL_70:
case TcrMessage::GET_ALL_WITH_CALLBACK:
case TcrMessage::PUTALL:
case TcrMessage::PUT_ALL_WITH_CALLBACK:
case TcrMessage::REMOVE_ALL: {
m_chunkedResult->reset();
break;
}
default: {
LOGERROR(
"Got unexpected request message type %d while starting to process "
"response",
m_msgTypeRequest);
throw IllegalStateException(
"Got unexpected request msg type while starting to process response");
}
}
m_chunkedResult->setFinalizeSemaphore(&finalizeSema);
}
bool TcrMessage::isFEAnotherHop() { return m_feAnotherHop; }
void TcrMessage::handleSpecialFECase() {
LOGDEBUG("handleSpecialFECase1 %d", this->m_isLastChunkAndisSecurityHeader);
if ((this->m_isLastChunkAndisSecurityHeader & 0x01) == 0x01) {
LOGDEBUG("handleSpecialFECase2 %d", this->m_isLastChunkAndisSecurityHeader);
if (!((this->m_isLastChunkAndisSecurityHeader & 0x04) == 0x04)) {
LOGDEBUG("handleSpecialFECase3 %d",
this->m_isLastChunkAndisSecurityHeader);
m_feAnotherHop = true;
}
}
}
void TcrMessage::processChunk(const uint8_t* bytes, int32_t len,
uint16_t endpointmemId,
const uint8_t isLastChunkAndisSecurityHeader) {
// TODO: see if security header is there
LOGDEBUG(
"TcrMessage::processChunk isLastChunkAndisSecurityHeader = %d chunklen = "
"%d m_msgType = %d",
isLastChunkAndisSecurityHeader, len, m_msgType);
this->m_isLastChunkAndisSecurityHeader = isLastChunkAndisSecurityHeader;
handleSpecialFECase();
if (m_tcdm == NULL) {
throw FatalInternalException("TcrMessage::processChunk: null DM!");
}
switch (m_msgType) {
case TcrMessage::REPLY: {
LOGDEBUG("processChunk - got reply for request %d", m_msgTypeRequest);
chunkSecurityHeader(1, bytes, len, isLastChunkAndisSecurityHeader);
break;
}
case TcrMessage::RESPONSE: {
if (m_msgTypeRequest == TcrMessage::EXECUTECQ_MSG_TYPE ||
m_msgTypeRequest == TcrMessage::STOPCQ_MSG_TYPE ||
m_msgTypeRequest == TcrMessage::CLOSECQ_MSG_TYPE ||
m_msgTypeRequest == TcrMessage::CLOSECLIENTCQS_MSG_TYPE ||
m_msgTypeRequest == TcrMessage::GETCQSTATS_MSG_TYPE ||
m_msgTypeRequest == TcrMessage::MONITORCQ_MSG_TYPE) {
LOGDEBUG("processChunk - got CQ response for request %d",
m_msgTypeRequest);
// TODO: do we need to do anything here
break;
} else if (m_msgTypeRequest == TcrMessage::PUTALL ||
m_msgTypeRequest == TcrMessage::PUT_ALL_WITH_CALLBACK) {
TcrChunkedContext* chunk = new TcrChunkedContext(
bytes, len, m_chunkedResult, isLastChunkAndisSecurityHeader);
m_chunkedResult->setEndpointMemId(endpointmemId);
m_tcdm->queueChunk(chunk);
if (bytes == NULL) {
// last chunk -- wait for processing of all the chunks to complete
m_chunkedResult->waitFinalize();
ExceptionPtr ex = m_chunkedResult->getException();
if (ex != NULLPTR) {
ex->raise();
}
}
break;
}
// fall-through for other cases
}
case EXECUTE_REGION_FUNCTION_RESULT:
case EXECUTE_FUNCTION_RESULT:
case TcrMessage::CQDATAERROR_MSG_TYPE: // one part
case TcrMessage::CQ_EXCEPTION_TYPE: // one part
case TcrMessage::RESPONSE_FROM_PRIMARY: {
if (m_chunkedResult != NULL) {
LOGDEBUG("tcrmessage in case22 ");
TcrChunkedContext* chunk = new TcrChunkedContext(
bytes, len, m_chunkedResult, isLastChunkAndisSecurityHeader);
m_chunkedResult->setEndpointMemId(endpointmemId);
m_tcdm->queueChunk(chunk);
if (bytes == NULL) {
// last chunk -- wait for processing of all the chunks to complete
m_chunkedResult->waitFinalize();
// Throw any exception during processing here.
// Do not throw it immediately since we want to read the
// full data from socket in any case.
// Notice that TcrChunkedContext::handleChunk stops any
// further processing as soon as an exception is encountered.
// This can cause behaviour like partially filled cache in case
// of populating cache with registerAllKeys(), so that should be
// documented since rolling that back may not be a good idea either.
ExceptionPtr& ex = m_chunkedResult->getException();
if (ex != NULLPTR) {
ex->raise();
}
}
} else if (TcrMessage::CQ_EXCEPTION_TYPE == m_msgType ||
TcrMessage::CQDATAERROR_MSG_TYPE == m_msgType ||
TcrMessage::GET_ALL_DATA_ERROR == m_msgType) {
if (bytes != NULL) {
chunkSecurityHeader(1, bytes, len, isLastChunkAndisSecurityHeader);
GF_SAFE_DELETE_ARRAY(bytes);
}
}
break;
}
case TcrMessage::REGISTER_INTEREST_DATA_ERROR: // for register interest
// error
case EXECUTE_FUNCTION_ERROR:
case EXECUTE_REGION_FUNCTION_ERROR: {
if (bytes != NULL) {
// DeleteArray<const uint8_t> delChunk(bytes);
// DataInput input(bytes, len);
// TODO: this not send two part...
// looks like this is our exception so only one part will come
// readExceptionPart(input, false);
// readSecureObjectPart(input, false, true,
// isLastChunkAndisSecurityHeader );
chunkSecurityHeader(1, bytes, len, isLastChunkAndisSecurityHeader);
GF_SAFE_DELETE_ARRAY(bytes);
}
break;
}
case TcrMessage::EXCEPTION: {
if (bytes != NULL) {
DeleteArray<const uint8_t> delChunk(bytes);
DataInput input(bytes, len);
readExceptionPart(input, isLastChunkAndisSecurityHeader);
readSecureObjectPart(input, false, true,
isLastChunkAndisSecurityHeader);
}
break;
}
case TcrMessage::RESPONSE_FROM_SECONDARY: {
// TODO: how many parts
chunkSecurityHeader(1, bytes, len, isLastChunkAndisSecurityHeader);
if (bytes != NULL) {
DeleteArray<const uint8_t> delChunk(bytes);
LOGFINEST("processChunk - got response from secondary, ignoring.");
}
break;
}
case TcrMessage::GET_ALL_DATA_ERROR: {
chunkSecurityHeader(1, bytes, len, isLastChunkAndisSecurityHeader);
if (bytes != NULL) {
GF_SAFE_DELETE_ARRAY(bytes);
}
// nothing else to done since this will be taken care of at higher level
break;
}
default: {
// TODO: how many parts what should we do here
if (bytes != NULL) {
GF_SAFE_DELETE_ARRAY(bytes);
} else {
LOGWARN(
"Got unhandled message type %d while processing response, possible "
"serialization mismatch",
m_msgType);
throw MessageException(
"TcrMessage::processChunk: "
"got unhandled message type");
}
break;
}
}
}
const char* TcrMessage::getPoolName() {
if (m_region != NULL) {
const PoolPtr& p = (const_cast<Region*>(m_region))->getPool();
if (p != NULLPTR) {
return p->getName();
} else {
return NULL;
}
}
return NULL;
/*ThinClientPoolDM* pool = dynamic_cast<ThinClientPoolDM*>(m_tcdm);
if(pool == NULL)
return NULL;
return pool->getName();*/
}
void TcrMessage::chunkSecurityHeader(int skipPart, const uint8_t* bytes,
int32_t len,
uint8_t isLastChunkAndSecurityHeader) {
LOGDEBUG("TcrMessage::chunkSecurityHeader:: skipParts = %d", skipPart);
if ((isLastChunkAndSecurityHeader & 0x3) == 0x3) {
DataInput di(bytes, len);
skipParts(di, skipPart);
readSecureObjectPart(di, false, true, isLastChunkAndSecurityHeader);
}
}
void TcrMessage::handleByteArrayResponse(const char* bytearray, int32_t len,
uint16_t endpointMemId) {
DataInput input((uint8_t*)bytearray, len);
// TODO:: this need to make sure that pool is there
// if(m_tcdm == NULL)
// throw IllegalArgumentException("Pool is NULL in TcrMessage");
input.setPoolName(getPoolName());
input.readInt(&m_msgType);
int32_t msglen;
input.readInt(&msglen);
int32_t numparts;
input.readInt(&numparts);
input.readInt(&m_txId);
int8_t earlyack;
input.read(&earlyack);
LOGDEBUG(
"handleByteArrayResponse m_msgType = %d isSecurityOn = %d requesttype "
"=%d",
m_msgType, isSecurityOn, m_msgTypeRequest);
LOGDEBUG(
"Message type=%d, length=%d, parts=%d, txid=%d and eack %d with data "
"length=%d",
m_msgType, msglen, numparts, m_txId, earlyack, len);
// LOGFINE("Message type=%d, length=%d, parts=%d, txid=%d and eack %d with
// data length=%d",
// m_msgType, msglen, numparts, m_txId, earlyack, len);
switch (m_msgType) {
case TcrMessage::RESPONSE: {
if (m_msgTypeRequest == TcrMessage::CONTAINS_KEY) {
readBooleanPartAsObject(input, &m_boolValue);
} else if (m_msgTypeRequest == TcrMessage::USER_CREDENTIAL_MESSAGE) {
readUniqueIDObjectPart(input);
} else if (m_msgTypeRequest == TcrMessage::GET_PDX_ID_FOR_TYPE ||
m_msgTypeRequest == TcrMessage::GET_PDX_ID_FOR_ENUM) {
// int will come in response
uint32_t typeId;
readIntPart(input, &typeId);
m_value = CacheableInt32::create(typeId);
} else if (m_msgTypeRequest == TcrMessage::GET_PDX_TYPE_BY_ID) {
// PdxType will come in response
input.advanceCursor(5); // part header
m_value =
SerializationRegistry::deserialize(input, GeodeTypeIds::PdxType);
} else if (m_msgTypeRequest == TcrMessage::GET_PDX_ENUM_BY_ID) {
// PdxType will come in response
input.advanceCursor(5); // part header
m_value = SerializationRegistry::deserialize(input);
} else if (m_msgTypeRequest == TcrMessage::GET_FUNCTION_ATTRIBUTES) {
int32_t lenObj;
input.readInt(&lenObj);
int8_t isObj;
input.read(&isObj);
int8_t hR;
input.read(&hR);
int8_t isHA;
input.read(&isHA);
int8_t oFW;
input.read(&oFW);
m_functionAttributes = new std::vector<int8>();
m_functionAttributes->push_back(hR);
m_functionAttributes->push_back(isHA);
m_functionAttributes->push_back(oFW);
} else if (m_msgTypeRequest == TcrMessage::REQUEST) {
int32_t receivednumparts = 2;
readObjectPart(input);
uint32_t flag = 0;
readIntPart(input, &flag);
if (flag & 0x01) {
readCallbackObjectPart(input);
receivednumparts++;
}
if ((m_value == NULLPTR) && (flag & 0x08 /*VALUE_IS_INVALID*/)) {
m_value = CacheableToken::invalid();
}
if (flag & 0x02) {
readVersionTag(input, endpointMemId);
receivednumparts++;
}
if (flag & 0x04 /*KEY_NOT_PRESENT*/) {
m_value = CacheableToken::tombstone();
}
if (numparts > receivednumparts) readPrMetaData(input);
} else if (m_decodeAll) {
readObjectPart(input);
if (numparts == 2) {
if (m_isCallBackArguement) {
readCallbackObjectPart(input);
} else {
int32_t lenObj;
input.readInt(&lenObj);
bool isObj;
input.readBoolean(&isObj);
input.read(&m_metaDataVersion);
if (lenObj == 2) {
input.read(&m_serverGroupVersion);
LOGDEBUG(
"Single-hop m_serverGroupVersion in message response is %d",
m_serverGroupVersion);
}
}
} else if (numparts > 2) {
skipParts(input, 1);
int32_t lenObj;
input.readInt(&lenObj);
bool isObj;
input.readBoolean(&isObj);
input.read(&m_metaDataVersion);
LOGFINE("Single-hop metadata version in message response is %d",
m_metaDataVersion);
if (lenObj == 2) {
input.read(&m_serverGroupVersion);
LOGDEBUG(
"Single-hop m_serverGroupVersion in message response is %d",
m_serverGroupVersion);
}
}
}
break;
}
case TcrMessage::EXCEPTION: {
uint8_t lastChunk = static_cast<uint8_t>(numparts);
lastChunk = (lastChunk << 5);
readExceptionPart(input, lastChunk);
// if (isSecurityOn)
// readSecureObjectPart( input );
break;
}
case TcrMessage::INVALID: {
// Read the string in the reply
LOGWARN("Received invalid message type as reply from server");
readObjectPart(input, true);
break;
}
case TcrMessage::CLIENT_REGISTER_INTEREST:
case TcrMessage::CLIENT_UNREGISTER_INTEREST:
case TcrMessage::SERVER_TO_CLIENT_PING:
case TcrMessage::REGISTER_INSTANTIATORS: {
// ignore this
m_shouldIgnore = true;
break;
}
case TcrMessage::REGISTER_INTEREST_DATA_ERROR:
case TcrMessage::UNREGISTER_INTEREST_DATA_ERROR:
case TcrMessage::PUT_DATA_ERROR:
case TcrMessage::KEY_SET_DATA_ERROR:
case TcrMessage::REQUEST_DATA_ERROR:
case TcrMessage::DESTROY_REGION_DATA_ERROR:
case TcrMessage::CLEAR_REGION_DATA_ERROR:
case TcrMessage::CONTAINS_KEY_DATA_ERROR:
case TcrMessage::PUT_DELTA_ERROR: {
// do nothing. (?) TODO Do we need to process further.
m_shouldIgnore = true;
break;
}
case TcrMessage::REPLY: {
switch (m_msgTypeRequest) {
case TcrMessage::PUT: {
readPrMetaData(input);
uint32_t flags = 0;
readIntPart(input, &flags);
if (flags & 0x01) { // has old value
readOldValue(input);
}
if (flags & 0x04) {
readVersionTag(input, endpointMemId);
}
break;
}
case TcrMessage::INVALIDATE: {
uint32_t flags = 0;
readIntPart(input, &flags);
if (flags & 0x01) readVersionTag(input, endpointMemId);
readPrMetaData(input);
break;
}
case TcrMessage::DESTROY: {
uint32_t flags = 0;
readIntPart(input, &flags);
if (flags & 0x01) readVersionTag(input, endpointMemId);
readPrMetaData(input);
// skip the Destroy65.java response entryNotFound int part so
// that the readSecureObjectPart() call below gets the security part
// skipParts(input, 1);
readIntPart(input, &m_entryNotFound);
LOGDEBUG("Inside TcrMessage::REPLY::DESTROY m_entryNotFound = %d ",
m_entryNotFound);
break;
}
case TcrMessage::PING:
default: {
readPrMetaData(input);
break;
}
}
break;
}
case TcrMessage::LOCAL_INVALIDATE:
case TcrMessage::LOCAL_DESTROY: {
int32_t regionLen;
input.readInt(&regionLen);
int8_t isObj;
input.read(&isObj);
char* regname = NULL;
regname = new char[regionLen + 1];
DeleteArray<char> delRegName(regname);
input.readBytesOnly(reinterpret_cast<int8_t*>(regname), regionLen);
regname[regionLen] = '\0';
m_regionName = regname;
readKeyPart(input);
// skipParts(input, 1); // skip callbackarg parts
readCallbackObjectPart(input);
readVersionTag(input, endpointMemId);
readBooleanPartAsObject(input, &m_isInterestListPassed);
readBooleanPartAsObject(input, &m_hasCqsPart);
if (m_hasCqsPart) {
if (m_msgType == TcrMessage::LOCAL_INVALIDATE) {
readIntPart(input, &m_msgTypeForCq);
} else {
m_msgTypeForCq = static_cast<uint32_t>(m_msgType);
}
// LOGINFO("got cq local local_invalidate/local_destroy read
// m_hasCqsPart");
readCqsPart(input);
}
// read eventid part
readEventIdPart(input, false);
break;
}
case TcrMessage::LOCAL_CREATE:
case TcrMessage::LOCAL_UPDATE: {
int32_t regionLen;
input.readInt(&regionLen);
int8_t isObj;
input.read(&isObj);
char* regname = NULL;
regname = new char[regionLen + 1];
DeleteArray<char> delRegName(regname);
input.readBytesOnly(reinterpret_cast<int8_t*>(regname), regionLen);
regname[regionLen] = '\0';
m_regionName = regname;
readKeyPart(input);
// Read delta flag
bool isDelta = false;
readBooleanPartAsObject(input, &isDelta);
if (isDelta) {
input.readInt(&m_deltaBytesLen);
int8_t isObj;
input.read(&isObj);
m_deltaBytes = new uint8_t[m_deltaBytesLen];
input.readBytesOnly(m_deltaBytes, m_deltaBytesLen);
m_delta = new DataInput(m_deltaBytes, m_deltaBytesLen);
} else {
readObjectPart(input);
}
// skip callbackarg part
// skipParts(input, 1);
readCallbackObjectPart(input);
readVersionTag(input, endpointMemId);
readBooleanPartAsObject(input, &m_isInterestListPassed);
readBooleanPartAsObject(input, &m_hasCqsPart);
if (m_hasCqsPart) {
// LOGINFO("got cq local_create/local_create");
readCqsPart(input);
m_msgTypeForCq = static_cast<uint32_t>(m_msgType);
}
// read eventid part
readEventIdPart(input, false);
GF_SAFE_DELETE_ARRAY(regname); // COVERITY ---> 30299 Resource leak
break;
}
case TcrMessage::CLIENT_MARKER: {
// dont skip (non-existent) callbackarg part, just read eventid part
readEventIdPart(input, false);
break;
}
case TcrMessage::LOCAL_DESTROY_REGION:
case TcrMessage::CLEAR_REGION: {
int32_t regionLen;
input.readInt(&regionLen);
int8_t isObj;
input.read(&isObj);
char* regname = NULL;
regname = new char[regionLen + 1];
DeleteArray<char> delRegName(regname);
input.readBytesOnly(reinterpret_cast<int8_t*>(regname), regionLen);
regname[regionLen] = '\0';
m_regionName = regname;
// skip callbackarg part
// skipParts(input, 1);
readCallbackObjectPart(input);
readBooleanPartAsObject(input, &m_hasCqsPart);
if (m_hasCqsPart) {
// LOGINFO("got cq region_destroy read m_hasCqsPart");
readCqsPart(input);
}
// read eventid part
readEventIdPart(input, false);
break;
}
case TcrMessage::RESPONSE_CLIENT_PR_METADATA: {
if (len == 17) {
LOGDEBUG("RESPONSE_CLIENT_PR_METADATA len is 17");
return;
}
m_metadata = new std::vector<std::vector<BucketServerLocationPtr> >();
for (int32_t i = 0; i < numparts; i++) {
int32_t bits32;
input.readInt(&bits32); // partlen;
int8_t bits8;
input.read(&bits8); // isObj;
input.read(&bits8); // cacheable vector typeid
LOGDEBUG("Expected typeID %d, got %d", GeodeTypeIds::CacheableArrayList,
bits8);
input.readArrayLen(&bits32); // array length
LOGDEBUG("Array length = %d ", bits32);
if (bits32 > 0) {
std::vector<BucketServerLocationPtr> bucketServerLocations;
for (int32_t index = 0; index < bits32; index++) {
int8_t header;
input.read(&header); // ignore DS typeid
input.read(&header); // ignore CLASS typeid
input.read(&header); // ignore string typeid
uint16_t classLen;
input.readInt(&classLen); // Read classLen
input.advanceCursor(classLen);
BucketServerLocationPtr location(new BucketServerLocation());
location->fromData(input);
LOGFINE("location contains %d\t%s\t%d\t%d\t%s",
location->getBucketId(), location->getServerName().c_str(),
location->getPort(), location->getVersion(),
(location->isPrimary() ? "true" : "false"));
bucketServerLocations.push_back(location);
}
m_metadata->push_back(bucketServerLocations);
}
LOGFINER("Metadata size is %d", m_metadata->size());
}
break;
}
case TcrMessage::GET_CLIENT_PR_METADATA_ERROR: {
LOGERROR("Failed to get single-hop meta data");
break;
}
case TcrMessage::RESPONSE_CLIENT_PARTITION_ATTRIBUTES: {
int32_t bits32;
input.readInt(&bits32); // partlen;
int8_t bits8;
input.read(&bits8); // isObj;
m_bucketCount = input.readNativeInt32(); // PART1 = bucketCount
input.readInt(&bits32); // partlen;
input.read(&bits8); // isObj;
if (bits32 > 0) {
input.readNativeString(m_colocatedWith); // PART2 = colocatedwith
}
if (numparts == 4) {
input.readInt(&bits32); // partlen;
input.read(&bits8); // isObj;
if (bits32 > 0) {
input.readNativeString(
m_partitionResolverName); // PART3 = partitionresolvername
}
input.readInt(&bits32); // partlen;
input.read(&bits8); // isObj;
input.read(&bits8); // cacheable CacheableHashSet typeid
input.readArrayLen(&bits32); // array length
if (bits32 > 0) {
m_fpaSet = new std::vector<FixedPartitionAttributesImplPtr>();
for (int32_t index = 0; index < bits32; index++) {
int8_t header;
input.read(&header); // ignore DS typeid
input.read(&header); // ignore CLASS typeid
input.read(&header); // ignore string typeid
uint16_t classLen;
input.readInt(&classLen); // Read classLen
input.advanceCursor(classLen);
FixedPartitionAttributesImplPtr fpa(
new FixedPartitionAttributesImpl());
fpa->fromData(input); // PART4 = set of FixedAttributes.
LOGDEBUG("fpa contains %d\t%s\t%d\t%d", fpa->getNumBuckets(),
fpa->getPartitionName().c_str(), fpa->isPrimary(),
fpa->getStartingBucketID());
m_fpaSet->push_back(fpa);
}
}
}
break;
}
case TcrMessage::TOMBSTONE_OPERATION: {
uint32_t tombstoneOpType;
int32_t regionLen;
input.readInt(&regionLen);
int8_t isObj;
input.read(&isObj);
char* regname = NULL;
regname = new char[regionLen + 1];
DeleteArray<char> delRegName(regname);
input.readBytesOnly(reinterpret_cast<int8_t*>(regname), regionLen);
regname[regionLen] = '\0';
m_regionName = regname;
readIntPart(input, &tombstoneOpType); // partlen;
int32_t len;
input.readInt(&len);
input.read(&isObj);
if (tombstoneOpType == 0) {
if (m_tombstoneVersions == NULLPTR) {
m_tombstoneVersions = CacheableHashMap::create();
}
readHashMapForGCVersions(input, m_tombstoneVersions);
} else if (tombstoneOpType == 1) {
if (m_tombstoneKeys == NULLPTR) {
m_tombstoneKeys = CacheableHashSet::create();
}
// input.readObject(m_tombstoneKeys);
readHashSetForGCVersions(input, m_tombstoneKeys);
} else {
LOGERROR("Failed to read the tombstone versions");
break;
}
// readEventId Part
readEventIdPart(input, false);
break;
}
case TcrMessage::GET_CLIENT_PARTITION_ATTRIBUTES_ERROR: {
LOGERROR("Failed to get server partitioned region attributes");
break;
}
case TcrMessage::UNKNOWN_MESSAGE_TYPE_ERROR: {
// do nothing
break;
}
case TcrMessage::REQUEST_EVENT_VALUE_ERROR: {
LOGERROR("Error while requesting full value for delta");
break;
}
default:
LOGERROR(
"Unknown message type %d in response, possible serialization "
"mismatch",
m_msgType);
StackTrace st;
st.print();
throw MessageException("handleByteArrayResponse: unknown message type");
}
LOGDEBUG("handleByteArrayResponse earlyack = %d ", earlyack);
if (earlyack & 0x2) readSecureObjectPart(input);
}
TcrMessageDestroyRegion::TcrMessageDestroyRegion(
const Region* region, const UserDataPtr& aCallbackArgument,
int messageResponsetimeout, ThinClientBaseDM* connectionDM) {
m_msgType = TcrMessage::DESTROY_REGION;
m_tcdm = connectionDM;
m_regionName = region == NULL ? "INVALID_REGION_NAME" : region->getFullPath();
m_region = region;
m_timeout = DEFAULT_TIMEOUT_SECONDS;
m_messageResponseTimeout = messageResponsetimeout;
uint32_t numOfParts = 1;
if (aCallbackArgument != NULLPTR) {
++numOfParts;
}
numOfParts++;
if (m_messageResponseTimeout != -1) numOfParts++;
writeHeader(m_msgType, numOfParts);
writeRegionPart(m_regionName);
writeEventIdPart();
if (aCallbackArgument != NULLPTR) {
writeObjectPart(aCallbackArgument);
}
if (m_messageResponseTimeout != -1) {
writeIntPart(m_messageResponseTimeout);
}
writeMessageLength();
}
TcrMessageClearRegion::TcrMessageClearRegion(
const Region* region, const UserDataPtr& aCallbackArgument,
int messageResponsetimeout, ThinClientBaseDM* connectionDM) {
m_msgType = TcrMessage::CLEAR_REGION;
m_tcdm = connectionDM;
m_regionName = region == NULL ? "INVALID_REGION_NAME" : region->getFullPath();
m_region = region;
m_timeout = DEFAULT_TIMEOUT_SECONDS;
m_messageResponseTimeout = messageResponsetimeout;
isSecurityOn = false;
m_isSecurityHeaderAdded = false;
uint32_t numOfParts = 1;
if (aCallbackArgument != NULLPTR) {
++numOfParts;
}
numOfParts++;
if (m_messageResponseTimeout != -1) numOfParts++;
writeHeader(m_msgType, numOfParts);
writeRegionPart(m_regionName);
writeEventIdPart();
if (aCallbackArgument != NULLPTR) {
writeObjectPart(aCallbackArgument);
}
if (m_messageResponseTimeout != -1) {
writeIntPart(m_messageResponseTimeout);
}
writeMessageLength();
}
TcrMessageQuery::TcrMessageQuery(const std::string& regionName,
int messageResponsetimeout,
ThinClientBaseDM* connectionDM) {
m_request = new DataOutput;
m_msgType = TcrMessage::QUERY;
m_tcdm = connectionDM;
m_regionName = regionName; // this is querystri;
m_timeout = DEFAULT_TIMEOUT_SECONDS;
m_messageResponseTimeout = messageResponsetimeout;
m_region = NULL;
uint32_t numOfParts = 1;
numOfParts++;
if (m_messageResponseTimeout != -1) numOfParts++;
writeHeader(m_msgType, numOfParts);
writeRegionPart(m_regionName);
writeEventIdPart();
if (m_messageResponseTimeout != -1) {
writeIntPart(m_messageResponseTimeout);
}
writeMessageLength();
}
TcrMessageStopCQ::TcrMessageStopCQ(const std::string& regionName,
int messageResponsetimeout,
ThinClientBaseDM* connectionDM) {
m_msgType = TcrMessage::STOPCQ_MSG_TYPE;
m_tcdm = connectionDM;
m_regionName = regionName; // this is querystring
m_timeout = DEFAULT_TIMEOUT_SECONDS;
m_messageResponseTimeout = messageResponsetimeout;
m_region = NULL;
m_isSecurityHeaderAdded = false;
m_isMetaRegion = false;
uint32_t numOfParts = 1;
numOfParts++;
if (m_messageResponseTimeout != -1) numOfParts++;
writeHeader(m_msgType, numOfParts);
writeRegionPart(m_regionName);
writeEventIdPart();
if (m_messageResponseTimeout != -1) {
writeIntPart(m_messageResponseTimeout);
}
writeMessageLength();
}
TcrMessageCloseCQ::TcrMessageCloseCQ(const std::string& regionName,
int messageResponsetimeout,
ThinClientBaseDM* connectionDM) {
m_msgType = TcrMessage::CLOSECQ_MSG_TYPE;
m_tcdm = connectionDM;
m_regionName = regionName; // this is querystring
m_timeout = DEFAULT_TIMEOUT_SECONDS;
m_messageResponseTimeout = messageResponsetimeout;
m_region = NULL;
uint32_t numOfParts = 1;
numOfParts++;
if (m_messageResponseTimeout != -1) numOfParts++;
writeHeader(m_msgType, numOfParts);
writeRegionPart(m_regionName);
writeEventIdPart();
if (m_messageResponseTimeout != -1) {
writeIntPart(m_messageResponseTimeout);
}
writeMessageLength();
}
TcrMessageQueryWithParameters::TcrMessageQueryWithParameters(
const std::string& regionName, const UserDataPtr& aCallbackArgument,
CacheableVectorPtr paramList, int messageResponsetimeout,
ThinClientBaseDM* connectionDM) {
m_msgType = TcrMessage::QUERY_WITH_PARAMETERS;
m_tcdm = connectionDM;
m_regionName = regionName;
m_timeout = DEFAULT_TIMEOUT_SECONDS;
m_messageResponseTimeout = messageResponsetimeout;
m_region = NULL;
// Find out the numOfParts
uint32_t numOfParts = 4 + paramList->size();
writeHeader(m_msgType, numOfParts);
// Part-1: Query String
writeRegionPart(m_regionName);
// Part-2: Number or length of the parameters
writeIntPart(paramList->size());
// Part-3: X (COMPILE_QUERY_CLEAR_TIMEOUT) parameter
writeIntPart(15);
// Part-4: Request specific timeout
if (m_messageResponseTimeout != -1) {
writeIntPart(m_messageResponseTimeout);
}
// Part-5: Parameters
if (paramList != NULLPTR) {
for (int32_t i = 0; i < paramList->size(); i++) {
CacheablePtr value = (*paramList)[i];
writeObjectPart(value);
}
}
writeMessageLength();
}
TcrMessageContainsKey::TcrMessageContainsKey(
const Region* region, const CacheableKeyPtr& key,
const UserDataPtr& aCallbackArgument, bool isContainsKey,
ThinClientBaseDM* connectionDM) {
m_msgType = TcrMessage::CONTAINS_KEY;
m_tcdm = connectionDM;
m_regionName = region == NULL ? "INVALID_REGION_NAME" : region->getFullPath();
m_region = region;
m_timeout = DEFAULT_TIMEOUT_SECONDS;
uint32_t numOfParts = 2;
if (aCallbackArgument != NULLPTR) {
++numOfParts;
}
numOfParts++;
if (key == NULLPTR) {
delete m_request;
throw IllegalArgumentException(
"key passed to the constructor can't be NULL");
}
writeHeader(m_msgType, numOfParts);
writeRegionPart(m_regionName);
writeObjectPart(key);
// write 0 to indicate containskey (1 for containsvalueforkey)
writeIntPart(isContainsKey ? 0 : 1);
if (aCallbackArgument != NULLPTR) {
writeObjectPart(aCallbackArgument);
}
writeMessageLength();
}
TcrMessageGetDurableCqs::TcrMessageGetDurableCqs(
ThinClientBaseDM* connectionDM) {
m_msgType = TcrMessage::GETDURABLECQS_MSG_TYPE;
m_tcdm = connectionDM;
m_timeout = DEFAULT_TIMEOUT_SECONDS;
m_region = NULL;
// wrirting msgtype with part length =1
writeHeader(m_msgType, 1);
// the server expects at least 1 part, so writing a dummy byte part
writeBytePart(0);
writeMessageLength();
}
TcrMessageRequest::TcrMessageRequest(const Region* region,
const CacheableKeyPtr& key,
const UserDataPtr& aCallbackArgument,
ThinClientBaseDM* connectionDM) {
m_msgType = TcrMessage::REQUEST;
m_tcdm = connectionDM;
m_key = key;
m_regionName =
(region == NULL ? "INVALID_REGION_NAME" : region->getFullPath());
m_region = region;
m_timeout = DEFAULT_TIMEOUT_SECONDS;
uint32_t numOfParts = 2;
if (aCallbackArgument != NULLPTR) {
++numOfParts;
}
numOfParts++;
if (key == NULLPTR) {
delete m_request;
throw IllegalArgumentException(
"key passed to the constructor can't be NULL");
}
numOfParts--; // no event id for request
writeHeader(TcrMessage::REQUEST, numOfParts);
writeRegionPart(m_regionName);
writeObjectPart(key);
if (aCallbackArgument != NULLPTR) {
// set bool variable to true.
m_isCallBackArguement = true;
writeObjectPart(aCallbackArgument);
}
writeMessageLength();
}
TcrMessageInvalidate::TcrMessageInvalidate(const Region* region,
const CacheableKeyPtr& key,
const UserDataPtr& aCallbackArgument,
ThinClientBaseDM* connectionDM) {
m_msgType = TcrMessage::INVALIDATE;
m_tcdm = connectionDM;
m_key = key;
m_regionName =
(region == NULL ? "INVALID_REGION_NAME" : region->getFullPath());
m_region = region;
m_timeout = DEFAULT_TIMEOUT_SECONDS;
uint32_t numOfParts = 2;
if (aCallbackArgument != NULLPTR) {
++numOfParts;
}
numOfParts++;
if (key == NULLPTR) {
delete m_request;
throw IllegalArgumentException(
"key passed to the constructor can't be NULL");
}
writeHeader(TcrMessage::INVALIDATE, numOfParts);
writeRegionPart(m_regionName);
writeObjectPart(key);
writeEventIdPart();
if (aCallbackArgument != NULLPTR) {
// set bool variable to true.
m_isCallBackArguement = true;
writeObjectPart(aCallbackArgument);
}
writeMessageLength();
}
TcrMessageDestroy::TcrMessageDestroy(const Region* region,
const CacheableKeyPtr& key,
const CacheablePtr& value,
const UserDataPtr& aCallbackArgument,
ThinClientBaseDM* connectionDM) {
m_msgType = TcrMessage::DESTROY;
m_tcdm = connectionDM;
m_key = key;
m_regionName =
(region == NULL ? "INVALID_REGION_NAME" : region->getFullPath());
m_region = region;
m_timeout = DEFAULT_TIMEOUT_SECONDS;
uint32_t numOfParts = 2;
if (aCallbackArgument != NULLPTR) {
++numOfParts;
}
numOfParts++;
if (key == NULLPTR) {
delete m_request;
throw IllegalArgumentException(
"key passed to the constructor can't be NULL");
}
if (value != NULLPTR) {
numOfParts += 2; // for GFE Destroy65.java
writeHeader(TcrMessage::DESTROY, numOfParts);
writeRegionPart(m_regionName);
writeObjectPart(key);
writeObjectPart(value); // expectedOldValue part
uint8_t removeByte = 8; // OP_TYPE_DESTROY value from Operation.java
CacheableBytePtr removeBytePart = CacheableByte::create(removeByte);
writeObjectPart(removeBytePart); // operation part
writeEventIdPart();
if (aCallbackArgument != NULLPTR) {
writeObjectPart(aCallbackArgument);
}
writeMessageLength();
} else {
numOfParts += 2; // for GFE Destroy65.java
writeHeader(TcrMessage::DESTROY, numOfParts);
writeRegionPart(m_regionName);
writeObjectPart(key);
writeObjectPart(NULLPTR); // expectedOldValue part
writeObjectPart(NULLPTR); // operation part
writeEventIdPart();
if (aCallbackArgument != NULLPTR) {
writeObjectPart(aCallbackArgument);
}
writeMessageLength();
}
}
TcrMessagePut::TcrMessagePut(const Region* region, const CacheableKeyPtr& key,
const CacheablePtr& value,
const UserDataPtr& aCallbackArgument, bool isDelta,
ThinClientBaseDM* connectionDM, bool isMetaRegion,
bool fullValueAfterDeltaFail,
const char* regionName) {
// m_securityHeaderLength = 0;
m_isMetaRegion = isMetaRegion;
m_msgType = TcrMessage::PUT;
m_tcdm = connectionDM;
m_key = key;
m_regionName = region != NULL ? region->getFullPath() : regionName;
m_region = region;
m_timeout = DEFAULT_TIMEOUT_SECONDS;
// TODO check the number of parts in this constructor. doubt because in PUT
// value can be NULL also.
uint32_t numOfParts = 5;
if (aCallbackArgument != NULLPTR) {
++numOfParts;
}
numOfParts++;
if (key == NULLPTR) {
delete m_request;
throw IllegalArgumentException(
"key passed to the constructor can't be NULL");
}
numOfParts++;
writeHeader(m_msgType, numOfParts);
writeRegionPart(m_regionName);
writeObjectPart(NULLPTR); // operation = null
writeIntPart(0); // flags = 0
writeObjectPart(key);
writeObjectPart(CacheableBoolean::create(isDelta));
writeObjectPart(value, isDelta);
writeEventIdPart(0, fullValueAfterDeltaFail);
if (aCallbackArgument != NULLPTR) {
writeObjectPart(aCallbackArgument);
}
writeMessageLength();
}
TcrMessageReply::TcrMessageReply(bool decodeAll,
ThinClientBaseDM* connectionDM) {
m_msgType = TcrMessage::INVALID;
m_decodeAll = decodeAll;
m_tcdm = connectionDM;
if (connectionDM != NULL) isSecurityOn = connectionDM->isSecurityOn();
}
TcrMessagePing::TcrMessagePing(bool decodeAll) {
m_msgType = TcrMessage::PING;
m_decodeAll = decodeAll;
m_request->writeInt(m_msgType);
m_request->writeInt(
(int32_t)0); // 17 is fixed message len ... PING only has a header.
m_request->writeInt((int32_t)0); // Number of parts.
// int32_t txId = TcrMessage::m_transactionId++;
// Setting the txId to 0 for all ping message as it is not being used on the
// SERVER side or the
// client side.
m_request->writeInt((int32_t)0);
m_request->write(static_cast<int8_t>(0)); // Early ack is '0'.
m_msgLength = g_headerLen;
m_txId = 0;
}
TcrMessageCloseConnection::TcrMessageCloseConnection(bool decodeAll) {
m_msgType = TcrMessage::CLOSE_CONNECTION;
m_decodeAll = decodeAll;
m_request->writeInt(m_msgType);
m_request->writeInt((int32_t)6);
m_request->writeInt((int32_t)1); // Number of parts.
// int32_t txId = TcrMessage::m_transactionId++;
m_request->writeInt((int32_t)0);
m_request->write(static_cast<int8_t>(0)); // Early ack is '0'.
// last two parts are not used ... setting zero in both the parts.
m_request->writeInt((int32_t)1); // len is 1
m_request->write(static_cast<int8_t>(0)); // is obj is '0'.
// cast away constness here since we want to modify this
TcrMessage::m_keepalive = const_cast<uint8_t*>(m_request->getCursor());
m_request->write(static_cast<int8_t>(0)); // keepalive is '0'.
}
TcrMessageClientMarker::TcrMessageClientMarker(bool decodeAll) {
m_msgType = TcrMessage::CLIENT_MARKER;
m_decodeAll = decodeAll;
}
TcrMessageRegisterInterestList::TcrMessageRegisterInterestList(
const Region* region, const VectorOfCacheableKey& keys, bool isDurable,
bool isCachingEnabled, bool receiveValues,
InterestResultPolicy interestPolicy, ThinClientBaseDM* connectionDM) {
m_msgType = TcrMessage::REGISTER_INTEREST_LIST;
m_tcdm = connectionDM;
m_keyList = &keys;
m_regionName = region == NULL ? "INVALID_REGION_NAME" : region->getFullPath();
m_region = region;
m_timeout = DEFAULT_TIMEOUT_SECONDS;
m_isDurable = isDurable;
m_receiveValues = receiveValues;
uint32_t numInItrestList = keys.size();
GF_R_ASSERT(numInItrestList != 0);
uint32_t numOfParts = 2 + numInItrestList;
numOfParts += 2 - numInItrestList;
numOfParts += 2;
writeHeader(m_msgType, numOfParts);
writeRegionPart(m_regionName);
writeInterestResultPolicyPart(interestPolicy);
writeBytePart(isDurable ? 1 : 0); // keepalive
CacheableArrayListPtr cal(CacheableArrayList::create());
for (uint32_t i = 0; i < numInItrestList; i++) {
if (keys[i] == NULLPTR) {
delete m_request;
throw IllegalArgumentException(
"keys in the interest list cannot be NULL");
}
cal->push_back(keys[i]);
}
writeObjectPart(cal);
uint8_t bytes[2];
CacheableBytesPtr byteArr = NULLPTR;
bytes[0] = receiveValues ? 0 : 1; // reveive values
byteArr = CacheableBytes::create(bytes, 1);
writeObjectPart(byteArr);
bytes[0] = isCachingEnabled ? 1 : 0; // region policy
bytes[1] = 0; // serialize values
byteArr = CacheableBytes::create(bytes, 2);
writeObjectPart(byteArr);
writeMessageLength();
m_interestPolicy = interestPolicy.ordinal;
}
TcrMessageUnregisterInterestList::TcrMessageUnregisterInterestList(
const Region* region, const VectorOfCacheableKey& keys, bool isDurable,
bool isCachingEnabled, bool receiveValues,
InterestResultPolicy interestPolicy, ThinClientBaseDM* connectionDM) {
m_msgType = TcrMessage::UNREGISTER_INTEREST_LIST;
m_tcdm = connectionDM;
m_keyList = &keys;
m_regionName = region == NULL ? "INVALID_REGION_NAME" : region->getFullPath();
m_region = region;
m_timeout = DEFAULT_TIMEOUT_SECONDS;
m_isDurable = isDurable;
m_receiveValues = receiveValues;
uint32_t numInItrestList = keys.size();
GF_R_ASSERT(numInItrestList != 0);
uint32_t numOfParts = 2 + numInItrestList;
numOfParts += 2;
writeHeader(m_msgType, numOfParts);
writeRegionPart(m_regionName);
writeBytePart(0); // isClosing
writeBytePart(isDurable ? 1 : 0); // keepalive
writeIntPart(static_cast<int32_t>(numInItrestList));
for (uint32_t i = 0; i < numInItrestList; i++) {
if (keys[i] == NULLPTR) {
delete m_request;
throw IllegalArgumentException(
"keys in the interest list cannot be NULL");
}
writeObjectPart(keys[i]);
}
writeMessageLength();
m_interestPolicy = interestPolicy.ordinal;
}
TcrMessageCreateRegion::TcrMessageCreateRegion(
const std::string& str1, const std::string& str2,
InterestResultPolicy interestPolicy, bool isDurable, bool isCachingEnabled,
bool receiveValues, ThinClientBaseDM* connectionDM) {
m_msgType = TcrMessage::CREATE_REGION;
m_tcdm = connectionDM;
m_isDurable = isDurable;
m_receiveValues = receiveValues;
uint32_t numOfParts = 2;
writeHeader(m_msgType, numOfParts);
writeRegionPart(str1); // parent region name
writeRegionPart(str2); // region name
writeMessageLength();
m_regionName = str2;
}
TcrMessageRegisterInterest::TcrMessageRegisterInterest(
const std::string& str1, const std::string& str2,
InterestResultPolicy interestPolicy, bool isDurable, bool isCachingEnabled,
bool receiveValues, ThinClientBaseDM* connectionDM) {
m_msgType = TcrMessage::REGISTER_INTEREST;
m_tcdm = connectionDM;
m_isDurable = isDurable;
m_receiveValues = receiveValues;
uint32_t numOfParts = 7;
writeHeader(m_msgType, numOfParts);
writeRegionPart(str1); // region name
writeIntPart(REGULAR_EXPRESSION); // InterestType
writeInterestResultPolicyPart(interestPolicy); // InterestResultPolicy
writeBytePart(isDurable ? 1 : 0);
writeRegionPart(str2); // regexp string
uint8_t bytes[2];
CacheableBytesPtr byteArr = NULLPTR;
bytes[0] = receiveValues ? 0 : 1;
byteArr = CacheableBytes::create(bytes, 1);
writeObjectPart(byteArr);
bytes[0] = isCachingEnabled ? 1 : 0; // region data policy
bytes[1] = 0; // serializevalues
byteArr = CacheableBytes::create(bytes, 2);
writeObjectPart(byteArr);
writeMessageLength();
m_regionName = str1;
m_regex = str2;
m_interestPolicy = interestPolicy.ordinal;
}
TcrMessageUnregisterInterest::TcrMessageUnregisterInterest(
const std::string& str1, const std::string& str2,
InterestResultPolicy interestPolicy, bool isDurable, bool isCachingEnabled,
bool receiveValues, ThinClientBaseDM* connectionDM) {
m_msgType = TcrMessage::UNREGISTER_INTEREST;
m_tcdm = connectionDM;
m_isDurable = isDurable;
m_receiveValues = receiveValues;
uint32_t numOfParts = 3;
numOfParts += 2;
writeHeader(m_msgType, numOfParts);
writeRegionPart(str1); // region name
writeIntPart(REGULAR_EXPRESSION); // InterestType
writeRegionPart(str2); // regexp string
writeBytePart(0); // isClosing
writeBytePart(isDurable ? 1 : 0); // keepalive
writeMessageLength();
m_regionName = str1;
m_regex = str2;
m_interestPolicy = interestPolicy.ordinal;
}
TcrMessageTxSynchronization::TcrMessageTxSynchronization(int ordinal, int txid,
int status) {
m_msgType = TcrMessage::TX_SYNCHRONIZATION;
writeHeader(m_msgType, ordinal == 1 ? 3 : 2);
writeIntPart(ordinal);
writeIntPart(txid);
if (ordinal == 1) {
writeIntPart(status);
}
writeMessageLength();
}
TcrMessageClientReady::TcrMessageClientReady() {
m_msgType = TcrMessage::CLIENT_READY;
writeHeader(m_msgType, 1);
// the server expects at least 1 part, so writing a dummy
writeBytePart(0);
writeMessageLength();
}
TcrMessageCommit::TcrMessageCommit() {
m_msgType = TcrMessage::COMMIT;
writeHeader(m_msgType, 1);
// the server expects at least 1 part, so writing a dummy
writeBytePart(0);
writeMessageLength();
}
TcrMessageRollback::TcrMessageRollback() {
m_msgType = TcrMessage::ROLLBACK;
writeHeader(m_msgType, 1);
// the server expects at least 1 part, so writing a dummy
writeBytePart(0);
writeMessageLength();
}
TcrMessageTxFailover::TcrMessageTxFailover() {
m_msgType = TcrMessage::TX_FAILOVER;
writeHeader(m_msgType, 1);
// the server expects at least 1 part, so writing a dummy
writeBytePart(0);
writeMessageLength();
}
// constructor for MAKE_PRIMARY message.
TcrMessageMakePrimary::TcrMessageMakePrimary(bool processedMarker) {
m_msgType = TcrMessage::MAKE_PRIMARY;
writeHeader(m_msgType, 1);
writeBytePart(processedMarker ? 1 : 0); // boolean processedMarker
writeMessageLength();
}
// constructor for PERIODIC_ACK of notified eventids
TcrMessagePeriodicAck::TcrMessagePeriodicAck(
const EventIdMapEntryList& entries) {
m_msgType = TcrMessage::PERIODIC_ACK;
uint32_t numParts = static_cast<uint32_t>(entries.size());
GF_D_ASSERT(numParts > 0);
writeHeader(m_msgType, numParts);
for (EventIdMapEntryList::const_iterator entry = entries.begin();
entry != entries.end(); ++entry) {
EventSourcePtr src = entry->first;
EventSequencePtr seq = entry->second;
EventIdPtr eid = EventId::create(src->getMemId(), src->getMemIdLen(),
src->getThrId(), seq->getSeqNum());
writeObjectPart(eid);
}
writeMessageLength();
}
TcrMessagePutAll::TcrMessagePutAll(const Region* region,
const HashMapOfCacheable& map,
int messageResponsetimeout,
ThinClientBaseDM* connectionDM,
const UserDataPtr& aCallbackArgument) {
m_tcdm = connectionDM;
m_regionName = region->getFullPath();
m_region = region;
m_messageResponseTimeout = messageResponsetimeout;
// TODO check the number of parts in this constructor. doubt because in PUT
// value can be NULL also.
uint32_t numOfParts = 0;
// bool skipCallBacks = false;
if (aCallbackArgument != NULLPTR) {
m_msgType = TcrMessage::PUT_ALL_WITH_CALLBACK;
numOfParts = 6 + map.size() * 2;
// skipCallBacks = false;
} else {
m_msgType = TcrMessage::PUTALL;
numOfParts = 5 + map.size() * 2;
// skipCallBacks = true;
}
// numOfParts++;
if (m_messageResponseTimeout != -1) numOfParts++;
writeHeader(m_msgType, numOfParts);
writeRegionPart(m_regionName);
writeEventIdPart(map.size() - 1);
// writeIntPart(skipCallBacks ? 0 : 1);
writeIntPart(0);
// Client putAll requests now send a flags int as part #0. 1==region has
// datapolicy.EMPTY, 2==region has concurrency checks enabled.
// Version tags are not sent back if dp is EMPTY or concurrency
// checks are disabled.
int flags = 0;
if (!region->getAttributes()->getCachingEnabled()) {
flags |= TcrMessage::m_flag_empty;
LOGDEBUG("TcrMessage::PUTALL datapolicy empty flags = %d ", flags);
}
if (region->getAttributes()->getConcurrencyChecksEnabled()) {
flags |= TcrMessage::m_flag_concurrency_checks;
LOGDEBUG("TcrMessage::PUTALL ConcurrencyChecksEnabled flags = %d ", flags);
}
writeIntPart(flags);
writeIntPart(map.size());
if (aCallbackArgument != NULLPTR) {
writeObjectPart(aCallbackArgument);
}
for (HashMapOfCacheable::Iterator iter = map.begin(); iter != map.end();
++iter) {
CacheableKeyPtr key = iter.first();
CacheablePtr value = iter.second();
writeObjectPart(key);
writeObjectPart(value);
}
if (m_messageResponseTimeout != -1) {
writeIntPart(m_messageResponseTimeout);
}
writeMessageLength();
}
TcrMessageRemoveAll::TcrMessageRemoveAll(const Region* region,
const VectorOfCacheableKey& keys,
const UserDataPtr& aCallbackArgument,
ThinClientBaseDM* connectionDM) {
m_msgType = TcrMessage::REMOVE_ALL;
m_tcdm = connectionDM;
m_regionName = region->getFullPath();
m_region = region;
// TODO check the number of parts in this constructor. doubt because in PUT
// value can be NULL also.
uint32_t numOfParts = 5 + keys.size();
if (m_messageResponseTimeout != -1) numOfParts++;
writeHeader(m_msgType, numOfParts);
writeRegionPart(m_regionName);
writeEventIdPart(keys.size() - 1);
// Client removeall requests now send a flags int as part #0. 1==region has
// datapolicy.EMPTY, 2==region has concurrency checks enabled.
// Version tags are not sent back if dp is EMPTY or concurrency
// checks are disabled.
int flags = 0;
if (!region->getAttributes()->getCachingEnabled()) {
flags |= TcrMessage::m_flag_empty;
LOGDEBUG("TcrMessage::REMOVE_ALL datapolicy empty flags = %d ", flags);
}
if (region->getAttributes()->getConcurrencyChecksEnabled()) {
flags |= TcrMessage::m_flag_concurrency_checks;
LOGDEBUG("TcrMessage::REMOVE_ALL ConcurrencyChecksEnabled flags = %d ",
flags);
}
writeIntPart(flags);
writeObjectPart(aCallbackArgument);
writeIntPart(keys.size());
for (VectorOfCacheableKey::Iterator iter = keys.begin(); iter != keys.end();
++iter) {
writeObjectPart(*iter);
}
writeMessageLength();
}
TcrMessageUpdateClientNotification::TcrMessageUpdateClientNotification(
int32_t port) {
m_msgType = TcrMessage::UPDATE_CLIENT_NOTIFICATION;
writeHeader(m_msgType, 1);
writeIntPart(port);
writeMessageLength();
}
TcrMessageGetAll::TcrMessageGetAll(const Region* region,
const VectorOfCacheableKey* keys,
ThinClientBaseDM* connectionDM,
const UserDataPtr& aCallbackArgument) {
m_msgType = TcrMessage::GET_ALL_70;
m_tcdm = connectionDM;
m_keyList = keys;
m_callbackArgument = aCallbackArgument;
m_regionName = region->getFullPath();
m_region = region;
/*CacheableObjectArrayPtr keyArr = NULLPTR;
if (keys != NULL) {
keyArr = CacheableObjectArray::create();
for (int32_t index = 0; index < keys->size(); ++index) {
keyArr->push_back(keys->operator[](index));
}
}*/
if (m_callbackArgument != NULLPTR) {
m_msgType = TcrMessage::GET_ALL_WITH_CALLBACK;
} else {
m_msgType = TcrMessage::GET_ALL_70;
}
writeHeader(m_msgType, 3);
writeRegionPart(m_regionName);
/*writeHeader(m_msgType, 2);
writeRegionPart(regionName);
writeObjectPart(keyArr);
writeMessageLength();*/
}
void TcrMessage::InitializeGetallMsg(const UserDataPtr& aCallbackArgument) {
/*CacheableObjectArrayPtr keyArr = NULLPTR;
if (m_keyList != NULL) {
keyArr = CacheableObjectArray::create();
for (int32_t index = 0; index < m_keyList->size(); ++index) {
keyArr->push_back(m_keyList->operator[](index));
}
}*/
// LOGINFO(" in InitializeGetallMsg %s ", m_regionName.c_str());
// writeHeader(m_msgType, 2);
// writeRegionPart(m_regionName);
writeObjectPart(NULLPTR, false, false, m_keyList); // will do manually
if (aCallbackArgument != NULLPTR) {
writeObjectPart(aCallbackArgument);
} else {
writeIntPart(0);
}
writeMessageLength();
}
TcrMessageExecuteCq::TcrMessageExecuteCq(const std::string& str1,
const std::string& str2, int state,
bool isDurable,
ThinClientBaseDM* connectionDM) {
m_msgType = TcrMessage::EXECUTECQ_MSG_TYPE;
m_tcdm = connectionDM;
m_isDurable = isDurable;
uint32_t numOfParts = 5;
writeHeader(m_msgType, numOfParts);
writeStringPart(str1); // cqName
writeStringPart(str2); // query string
writeIntPart(state); // cq state
writeBytePart(isDurable ? 1 : 0);
// hard-coding region data policy to 1
// This Part will be removed when server-side changes are made to remove
// CQ dependency on region data policy. After the changes, set numOfParts
// to 4 (currently 5).
writeBytePart(1);
writeMessageLength();
m_regionName = str1;
m_regex = str2;
}
TcrMessageExecuteCqWithIr::TcrMessageExecuteCqWithIr(
const std::string& str1, const std::string& str2, int state, bool isDurable,
ThinClientBaseDM* connectionDM) {
m_msgType = TcrMessage::EXECUTECQ_WITH_IR_MSG_TYPE;
m_tcdm = connectionDM;
m_isDurable = isDurable;
uint32_t numOfParts = 5;
writeHeader(m_msgType, numOfParts);
writeStringPart(str1); // cqName
writeStringPart(str2); // query string
writeIntPart(state); // cq state
writeBytePart(isDurable ? 1 : 0);
// hard-coding region data policy to 1
// This Part will be removed when server-side changes are made to remove
// CQ dependency on region data policy. After the changes, set numOfParts
// to 4 (currently 5).
writeBytePart(1);
writeMessageLength();
m_regionName = str1;
m_regex = str2;
}
TcrMessageExecuteFunction::TcrMessageExecuteFunction(
const std::string& funcName, const CacheablePtr& args, uint8_t getResult,
ThinClientBaseDM* connectionDM, int32_t timeout) {
m_msgType = TcrMessage::EXECUTE_FUNCTION;
m_tcdm = connectionDM;
m_hasResult = getResult;
uint32_t numOfParts = 3;
writeHeader(m_msgType, numOfParts);
// writeBytePart(getResult ? 1 : 0);
// if gfcpp property unit set then timeout will be in millisecond
// otherwise it will be in second
if ((DistributedSystem::getSystemProperties() != NULL) &&
(DistributedSystem::getSystemProperties()->readTimeoutUnitInMillis())) {
writeByteAndTimeOutPart(getResult, timeout);
} else {
writeByteAndTimeOutPart(getResult, (timeout * 1000));
}
writeRegionPart(funcName); // function name string
writeObjectPart(args);
writeMessageLength();
}
TcrMessageExecuteRegionFunction::TcrMessageExecuteRegionFunction(
const std::string& funcName, const Region* region, const CacheablePtr& args,
CacheableVectorPtr routingObj, uint8_t getResult,
CacheableHashSetPtr failedNodes, int32_t timeout,
ThinClientBaseDM* connectionDM, int8_t reExecute) {
m_msgType = TcrMessage::EXECUTE_REGION_FUNCTION;
m_tcdm = connectionDM;
m_regionName = region == NULL ? "INVALID_REGION_NAME" : region->getFullPath();
m_region = region;
if (routingObj != NULLPTR && routingObj->size() == 1) {
LOGDEBUG("setting up key");
m_key = routingObj->at(0);
}
uint32_t numOfParts = 6 + (routingObj == NULLPTR ? 0 : routingObj->size());
numOfParts +=
2; // for the FunctionHA isReExecute and removedNodesSize parts.
if (failedNodes != NULLPTR) {
numOfParts++;
}
writeHeader(m_msgType, numOfParts);
// if gfcpp property unit set then timeout will be in millisecond
// otherwise it will be in second
if ((DistributedSystem::getSystemProperties() != NULL) &&
(DistributedSystem::getSystemProperties()->readTimeoutUnitInMillis())) {
writeByteAndTimeOutPart(getResult, timeout);
} else {
writeByteAndTimeOutPart(getResult, (timeout * 1000));
}
writeRegionPart(m_regionName);
writeRegionPart(funcName); // function name string
writeObjectPart(args);
// klug for MemberMappedArgs
writeObjectPart(NULLPTR);
writeBytePart(reExecute); // FunctionHA isReExecute = false
writeIntPart(routingObj == NULLPTR ? 0 : routingObj->size());
if (routingObj != NULLPTR) {
for (int32_t i = 0; i < routingObj->size(); i++) {
CacheablePtr value = routingObj->operator[](i);
writeObjectPart(value);
}
}
if (failedNodes != NULLPTR) {
writeIntPart(failedNodes->size());
writeObjectPart(failedNodes);
} else {
writeIntPart(0); // FunctionHA removedNodesSize = 0
}
writeMessageLength();
}
TcrMessageExecuteRegionFunctionSingleHop::
TcrMessageExecuteRegionFunctionSingleHop(
const std::string& funcName, const Region* region,
const CacheablePtr& args, CacheableHashSetPtr routingObj,
uint8_t getResult, CacheableHashSetPtr failedNodes, bool allBuckets,
int32_t timeout, ThinClientBaseDM* connectionDM) {
m_msgType = TcrMessage::EXECUTE_REGION_FUNCTION_SINGLE_HOP;
m_tcdm = connectionDM;
m_regionName = region == NULL ? "INVALID_REGION_NAME" : region->getFullPath();
m_region = region;
uint32_t numOfParts = 6 + (routingObj == NULLPTR ? 0 : routingObj->size());
numOfParts +=
2; // for the FunctionHA isReExecute and removedNodesSize parts.
if (failedNodes != NULLPTR) {
numOfParts++;
}
writeHeader(m_msgType, numOfParts);
// if gfcpp property unit set then timeout will be in millisecond
// otherwise it will be in second
if ((DistributedSystem::getSystemProperties() != NULL) &&
(DistributedSystem::getSystemProperties()->readTimeoutUnitInMillis())) {
writeByteAndTimeOutPart(getResult, timeout);
} else {
writeByteAndTimeOutPart(getResult, (timeout * 1000));
}
writeRegionPart(m_regionName);
writeRegionPart(funcName); // function name string
writeObjectPart(args);
// klug for MemberMappedArgs
writeObjectPart(NULLPTR);
writeBytePart(allBuckets ? 1 : 0);
writeIntPart(routingObj == NULLPTR ? 0 : routingObj->size());
if (routingObj != NULLPTR) {
if (allBuckets) {
LOGDEBUG("All Buckets so putting IntPart for buckets = %d ",
routingObj->size());
for (CacheableHashSet::Iterator itr = routingObj->begin();
itr != routingObj->end(); ++itr) {
CacheableInt32Ptr value = *itr;
writeIntPart(value->value());
}
} else {
LOGDEBUG("putting keys as withFilter called, routing Keys size = %d ",
routingObj->size());
for (CacheableHashSet::Iterator itr = routingObj->begin();
itr != routingObj->end(); ++itr) {
CacheablePtr value = *itr;
writeObjectPart(value);
}
}
}
if (failedNodes != NULLPTR) {
writeIntPart(failedNodes->size());
writeObjectPart(failedNodes);
} else {
writeIntPart(0); // FunctionHA removedNodesSize = 0
}
writeMessageLength();
}
TcrMessageGetClientPartitionAttributes::TcrMessageGetClientPartitionAttributes(
const char* regionName) {
m_msgType = TcrMessage::GET_CLIENT_PARTITION_ATTRIBUTES;
writeHeader(m_msgType, 1);
writeRegionPart(regionName);
writeMessageLength();
}
TcrMessageGetClientPrMetadata::TcrMessageGetClientPrMetadata(
const char* regionName) {
m_msgType = TcrMessage::GET_CLIENT_PR_METADATA;
writeHeader(m_msgType, 1);
writeRegionPart(regionName);
writeMessageLength();
}
TcrMessageSize::TcrMessageSize(const char* regionName) {
m_msgType = TcrMessage::SIZE;
writeHeader(m_msgType, 1);
writeRegionPart(regionName);
writeMessageLength();
}
TcrMessageUserCredential::TcrMessageUserCredential(
PropertiesPtr creds, ThinClientBaseDM* connectionDM) {
m_msgType = TcrMessage::USER_CREDENTIAL_MESSAGE;
m_tcdm = connectionDM;
/*
* First part will be connection-id ( may be in encrypted form) to avoid
* replay attack
* Second part will be credentails (may be in encrypted form)
*/
m_creds = creds;
/*LOGDEBUG("Tcrmessage sending creds to server");
writeHeader(msgType, numOfParts);
writeObjectPart(creds);
writeMessageLength();
LOGDEBUG("TcrMessage addsp = %s ",
Utils::convertBytesToString(m_request->getBuffer(),
m_request->getBufferLength())->asChar());*/
}
TcrMessageRemoveUserAuth::TcrMessageRemoveUserAuth(
bool keepAlive, ThinClientBaseDM* connectionDM) {
m_msgType = TcrMessage::REMOVE_USER_AUTH;
m_tcdm = connectionDM;
LOGDEBUG("Tcrmessage sending REMOVE_USER_AUTH message to server");
writeHeader(m_msgType, 1);
// adding dummy part as server has check for numberofparts > 0
uint8_t dummy = 0;
if (keepAlive) dummy = 1;
CacheableBytesPtr cbp = CacheableBytes::create(&dummy, 1);
writeObjectPart(cbp, false);
writeMessageLength();
LOGDEBUG("TcrMessage REMOVE_USER_AUTH = %s ",
Utils::convertBytesToString(m_request->getBuffer(),
m_request->getBufferLength())
->asChar());
}
void TcrMessage::createUserCredentialMessage(TcrConnection* conn) {
m_request->reset();
m_isSecurityHeaderAdded = false;
writeHeader(m_msgType, 1);
DataOutput dOut;
if (m_creds != NULLPTR) m_creds->toData(dOut);
CacheableBytesPtr credBytes =
CacheableBytes::create(dOut.getBuffer(), dOut.getBufferLength());
CacheableBytesPtr encryptBytes = conn->encryptBytes(credBytes);
writeObjectPart(encryptBytes);
writeMessageLength();
LOGDEBUG("TcrMessage CUCM() = %s ",
Utils::convertBytesToString(m_request->getBuffer(),
m_request->getBufferLength())
->asChar());
}
void TcrMessage::addSecurityPart(int64_t connectionId, int64_t unique_id,
TcrConnection* conn) {
LOGDEBUG("TcrMessage::addSecurityPart m_isSecurityHeaderAdded = %d ",
m_isSecurityHeaderAdded);
if (m_isSecurityHeaderAdded) {
m_request->rewindCursor(m_securityHeaderLength);
writeMessageLength();
m_securityHeaderLength = 0;
m_isSecurityHeaderAdded = false;
}
m_isSecurityHeaderAdded = true;
LOGDEBUG("addSecurityPart( , ) ");
DataOutput dOutput;
dOutput.writeInt(connectionId);
dOutput.writeInt(unique_id);
CacheableBytesPtr bytes =
CacheableBytes::create(dOutput.getBuffer(), dOutput.getBufferLength());
CacheableBytesPtr encryptBytes = conn->encryptBytes(bytes);
writeObjectPart(encryptBytes);
writeMessageLength();
m_securityHeaderLength = 4 + 1 + encryptBytes->length();
LOGDEBUG("TcrMessage addsp = %s ",
Utils::convertBytesToString(m_request->getBuffer(),
m_request->getBufferLength())
->asChar());
}
void TcrMessage::addSecurityPart(int64_t connectionId, TcrConnection* conn) {
LOGDEBUG("TcrMessage::addSecurityPart m_isSecurityHeaderAdded = %d ",
m_isSecurityHeaderAdded);
if (m_isSecurityHeaderAdded) {
m_request->rewindCursor(m_securityHeaderLength);
writeMessageLength();
m_securityHeaderLength = 0;
m_isSecurityHeaderAdded = false;
}
m_isSecurityHeaderAdded = true;
LOGDEBUG("TcrMessage::addSecurityPart only connid");
DataOutput dOutput;
dOutput.writeInt(connectionId);
CacheableBytesPtr bytes =
CacheableBytes::create(dOutput.getBuffer(), dOutput.getBufferLength());
CacheableBytesPtr encryptBytes = conn->encryptBytes(bytes);
writeObjectPart(encryptBytes);
writeMessageLength();
m_securityHeaderLength = 4 + 1 + encryptBytes->length();
LOGDEBUG("TcrMessage addspCC = %s ",
Utils::convertBytesToString(m_request->getBuffer(),
m_request->getBufferLength())
->asChar());
}
TcrMessageRequestEventValue::TcrMessageRequestEventValue(EventIdPtr eventId) {
m_msgType = TcrMessage::REQUEST_EVENT_VALUE;
uint32_t numOfParts = 1;
writeHeader(m_msgType, numOfParts);
writeObjectPart(eventId);
writeMessageLength();
}
TcrMessageGetPdxIdForType::TcrMessageGetPdxIdForType(
const CacheablePtr& pdxType, ThinClientBaseDM* connectionDM,
int32_t pdxTypeId) {
m_msgType = TcrMessage::GET_PDX_ID_FOR_TYPE;
m_tcdm = connectionDM;
LOGDEBUG("Tcrmessage sending GET_PDX_ID_FOR_TYPE message to server");
writeHeader(m_msgType, 1);
writeObjectPart(pdxType, false, true);
writeMessageLength();
LOGDEBUG("TcrMessage GET_PDX_ID_FOR_TYPE = %s ",
Utils::convertBytesToString(m_request->getBuffer(),
m_request->getBufferLength())
->asChar());
}
TcrMessageAddPdxType::TcrMessageAddPdxType(const CacheablePtr& pdxType,
ThinClientBaseDM* connectionDM,
int32_t pdxTypeId) {
m_msgType = TcrMessage::ADD_PDX_TYPE;
m_tcdm = connectionDM;
LOGDEBUG("Tcrmessage sending ADD_PDX_TYPE message to server");
writeHeader(m_msgType, 2);
writeObjectPart(pdxType, false, true);
writeIntPart(pdxTypeId);
writeMessageLength();
LOGDEBUG("TcrMessage ADD_PDX_TYPE id = %d = %s ", pdxTypeId,
Utils::convertBytesToString(m_request->getBuffer(),
m_request->getBufferLength())
->asChar());
}
TcrMessageGetPdxIdForEnum::TcrMessageGetPdxIdForEnum(
const CacheablePtr& pdxType, ThinClientBaseDM* connectionDM,
int32_t pdxTypeId) {
m_msgType = TcrMessage::GET_PDX_ID_FOR_ENUM;
m_tcdm = connectionDM;
LOGDEBUG("Tcrmessage sending GET_PDX_ID_FOR_ENUM message to server");
writeHeader(m_msgType, 1);
writeObjectPart(pdxType, false, false);
writeMessageLength();
LOGDEBUG("TcrMessage GET_PDX_ID_FOR_ENUM = %s ",
Utils::convertBytesToString(m_request->getBuffer(),
m_request->getBufferLength())
->asChar());
}
TcrMessageAddPdxEnum::TcrMessageAddPdxEnum(const CacheablePtr& pdxType,
ThinClientBaseDM* connectionDM,
int32_t pdxTypeId) {
m_msgType = TcrMessage::ADD_PDX_ENUM;
m_tcdm = connectionDM;
LOGDEBUG("Tcrmessage sending ADD_PDX_ENUM message to server");
writeHeader(m_msgType, 2);
writeObjectPart(pdxType, false, false);
writeIntPart(pdxTypeId);
writeMessageLength();
LOGDEBUG("TcrMessage ADD_PDX_ENUM id = %d = %s ", pdxTypeId,
Utils::convertBytesToString(m_request->getBuffer(),
m_request->getBufferLength())
->asChar());
}
TcrMessageGetPdxTypeById::TcrMessageGetPdxTypeById(
int32_t typeId, ThinClientBaseDM* connectionDM) {
m_msgType = TcrMessage::GET_PDX_TYPE_BY_ID;
m_tcdm = connectionDM;
LOGDEBUG("Tcrmessage sending GET_PDX_TYPE_BY_ID message to server");
writeHeader(m_msgType, 1);
m_request->writeInt(4);
m_request->writeBoolean(false);
m_request->writeInt(typeId);
writeMessageLength();
LOGDEBUG("TcrMessage GET_PDX_TYPE_BY_ID = %s ",
Utils::convertBytesToString(m_request->getBuffer(),
m_request->getBufferLength())
->asChar());
}
TcrMessageGetPdxEnumById::TcrMessageGetPdxEnumById(
int32_t typeId, ThinClientBaseDM* connectionDM) {
m_msgType = TcrMessage::GET_PDX_ENUM_BY_ID;
m_tcdm = connectionDM;
LOGDEBUG("Tcrmessage sending GET_PDX_ENUM_BY_ID message to server");
writeHeader(m_msgType, 1);
m_request->writeInt(4);
m_request->writeBoolean(false);
m_request->writeInt(typeId);
writeMessageLength();
LOGDEBUG("TcrMessage GET_PDX_ENUM_BY_ID = %s ",
Utils::convertBytesToString(m_request->getBuffer(),
m_request->getBufferLength())
->asChar());
}
TcrMessageGetFunctionAttributes::TcrMessageGetFunctionAttributes(
const std::string& funcName, ThinClientBaseDM* connectionDM) {
m_msgType = TcrMessage::GET_FUNCTION_ATTRIBUTES;
m_tcdm = connectionDM;
uint32_t numOfParts = 1;
writeHeader(m_msgType, numOfParts);
writeRegionPart(funcName); // function name string
writeMessageLength();
}
TcrMessageKeySet::TcrMessageKeySet(const std::string& funcName,
ThinClientBaseDM* connectionDM) {
m_msgType = TcrMessage::KEY_SET;
m_tcdm = connectionDM;
uint32_t numOfParts = 1;
writeHeader(m_msgType, numOfParts);
writeRegionPart(funcName); // function name string
writeMessageLength();
}
void TcrMessage::setData(const char* bytearray, int32_t len, uint16_t memId) {
if (bytearray) {
DeleteArray<const char> delByteArr(bytearray);
handleByteArrayResponse(bytearray, len, memId);
}
}
TcrMessage::~TcrMessage() {
GF_SAFE_DELETE(m_request);
GF_SAFE_DELETE(m_cqs);
GF_SAFE_DELETE(m_delta);
/* adongre
* CID 29167: Non-array delete for scalars (DELETE_ARRAY)
* Coverity - II
*/
// GF_SAFE_DELETE( m_deltaBytes );
GF_SAFE_DELETE_ARRAY(m_deltaBytes);
}
const std::string& TcrMessage::getRegionName() const { return m_regionName; }
Region* TcrMessage::getRegion() const { return const_cast<Region*>(m_region); }
int32_t TcrMessage::getMessageType() const { return m_msgType; }
void TcrMessage::setMessageType(int32_t msgType) { m_msgType = msgType; }
void TcrMessage::setMessageTypeRequest(int32_t msgType) {
m_msgTypeRequest = msgType;
}
int32_t TcrMessage::getMessageTypeRequest() const { return m_msgTypeRequest; }
const std::map<std::string, int>* TcrMessage::getCqs() const { return m_cqs; }
CacheableKeyPtr TcrMessage::getKey() const { return m_key; }
const CacheableKeyPtr& TcrMessage::getKeyRef() const { return m_key; }
CacheablePtr TcrMessage::getValue() const { return m_value; }
const CacheablePtr& TcrMessage::getValueRef() const { return m_value; }
CacheablePtr TcrMessage::getCallbackArgument() const {
return m_callbackArgument;
}
const CacheablePtr& TcrMessage::getCallbackArgumentRef() const {
return m_callbackArgument;
}
const char* TcrMessage::getMsgData() const {
return (char*)m_request->getBuffer();
}
const char* TcrMessage::getMsgHeader() const {
return (char*)m_request->getBuffer();
}
const char* TcrMessage::getMsgBody() const {
return (char*)m_request->getBuffer() + g_headerLen;
}
uint32_t TcrMessage::getMsgLength() const {
return m_request->getBufferLength();
}
uint32_t TcrMessage::getMsgBodyLength() const {
return m_request->getBufferLength() - g_headerLen;
}
EventIdPtr TcrMessage::getEventId() const { return m_eventid; }
int32_t TcrMessage::getTransId() const { return m_txId; }
void TcrMessage::setTransId(int32_t txId) { m_txId = txId; }
uint32_t TcrMessage::getTimeout() const { return m_timeout; }
void TcrMessage::setTimeout(uint32_t timeout) { m_timeout = timeout; }
void TcrMessage::skipParts(DataInput& input, int32_t numParts) {
while (numParts > 0) {
numParts--;
int32_t partLen;
input.readInt(&partLen);
LOGDEBUG("TcrMessage::skipParts partLen= %d ", partLen);
input.advanceCursor(partLen + 1); // Skip the whole part including "isObj"
}
}
void TcrMessage::readEventIdPart(DataInput& input, bool skip, int32_t parts) {
// skip requested number of parts
if (skip) {
skipParts(input, parts);
}
// read the eventid part
int32_t eventIdLen;
int8_t isObj;
input.readInt(&eventIdLen);
input.read(&isObj);
GF_D_ASSERT(isObj != 0);
input.readObject(m_eventid);
}
DSMemberForVersionStampPtr TcrMessage::readDSMember(
apache::geode::client::DataInput& input) {
uint8_t typeidLen;
input.read(&typeidLen);
if (typeidLen == 1) {
uint8_t typeidofMember;
input.read(&typeidofMember);
if (typeidofMember != GeodeTypeIdsImpl::InternalDistributedMember) {
throw Exception(
"Reading DSMember. Expecting type id 92 for "
"InternalDistributedMember. ");
}
ClientProxyMembershipIDPtr memId =
ClientProxyMembershipIDPtr(new ClientProxyMembershipID());
memId->fromData(input);
return (DSMemberForVersionStampPtr)memId;
} else if (typeidLen == 2) {
uint16_t typeidofMember;
input.readInt(&typeidofMember);
if (typeidofMember != GeodeTypeIdsImpl::DiskStoreId) {
throw Exception(
"Reading DSMember. Expecting type id 2133 for DiskStoreId. ");
}
DiskStoreId* diskStore = new DiskStoreId();
diskStore->fromData(input);
return DSMemberForVersionStampPtr(diskStore);
} else {
throw Exception(
"Reading DSMember. Expecting type id length as either one or two "
"byte.");
}
}
void TcrMessage::readHashMapForGCVersions(
apache::geode::client::DataInput& input, CacheableHashMapPtr& value) {
uint8_t hashmaptypeid;
input.read(&hashmaptypeid);
if (hashmaptypeid != GeodeTypeIds::CacheableHashMap) {
throw Exception(
"Reading HashMap For GC versions. Expecting type id of hash map. ");
}
int32_t len;
input.readArrayLen(&len);
if (len > 0) {
CacheableKeyPtr key;
CacheablePtr val;
for (int32_t index = 0; index < len; index++) {
key = readDSMember(input);
uint8_t versiontype;
int64_t version;
input.read(&versiontype);
input.readInt(&version);
CacheablePtr valVersion = CacheableInt64::create(version);
CacheableKeyPtr keyPtr = dynCast<CacheableKeyPtr>(key);
CacheablePtr valVersionPtr = dynCast<CacheablePtr>(valVersion);
if (value != NULLPTR) {
value->insert(keyPtr, valVersionPtr);
} else {
throw Exception(
"Inserting values in HashMap For GC versions. value must not be "
"NULLPTR. ");
}
}
}
}
void TcrMessage::readHashSetForGCVersions(
apache::geode::client::DataInput& input, CacheableHashSetPtr& value) {
uint8_t hashsettypeid;
input.read(&hashsettypeid);
if (hashsettypeid != GeodeTypeIds::CacheableHashSet) {
throw Exception(
"Reading HashSet For GC versions. Expecting type id of hash set. ");
}
int32_t len;
input.readArrayLen(&len);
if (len > 0) {
CacheableKeyPtr key;
CacheablePtr val;
for (int32_t index = 0; index < len; index++) {
CacheableKeyPtr keyPtr;
input.readObject(keyPtr);
value->insert(keyPtr);
}
}
}