| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #include "TcrMessage.hpp" |
| |
| #include <sstream> |
| |
| #include <geode/CacheableBuiltins.hpp> |
| #include <geode/CacheableObjectArray.hpp> |
| #include <geode/SystemProperties.hpp> |
| |
| #include "AutoDelete.hpp" |
| #include "CacheRegionHelper.hpp" |
| #include "DataInputInternal.hpp" |
| #include "DataOutputInternal.hpp" |
| #include "DiskStoreId.hpp" |
| #include "DiskVersionTag.hpp" |
| #include "DistributedSystem.hpp" |
| #include "StackTrace.hpp" |
| #include "TSSTXStateWrapper.hpp" |
| #include "TXState.hpp" |
| #include "TcrChunkedContext.hpp" |
| #include "TcrConnection.hpp" |
| #include "TcrConnectionManager.hpp" |
| #include "ThinClientBaseDM.hpp" |
| #include "ThinClientPoolDM.hpp" |
| #include "ThinClientRegion.hpp" |
| #include "util/JavaModifiedUtf8.hpp" |
| |
| #pragma error_messages(off, SEC_UNINITIALIZED_MEM_READ) |
| |
| namespace apache { |
| namespace geode { |
| namespace client { |
| namespace { |
| const uint32_t g_headerLen = 17; |
| const uint32_t REGULAR_EXPRESSION = |
| 1; // come from Java InterestType.REGULAR_EXPRESSION |
| |
| inline void readInt(uint8_t* buffer, uint16_t* value) { |
| uint16_t tmp = *(buffer++); |
| tmp = (tmp << 8) | *(buffer); |
| *value = tmp; |
| } |
| |
| inline void readInt(uint8_t* buffer, uint32_t* value) { |
| uint32_t tmp = *(buffer++); |
| tmp = (tmp << 8) | *(buffer++); |
| tmp = (tmp << 8) | *(buffer++); |
| tmp = (tmp << 8) | *(buffer++); |
| *value = tmp; |
| } |
| |
| inline void writeInt(uint8_t* buffer, uint16_t value) { |
| *(buffer++) = static_cast<uint8_t>(value >> 8); |
| *(buffer++) = static_cast<uint8_t>(value); |
| } |
| |
| inline void writeInt(uint8_t* buffer, uint32_t value) { |
| *(buffer++) = static_cast<uint8_t>(value >> 24); |
| *(buffer++) = static_cast<uint8_t>(value >> 16); |
| *(buffer++) = static_cast<uint8_t>(value >> 8); |
| *(buffer++) = static_cast<uint8_t>(value); |
| } |
| } // namespace |
| |
| extern void setThreadLocalExceptionMessage(const char*); |
| |
| // AtomicInc TcrMessage::m_transactionId = 0; |
| uint8_t* TcrMessage::m_keepAlive = nullptr; |
| const int TcrMessage::m_flag_empty = 0x01; |
| const int TcrMessage::m_flag_concurrency_checks = 0x02; |
| |
| bool TcrMessage::isKeepAlive() { return (m_keepAlive && (*m_keepAlive > 0)); } |
| |
| bool TcrMessage::isUserInitiativeOps(const TcrMessage& msg) { |
| int32_t msgType = msg.getMessageType(); |
| |
| if (!msg.isMetaRegion() && |
| !(msgType == TcrMessage::PING || msgType == TcrMessage::PERIODIC_ACK || |
| msgType == TcrMessage::MAKE_PRIMARY || |
| msgType == TcrMessage::CLOSE_CONNECTION || |
| msgType == TcrMessage::CLIENT_READY || msgType == TcrMessage::INVALID || |
| msgType == TcrMessage::MONITORCQ_MSG_TYPE || |
| msgType == TcrMessage::GETCQSTATS_MSG_TYPE || |
| msgType == TcrMessage::REQUEST_EVENT_VALUE || |
| msgType == TcrMessage::GET_CLIENT_PR_METADATA || |
| msgType == TcrMessage::GET_CLIENT_PARTITION_ATTRIBUTES || |
| msgType == TcrMessage::GET_PDX_ID_FOR_TYPE || |
| msgType == TcrMessage::GET_PDX_TYPE_BY_ID || |
| msgType == TcrMessage::ADD_PDX_TYPE || msgType == TcrMessage::SIZE || |
| msgType == TcrMessage::TX_FAILOVER || |
| msgType == TcrMessage::GET_ENTRY || |
| msgType == TcrMessage::TX_SYNCHRONIZATION || |
| msgType == TcrMessage::GET_FUNCTION_ATTRIBUTES || |
| msgType == TcrMessage::ADD_PDX_ENUM || |
| msgType == TcrMessage::GET_PDX_ENUM_BY_ID || |
| msgType == TcrMessage::GET_PDX_ID_FOR_ENUM || |
| msgType == TcrMessage::COMMIT || msgType == TcrMessage::ROLLBACK)) { |
| return true; |
| } |
| return false; |
| } |
| |
| TcrMessage::TcrMessage() |
| : m_request(nullptr), |
| m_tcdm(nullptr), |
| m_chunkedResult(nullptr), |
| m_keyList(nullptr), |
| m_region(nullptr), |
| m_timeout(15 /*DEFAULT_TIMEOUT_SECONDS*/), |
| m_metadata(), |
| m_cqs(nullptr), |
| m_messageResponseTimeout(-1), |
| m_delta(nullptr), |
| m_deltaBytes(nullptr), |
| m_fpaSet(), |
| m_functionAttributes(), |
| m_connectionIDBytes(nullptr), |
| m_creds(), |
| m_key(), |
| m_value(nullptr), |
| m_failedNode(), |
| m_callbackArgument(nullptr), |
| m_versionTag(), |
| m_eventid(nullptr), |
| m_vectorPtr(), |
| m_bucketServerLocation(nullptr), |
| m_tombstoneVersions(), |
| m_tombstoneKeys(), |
| m_versionObjPartListptr(), |
| exceptionMessage(), |
| m_regionName("INVALID_REGION_NAME"), |
| m_regex(), |
| m_bucketServerLocations(), |
| m_colocatedWith(), |
| m_partitionResolverName(), |
| m_securityHeaderLength(0), |
| m_msgType(TcrMessage::INVALID), |
| m_msgLength(-1), |
| m_msgTypeRequest(0), |
| m_txId(-1), |
| m_bucketCount(0), |
| m_numCqPart(0), |
| m_msgTypeForCq(0), |
| m_deltaBytesLen(0), |
| m_entryNotFound(0), |
| m_feAnotherHop(false), |
| isSecurityOn(false), |
| m_isLastChunkAndisSecurityHeader(0), |
| m_isSecurityHeaderAdded(false), |
| m_isMetaRegion(false), |
| m_decodeAll(false), |
| m_interestPolicy(0), |
| m_isDurable(false), |
| m_receiveValues(false), |
| m_hasCqsPart(false), |
| m_isInterestListPassed(false), |
| m_shouldIgnore(false), |
| m_metaDataVersion(0), |
| m_serverGroupVersion(0), |
| m_boolValue(0), |
| m_isCallBackArguement(false), |
| m_hasResult(0) {} |
| |
| const std::vector<std::shared_ptr<CacheableKey>>* TcrMessage::getKeys() const { |
| return m_keyList; |
| } |
| |
| const std::string& TcrMessage::getRegex() const { return m_regex; } |
| |
| InterestResultPolicy TcrMessage::getInterestResultPolicy() const { |
| if (m_interestPolicy == 2) { |
| return InterestResultPolicy::KEYS_VALUES; |
| } else if (m_interestPolicy == 1) { |
| return InterestResultPolicy::KEYS; |
| } else { |
| return InterestResultPolicy::NONE; |
| } |
| } |
| |
| bool TcrMessage::forPrimary() const { |
| return m_msgType == TcrMessage::PUT || m_msgType == TcrMessage::DESTROY || |
| m_msgType == TcrMessage::EXECUTE_REGION_FUNCTION; |
| } |
| |
| void TcrMessage::initCqMap() { m_cqs = new std::map<std::string, int>(); } |
| |
| bool TcrMessage::forSingleHop() const { |
| return m_msgType == TcrMessage::PUT || m_msgType == TcrMessage::DESTROY || |
| m_msgType == TcrMessage::REQUEST || |
| m_msgType == TcrMessage::GET_ALL_70 || |
| m_msgType == TcrMessage::GET_ALL_WITH_CALLBACK || |
| m_msgType == TcrMessage::EXECUTE_REGION_FUNCTION || |
| m_msgType == TcrMessage::PUTALL || |
| m_msgType == TcrMessage::PUT_ALL_WITH_CALLBACK; |
| } |
| |
| bool TcrMessage::forTransaction() const { return m_txId != -1; } |
| |
| bool TcrMessage::getBoolValue() const { return m_boolValue; } |
| |
| const char* TcrMessage::getException() { |
| exceptionMessage = Utils::nullSafeToString(m_value); |
| return exceptionMessage.c_str(); |
| } |
| |
| bool TcrMessage::isDurable() const { return m_isDurable; } |
| |
| bool TcrMessage::receiveValues() const { return m_receiveValues; } |
| |
| bool TcrMessage::hasCqPart() const { return m_hasCqsPart; } |
| |
| uint32_t TcrMessage::getMessageTypeForCq() const { return m_msgTypeForCq; } |
| |
| bool TcrMessage::isInterestListPassed() const { return m_isInterestListPassed; } |
| |
| bool TcrMessage::shouldIgnore() const { return m_shouldIgnore; } |
| |
| int8_t TcrMessage::getMetaDataVersion() const { return m_metaDataVersion; } |
| |
| uint32_t TcrMessage::getEntryNotFound() const { return m_entryNotFound; } |
| |
| int8_t TcrMessage::getserverGroupVersion() const { |
| return m_serverGroupVersion; |
| } |
| |
| std::vector<int8_t>* TcrMessage::getFunctionAttributes() { |
| return m_functionAttributes; |
| } |
| |
| void TcrMessage::setDM(ThinClientBaseDM* dm) { m_tcdm = dm; } |
| |
| ThinClientBaseDM* TcrMessage::getDM() { return m_tcdm; } |
| |
| // set the chunked response handler |
| void TcrMessage::setChunkedResultHandler(TcrChunkedResult* chunkedResult) { |
| m_isLastChunkAndisSecurityHeader = 0x0; |
| m_chunkedResult = chunkedResult; |
| } |
| |
| TcrChunkedResult* TcrMessage::getChunkedResultHandler() { |
| return m_chunkedResult; |
| } |
| |
| void TcrMessage::setVersionedObjectPartList( |
| std::shared_ptr<VersionedCacheableObjectPartList> versionObjPartListptr) { |
| m_versionObjPartListptr = versionObjPartListptr; |
| } |
| |
| std::shared_ptr<VersionedCacheableObjectPartList> |
| TcrMessage::getVersionedObjectPartList() { |
| return m_versionObjPartListptr; |
| } |
| |
| DataInput* TcrMessage::getDelta() { return m_delta.get(); } |
| |
| // getDeltaBytes( ) is called *only* by CqService, returns a CacheableBytes |
| // that |
| // takes ownership of delta bytes. |
| std::shared_ptr<CacheableBytes> TcrMessage::getDeltaBytes() { |
| if (m_deltaBytes == nullptr) { |
| return nullptr; |
| } |
| auto retVal = CacheableBytes::create( |
| std::vector<int8_t>(m_deltaBytes, m_deltaBytes + m_deltaBytesLen)); |
| m_deltaBytes = nullptr; |
| return retVal; |
| } |
| |
| bool TcrMessage::hasDelta() { return (m_delta != nullptr); } |
| |
| void TcrMessage::setMetaRegion(bool isMetaRegion) { |
| m_isMetaRegion = isMetaRegion; |
| } |
| |
| bool TcrMessage::isMetaRegion() const { return m_isMetaRegion; } |
| |
| int32_t TcrMessage::getNumBuckets() const { return m_bucketCount; } |
| |
| const std::string& TcrMessage::getColocatedWith() const { |
| return m_colocatedWith; |
| } |
| |
| const std::string& TcrMessage::getPartitionResolver() const { |
| return m_partitionResolverName; |
| } |
| |
| std::vector<std::vector<std::shared_ptr<BucketServerLocation>>>* |
| TcrMessage::getMetadata() { |
| return m_metadata; |
| } |
| |
| std::vector<std::shared_ptr<FixedPartitionAttributesImpl>>* |
| TcrMessage::getFpaSet() { |
| return m_fpaSet; |
| } |
| |
| std::shared_ptr<CacheableHashSet> TcrMessage::getFailedNode() { |
| return m_failedNode; |
| } |
| |
| bool TcrMessage::isCallBackArguement() const { return m_isCallBackArguement; } |
| |
| void TcrMessage::setCallBackArguement(bool aCallBackArguement) { |
| m_isCallBackArguement = aCallBackArguement; |
| } |
| |
| void TcrMessage::setBucketServerLocation( |
| std::shared_ptr<BucketServerLocation> serverLocation) { |
| m_bucketServerLocation = serverLocation; |
| } |
| void TcrMessage::setVersionTag(std::shared_ptr<VersionTag> versionTag) { |
| m_versionTag = versionTag; |
| } |
| std::shared_ptr<VersionTag> TcrMessage::getVersionTag() { return m_versionTag; } |
| |
| uint8_t TcrMessage::hasResult() const { return m_hasResult; } |
| |
| std::shared_ptr<CacheableHashMap> TcrMessage::getTombstoneVersions() const { |
| return m_tombstoneVersions; |
| } |
| |
| std::shared_ptr<CacheableHashSet> TcrMessage::getTombstoneKeys() const { |
| return m_tombstoneKeys; |
| } |
| |
| TcrMessagePing* TcrMessage::getPingMessage(CacheImpl* cacheImpl) { |
| static auto pingMsg = |
| new TcrMessagePing(new DataOutput(cacheImpl->createDataOutput()), true); |
| return pingMsg; |
| } |
| |
| TcrMessage* TcrMessage::getAllEPDisMess() { |
| static auto allEPDisconnected = new TcrMessageReply(true, nullptr); |
| return allEPDisconnected; |
| } |
| |
| TcrMessage* TcrMessage::getCloseConnMessage(CacheImpl* cacheImpl) { |
| static auto closeConnMsg = new TcrMessageCloseConnection( |
| new DataOutput(cacheImpl->createDataOutput()), true); |
| return closeConnMsg; |
| } |
| |
| void TcrMessage::setKeepAlive(bool keepalive) { |
| // TODO global |
| if (TcrMessage::m_keepAlive != nullptr) { |
| *TcrMessage::m_keepAlive = keepalive ? 1 : 0; |
| } |
| } |
| |
| void TcrMessage::writeInterestResultPolicyPart(InterestResultPolicy policy) { |
| m_request->writeInt(static_cast<int32_t>(3)); // size |
| m_request->write(static_cast<int8_t>(1)); // isObject |
| m_request->write(static_cast<int8_t>(DSCode::FixedIDByte)); |
| m_request->write(static_cast<int8_t>(DSCode::InterestResultPolicy)); |
| m_request->write(static_cast<int8_t>(policy.getOrdinal())); |
| } |
| |
| void TcrMessage::writeIntPart(int32_t intValue) { |
| m_request->writeInt(static_cast<int32_t>(4)); |
| m_request->write(static_cast<int8_t>(0)); |
| m_request->writeInt(intValue); |
| } |
| |
| void TcrMessage::writeBytePart(uint8_t byteValue) { |
| m_request->writeInt(static_cast<int32_t>(1)); |
| m_request->write(static_cast<int8_t>(0)); |
| m_request->write(byteValue); |
| } |
| |
| void TcrMessage::writeByteAndTimeOutPart(uint8_t byteValue, |
| std::chrono::milliseconds timeout) { |
| m_request->writeInt(static_cast<int32_t>(5)); // 1 (byte) + 4 (timeout) |
| m_request->write(static_cast<int8_t>(0)); |
| m_request->write(byteValue); |
| m_request->writeInt(static_cast<int32_t>(timeout.count())); |
| } |
| |
| void TcrMessage::writeMillisecondsPart(std::chrono::milliseconds millis) { |
| writeIntPart(static_cast<int32_t>(millis.count())); |
| } |
| |
| void TcrMessage::readBooleanPartAsObject(DataInput& input, bool* boolVal) { |
| int32_t lenObj = input.readInt32(); |
| const auto isObj = input.readBoolean(); |
| if (lenObj > 0) { |
| if (isObj) { |
| bool bVal = input.readNativeBool(); |
| *boolVal = bVal; |
| } |
| } |
| } |
| |
| void TcrMessage::readOldValue(DataInput& input) { |
| // read and ignore length |
| input.readInt32(); |
| input.read(); // ignore isObj |
| std::shared_ptr<Cacheable> value; |
| input.readObject(value); // we are not using this value currently |
| } |
| |
| void TcrMessage::readPrMetaData(DataInput& input) { |
| int32_t lenObj = input.readInt32(); |
| input.read(); // ignore |
| m_metaDataVersion = input.read(); // read refresh meta data byte |
| if (lenObj == 2) { |
| m_serverGroupVersion = input.read(); |
| LOGDEBUG("Single-hop m_serverGroupVersion in message reply is %d", |
| m_serverGroupVersion); |
| } |
| } |
| std::shared_ptr<VersionTag> TcrMessage::readVersionTagPart( |
| DataInput& input, uint16_t endpointMemId, |
| MemberListForVersionStamp& memberListForVersionStamp) { |
| auto isObj = static_cast<DSCode>(input.read()); |
| std::shared_ptr<VersionTag> versionTag; |
| |
| if (isObj == DSCode::NullObj) return versionTag; |
| |
| if (isObj == DSCode::FixedIDByte) { |
| versionTag = std::make_shared<VersionTag>(memberListForVersionStamp); |
| if (static_cast<DSFid>(input.read()) == DSFid::VersionTag) { |
| versionTag->fromData(input); |
| versionTag->replaceNullMemberId(endpointMemId); |
| return versionTag; |
| } |
| } else if (isObj == DSCode::FixedIDShort) { |
| if (input.readInt16() == static_cast<int16_t>(DSFid::DiskVersionTag)) { |
| DiskVersionTag* disk = new DiskVersionTag(memberListForVersionStamp); |
| disk->fromData(input); |
| versionTag.reset(disk); |
| return versionTag; |
| } |
| } |
| return versionTag; |
| } |
| |
| void TcrMessage::readVersionTag( |
| DataInput& input, uint16_t endpointMemId, |
| MemberListForVersionStamp& memberListForVersionStamp) { |
| int32_t lenObj = input.readInt32(); |
| input.read(); // ignore byte |
| |
| if (lenObj == 0) return; |
| auto versionTag = TcrMessage::readVersionTagPart(input, endpointMemId, |
| memberListForVersionStamp); |
| this->setVersionTag(versionTag); |
| } |
| |
| void TcrMessage::readIntPart(DataInput& input, uint32_t* intValue) { |
| uint32_t intLen = input.readInt32(); |
| if (intLen != 4) { |
| throw Exception("int length should have been 4"); |
| } |
| if (input.read()) throw Exception("Integer is not an object"); |
| *intValue = input.readInt32(); |
| } |
| |
| void TcrMessage::readLongPart(DataInput& input, uint64_t* intValue) { |
| uint32_t longLen = input.readInt32(); |
| if (longLen != 8) throw Exception("long length should have been 8"); |
| if (input.read()) throw Exception("Long is not an object"); |
| *intValue = input.readInt64(); |
| } |
| |
| const std::string TcrMessage::readStringPart(DataInput& input) { |
| char* stringBuffer; |
| int32_t stringLength = input.readInt32(); |
| stringBuffer = new char[stringLength + 1]; |
| stringBuffer[stringLength] = '\0'; |
| if (input.read()) { |
| throw Exception("String is not an object"); |
| } |
| input.readBytesOnly(reinterpret_cast<int8_t*>(stringBuffer), stringLength); |
| std::string str = stringBuffer; |
| delete[] stringBuffer; |
| return str; |
| } |
| |
| void TcrMessage::readCqsPart(DataInput& input) { |
| m_cqs->clear(); |
| readIntPart(input, &m_numCqPart); |
| for (uint32_t cqCnt = 0; cqCnt < m_numCqPart;) { |
| auto cq = readStringPart(input); |
| cqCnt++; |
| int32_t cqOp; |
| readIntPart(input, reinterpret_cast<uint32_t*>(&cqOp)); |
| cqCnt++; |
| (*m_cqs)[cq] = cqOp; |
| } |
| } |
| |
| void TcrMessage::readCallbackObjectPart(DataInput& input, bool defaultString) { |
| int32_t lenObj = input.readInt32(); |
| const auto isObj = input.readBoolean(); |
| if (lenObj > 0) { |
| if (isObj) { |
| input.readObject(m_callbackArgument); |
| } else { |
| if (defaultString) { |
| m_callbackArgument = readCacheableString(input, lenObj); |
| } else { |
| m_callbackArgument = readCacheableBytes(input, lenObj); |
| } |
| } |
| } |
| } |
| |
| void TcrMessage::readObjectPart(DataInput& input, bool defaultString) { |
| int32_t lenObj = input.readInt32(); |
| auto isObj = input.read(); |
| if (lenObj > 0) { |
| if (isObj == 1) { |
| input.readObject(m_value); |
| } else { |
| if (defaultString) { |
| m_value = readCacheableString(input, lenObj); |
| } else { |
| m_value = readCacheableBytes(input, lenObj); |
| } |
| } |
| } else if (lenObj == 0 && isObj == 2) { // EMPTY BYTE ARRAY |
| m_value = CacheableBytes::create(); |
| } else if (isObj == 0) { |
| m_value = nullptr; |
| } |
| } |
| |
| void TcrMessage::readSecureObjectPart(DataInput& input, bool defaultString, |
| bool isChunk, |
| uint8_t isLastChunkWithSecurity) { |
| LOGDEBUG( |
| "TcrMessage::readSecureObjectPart isChunk = %d isLastChunkWithSecurity = " |
| "%d", |
| isChunk, isLastChunkWithSecurity); |
| if (isChunk) { |
| if (!(isLastChunkWithSecurity & 0x2)) { |
| return; |
| } |
| } |
| |
| int32_t lenObj = input.readInt32(); |
| const auto isObj = input.readBoolean(); |
| LOGDEBUG( |
| "TcrMessage::readSecureObjectPart lenObj = %d isObj = %d, " |
| "m_msgTypeRequest = %d defaultString = %d ", |
| lenObj, isObj, m_msgTypeRequest, defaultString); |
| if (lenObj > 0) { |
| if (isObj) { |
| // TODO: ?? |
| input.readObject(m_value); |
| } else { |
| if (defaultString) { |
| // TODO: ?? |
| // m_value = CacheableString::create( |
| // (char*)input.currentBufferPosition( ), lenObj ); |
| m_value = readCacheableString(input, lenObj); |
| } else { |
| LOGDEBUG("reading connectionid"); |
| // TODO: this will execute always |
| // input.rea.readInt(&connectionId); |
| // m_connectionIDBytes = |
| // CacheableBytes::create(input.currentBufferPosition(), lenObj); |
| // m_connectionIDBytes = readCacheableBytes(input, lenObj); |
| m_connectionIDBytes = CacheableBytes::create( |
| std::vector<int8_t>(input.currentBufferPosition(), |
| input.currentBufferPosition() + lenObj)); |
| input.advanceCursor(lenObj); |
| } |
| } |
| } |
| if (input.getBytesRemaining() != 0) { |
| LOGERROR("readSecureObjectPart: we not read all bytes. Messagetype:%d", |
| m_msgType); |
| throw IllegalStateException("didn't read all bytes"); |
| } |
| } |
| |
| void TcrMessage::readUniqueIDObjectPart(DataInput& input) { |
| LOGDEBUG("TcrMessage::readUniqueIDObjectPart"); |
| |
| int32_t lenObj = input.readInt32(); |
| const auto isObj = input.readBoolean(); |
| LOGDEBUG("TcrMessage::readUniqueIDObjectPart lenObj = %d isObj = %d", lenObj, |
| isObj); |
| if (lenObj > 0) { |
| m_value = CacheableBytes::create(std::vector<int8_t>( |
| input.currentBufferPosition(), input.currentBufferPosition() + lenObj)); |
| input.advanceCursor(lenObj); |
| } |
| } |
| |
| int64_t TcrMessage::getConnectionId(TcrConnection* conn) { |
| if (m_connectionIDBytes != nullptr) { |
| auto tmp = conn->decryptBytes(m_connectionIDBytes); |
| auto di = m_tcdm->getConnectionManager().getCacheImpl()->createDataInput( |
| reinterpret_cast<const uint8_t*>(tmp->value().data()), tmp->length()); |
| return di.readInt64(); |
| } else { |
| LOGWARN("Returning 0 as internal connection ID msgtype = %d ", m_msgType); |
| return 0; |
| } |
| } |
| |
| int64_t TcrMessage::getUniqueId(TcrConnection* conn) { |
| if (m_value != nullptr) { |
| auto encryptBytes = std::dynamic_pointer_cast<CacheableBytes>(m_value); |
| |
| auto tmp = conn->decryptBytes(encryptBytes); |
| |
| auto di = m_tcdm->getConnectionManager().getCacheImpl()->createDataInput( |
| reinterpret_cast<const uint8_t*>(tmp->value().data()), tmp->length()); |
| return di.readInt64(); |
| } |
| return 0; |
| } |
| |
| void TcrMessage::readFailedNodePart(DataInput& input) { |
| // read and ignore length |
| input.readInt32(); |
| // read and ignore isObj |
| input.readBoolean(); |
| m_failedNode = CacheableHashSet::create(); |
| input.read(); // ignore typeId |
| // input.readDirectObject(m_failedNode, typeId); |
| m_failedNode->fromData(input); |
| LOGDEBUG("readFailedNodePart m_failedNode size = %zu", m_failedNode->size()); |
| } |
| |
| void TcrMessage::readKeyPart(DataInput& input) { |
| int32_t lenObj = input.readInt32(); |
| const auto isObj = input.readBoolean(); |
| if (lenObj > 0) { |
| if (isObj) { |
| m_key = std::dynamic_pointer_cast<CacheableKey>(input.readObject()); |
| } else { |
| m_key = std::dynamic_pointer_cast<CacheableKey>( |
| readCacheableString(input, lenObj)); |
| } |
| } |
| } |
| |
| std::shared_ptr<Serializable> TcrMessage::readCacheableString(DataInput& input, |
| int lenObj) { |
| auto decoded = internal::JavaModifiedUtf8::decode( |
| reinterpret_cast<const char*>(input.currentBufferPosition()), lenObj); |
| input.advanceCursor(lenObj); |
| |
| return CacheableString::create(decoded); |
| } |
| |
| std::shared_ptr<Serializable> TcrMessage::readCacheableBytes(DataInput& input, |
| int lenObj) { |
| if (lenObj <= 252) { // 252 is java's ((byte)-4 && 0xFF) |
| input.rewindCursor(1); |
| uint8_t* buffer = const_cast<uint8_t*>(input.currentBufferPosition()); |
| buffer[0] = static_cast<uint8_t>(lenObj); |
| } else if (lenObj <= 0xFFFF) { |
| input.rewindCursor(3); |
| uint8_t* buffer = const_cast<uint8_t*>(input.currentBufferPosition()); |
| buffer[0] = static_cast<uint8_t>(-2); |
| writeInt(buffer + 1, static_cast<uint16_t>(lenObj)); |
| } else { |
| input.rewindCursor(5); |
| uint8_t* buffer = const_cast<uint8_t*>(input.currentBufferPosition()); |
| buffer[0] = static_cast<uint8_t>(-3); |
| writeInt(buffer + 1, static_cast<uint32_t>(lenObj)); |
| } |
| |
| return input.readDirectObject(static_cast<int8_t>(DSCode::CacheableBytes)); |
| } |
| |
| bool TcrMessage::readExceptionPart(DataInput& input, uint8_t isLastChunk, |
| bool skipFirstPart) { |
| // Reading exception message sent from java cache server. |
| // Read the first part which is serialized java exception and ignore it. |
| // Then read the second part which is string and use it to construct the |
| // exception. |
| |
| isLastChunk = (isLastChunk >> 5); |
| LOGDEBUG("TcrMessage::readExceptionPart: isLastChunk = %d", isLastChunk); |
| if (skipFirstPart == true) { |
| skipParts(input, 1); |
| isLastChunk--; |
| } |
| |
| if (/*input.getBytesRemaining() > 0 */ isLastChunk > 0) { |
| // Do a best effort read for exception since it is possible |
| // that this is invoked for non-exception chunk message which has |
| // only one part. |
| isLastChunk--; |
| readObjectPart(input, true); |
| // if (input.getBytesRemaining() > 0 && m_msgTypeRequest == |
| // TcrMessage::EXECUTE_REGION_FUNCTION && m_msgType != |
| // TcrMessage::EXCEPTION) { |
| if (isLastChunk > 0) { |
| readFailedNodePart(input); |
| return true; // 3 parts |
| } else { |
| return true; // 2 parts |
| } |
| } |
| return false; |
| } |
| |
| void TcrMessage::writeObjectPart( |
| const std::shared_ptr<Serializable>& se, bool isDelta, bool callToData, |
| const std::vector<std::shared_ptr<CacheableKey>>* getAllKeyList) { |
| // no nullptr check since for some messages nullptr object may be valid |
| uint32_t size = 0; |
| // write a dummy size of 4 bytes. |
| m_request->writeInt(static_cast<int32_t>(size)); |
| |
| int8_t isObject = 1; |
| |
| // check if the type is a CacheableBytes |
| if (auto cacheableBytes = std::dynamic_pointer_cast<CacheableBytes>(se)) { |
| // for an emty byte array write EMPTY_BYTEARRAY_CODE(2) to is object |
| auto byteArrLength = cacheableBytes->length(); |
| if (byteArrLength == 0) { |
| isObject = 2; |
| m_request->write(isObject); |
| return; |
| } |
| isObject = 0; |
| } |
| |
| if (isDelta) { |
| m_request->write(static_cast<int8_t>(0)); |
| } else { |
| m_request->write(isObject); |
| } |
| |
| auto sizeBeforeWritingObj = m_request->getBufferLength(); |
| if (isDelta) { |
| auto deltaPtr = std::dynamic_pointer_cast<Delta>(se); |
| deltaPtr->toDelta(*m_request); |
| } else if (isObject) { |
| if (callToData) { |
| m_request->getSerializationRegistry().serializeWithoutHeader(se, |
| *m_request); |
| } else { |
| if (getAllKeyList != nullptr) { |
| int8_t typeId = static_cast<int8_t>(DSCode::CacheableObjectArray); |
| m_request->write(typeId); |
| m_request->writeArrayLen(static_cast<int32_t>(getAllKeyList->size())); |
| m_request->write(static_cast<int8_t>(DSCode::Class)); |
| m_request->writeString("java.lang.Object"); |
| for (const auto& key : *getAllKeyList) { |
| m_request->writeObject(key); |
| } |
| } else { |
| m_request->writeObject(se, isDelta); |
| } |
| } |
| } else { |
| // TODO:: |
| // CacheableBytes* rawByteArray = static_cast<CacheableBytes*>(se.get()); |
| // m_request->writeBytesOnly(rawByteArray->value(), rawByteArray->length()); |
| writeBytesOnly(se); |
| } |
| auto sizeAfterWritingObj = m_request->getBufferLength(); |
| auto sizeOfSerializedObj = sizeAfterWritingObj - sizeBeforeWritingObj; |
| m_request->rewindCursor(sizeOfSerializedObj + 1 + 4); // |
| m_request->writeInt(static_cast<int32_t>(sizeOfSerializedObj)); |
| m_request->advanceCursor(sizeOfSerializedObj + 1); |
| } |
| |
| void TcrMessage::writeBytesOnly(const std::shared_ptr<Serializable>& se) { |
| auto cBufferLength = m_request->getBufferLength(); |
| uint8_t* startBytes = nullptr; |
| m_request->writeObject(se); |
| uint8_t* cursor = |
| const_cast<uint8_t*>(m_request->getBuffer()) + cBufferLength; |
| |
| int pos = 1; // one byte for typeid |
| |
| uint8_t code; |
| code = cursor[pos++]; |
| |
| if (code == 0xFF) { |
| m_request->rewindCursor(2); |
| } else { |
| int32_t result = code; |
| if (result > 252) { // 252 is java's ((byte)-4 && 0xFF) |
| if (code == 0xFE) { |
| uint16_t val; |
| readInt(cursor + pos, &val); |
| startBytes = cursor + 4; |
| result = val; |
| m_request->rewindCursor(4); |
| } else if (code == 0xFD) { |
| uint32_t val; |
| readInt(cursor + pos, &val); |
| startBytes = cursor + 6; |
| result = val; |
| m_request->rewindCursor(6); |
| } |
| } else { |
| startBytes = cursor + 2; |
| m_request->rewindCursor(2); |
| } |
| for (int i = 0; i < result; i++) cursor[i] = startBytes[i]; |
| } |
| } |
| |
| void TcrMessage::writeHeader(uint32_t msgType, uint32_t numOfParts) { |
| int8_t earlyAck = 0x0; |
| LOGDEBUG("TcrMessage::writeHeader m_isMetaRegion = %d", m_isMetaRegion); |
| if (m_tcdm != nullptr) { |
| if ((isSecurityOn = |
| (m_tcdm->isSecurityOn() && |
| TcrMessage::isUserInitiativeOps(*this) && !m_isMetaRegion))) { |
| earlyAck |= 0x2; |
| } |
| } |
| |
| LOGDEBUG("TcrMessage::writeHeader earlyAck = %d", earlyAck); |
| |
| m_request->writeInt(static_cast<int32_t>(msgType)); |
| m_request->writeInt( |
| static_cast<int32_t>(0)); // write a dummy message len('0' here). At |
| // the end write the length at the (buffer + |
| // 4) offset. |
| m_request->writeInt(static_cast<int32_t>(numOfParts)); |
| auto txState = TSSTXStateWrapper::get().getTXState(); |
| if (txState == nullptr) { |
| m_txId = -1; |
| } else { |
| m_txId = txState->getTransactionId().getId(); |
| } |
| m_request->writeInt(m_txId); |
| |
| // updateHeaderForRetry assumes that 16 bytes are written before earlyAck |
| // byte. In case, |
| // write header definition changes, updateHeaderForRetry should change |
| // accordingly. |
| m_request->write(earlyAck); |
| } |
| |
| // Updates the early ack byte of the message to reflect that it is a retry op |
| // This function assumes that 16 bytes are written before earlyAck byte. In |
| // case, |
| // write header definition changes, this function should change accordingly. |
| void TcrMessage::updateHeaderForRetry() { |
| uint8_t earlyAck = m_request->getValueAtPos(16); |
| // set the isRetryBit |
| m_request->updateValueAtPos(16, earlyAck | 0x4); |
| } |
| |
| void TcrMessage::writeRegionPart(const std::string& regionName) { |
| int32_t len = static_cast<int32_t>(regionName.length()); |
| m_request->writeInt(len); |
| m_request->write(static_cast<int8_t>(0)); // isObject = 0 |
| m_request->writeBytesOnly( |
| reinterpret_cast<int8_t*>(const_cast<char*>(regionName.c_str())), len); |
| } |
| |
| void TcrMessage::writeStringPart(const std::string& str) { |
| if (!str.empty()) { |
| auto jmutf8 = internal::JavaModifiedUtf8::fromString(str); |
| |
| auto encodedLen = static_cast<int32_t>(jmutf8.length()); |
| m_request->writeInt(encodedLen); |
| m_request->ensureCapacity(encodedLen); |
| m_request->write(static_cast<int8_t>(0)); // isObject = 0 BYTE_CODE |
| m_request->writeBytesOnly(reinterpret_cast<const int8_t*>(jmutf8.data()), |
| encodedLen); |
| } else { |
| m_request->writeInt(static_cast<uint16_t>(0)); |
| } |
| } |
| |
| void TcrMessage::writeEventIdPart(int reserveSize, |
| bool fullValueAfterDeltaFail) { |
| EventId eid(true, reserveSize, fullValueAfterDeltaFail); // set true so we |
| // auto-gen next |
| // per-thread |
| // sequence number |
| // Write EventId threadid and seqno. |
| eid.writeIdsData(*m_request); |
| } |
| |
| void TcrMessage::writeMessageLength() { |
| auto totalLen = m_request->getBufferLength(); |
| auto msgLen = totalLen - g_headerLen; |
| m_request->rewindCursor( |
| totalLen - |
| 4); // msg len is written after the msg type which is of 4 bytes ... |
| m_request->writeInt(static_cast<int32_t>(msgLen)); |
| m_request->advanceCursor(totalLen - 8); // after writing 4 bytes for msg len |
| // you are already 8 bytes ahead from |
| // the beginning. |
| } |
| |
| void TcrMessage::startProcessChunk(ACE_Semaphore& finalizeSema) { |
| if (m_msgTypeRequest == TcrMessage::EXECUTECQ_MSG_TYPE || |
| m_msgTypeRequest == TcrMessage::STOPCQ_MSG_TYPE || |
| m_msgTypeRequest == TcrMessage::CLOSECQ_MSG_TYPE || |
| m_msgTypeRequest == TcrMessage::CLOSECLIENTCQS_MSG_TYPE || |
| m_msgTypeRequest == TcrMessage::GETCQSTATS_MSG_TYPE || |
| m_msgTypeRequest == TcrMessage::MONITORCQ_MSG_TYPE) { |
| return; |
| } |
| if (m_chunkedResult == nullptr) { |
| throw FatalInternalException( |
| "TcrMessage::startProcessChunk: null " |
| "result processor!"); |
| } |
| switch (m_msgTypeRequest) { |
| case TcrMessage::REGISTER_INTEREST: |
| case TcrMessage::REGISTER_INTEREST_LIST: |
| case TcrMessage::QUERY: |
| case TcrMessage::QUERY_WITH_PARAMETERS: |
| case TcrMessage::EXECUTE_FUNCTION: |
| case TcrMessage::EXECUTE_REGION_FUNCTION: |
| case TcrMessage::EXECUTE_REGION_FUNCTION_SINGLE_HOP: |
| case TcrMessage::EXECUTECQ_WITH_IR_MSG_TYPE: |
| case TcrMessage::GETDURABLECQS_MSG_TYPE: |
| case TcrMessage::KEY_SET: |
| case TcrMessage::GET_ALL_70: |
| case TcrMessage::GET_ALL_WITH_CALLBACK: |
| case TcrMessage::PUTALL: |
| case TcrMessage::PUT_ALL_WITH_CALLBACK: |
| case TcrMessage::REMOVE_ALL: { |
| m_chunkedResult->reset(); |
| break; |
| } |
| default: { |
| LOGERROR( |
| "Got unexpected request message type %d while starting to process " |
| "response", |
| m_msgTypeRequest); |
| throw IllegalStateException( |
| "Got unexpected request msg type while starting to process response"); |
| } |
| } |
| m_chunkedResult->setFinalizeSemaphore(&finalizeSema); |
| } |
| |
| bool TcrMessage::isFEAnotherHop() { return m_feAnotherHop; } |
| void TcrMessage::handleSpecialFECase() { |
| LOGDEBUG("handleSpecialFECase1 %d", this->m_isLastChunkAndisSecurityHeader); |
| if ((this->m_isLastChunkAndisSecurityHeader & 0x01) == 0x01) { |
| LOGDEBUG("handleSpecialFECase2 %d", this->m_isLastChunkAndisSecurityHeader); |
| if (!((this->m_isLastChunkAndisSecurityHeader & 0x04) == 0x04)) { |
| LOGDEBUG("handleSpecialFECase3 %d", |
| this->m_isLastChunkAndisSecurityHeader); |
| m_feAnotherHop = true; |
| } |
| } |
| } |
| |
| void TcrMessage::processChunk(const std::vector<uint8_t>& chunk, int32_t len, |
| uint16_t endpointmemId, |
| const uint8_t isLastChunkAndisSecurityHeader) { |
| // TODO: see if security header is there |
| LOGDEBUG( |
| "TcrMessage::processChunk isLastChunkAndisSecurityHeader = %d chunklen = " |
| "%d m_msgType = %d", |
| isLastChunkAndisSecurityHeader, len, m_msgType); |
| |
| this->m_isLastChunkAndisSecurityHeader = isLastChunkAndisSecurityHeader; |
| handleSpecialFECase(); |
| |
| if (m_tcdm == nullptr) { |
| throw FatalInternalException("TcrMessage::processChunk: null DM!"); |
| } |
| |
| switch (m_msgType) { |
| case TcrMessage::REPLY: { |
| LOGDEBUG("processChunk - got reply for request %d", m_msgTypeRequest); |
| chunkSecurityHeader(1, chunk, len, isLastChunkAndisSecurityHeader); |
| break; |
| } |
| case TcrMessage::RESPONSE: { |
| if (m_msgTypeRequest == TcrMessage::EXECUTECQ_MSG_TYPE || |
| m_msgTypeRequest == TcrMessage::STOPCQ_MSG_TYPE || |
| m_msgTypeRequest == TcrMessage::CLOSECQ_MSG_TYPE || |
| m_msgTypeRequest == TcrMessage::CLOSECLIENTCQS_MSG_TYPE || |
| m_msgTypeRequest == TcrMessage::GETCQSTATS_MSG_TYPE || |
| m_msgTypeRequest == TcrMessage::MONITORCQ_MSG_TYPE) { |
| LOGDEBUG("processChunk - got CQ response for request %d", |
| m_msgTypeRequest); |
| // TODO: do we need to do anything here |
| break; |
| } else if (m_msgTypeRequest == TcrMessage::PUTALL || |
| m_msgTypeRequest == TcrMessage::PUT_ALL_WITH_CALLBACK) { |
| TcrChunkedContext* chunkedContext = new TcrChunkedContext( |
| chunk, len, m_chunkedResult, isLastChunkAndisSecurityHeader, |
| m_tcdm->getConnectionManager().getCacheImpl()); |
| m_chunkedResult->setEndpointMemId(endpointmemId); |
| m_tcdm->queueChunk(chunkedContext); |
| if (chunk.empty()) { |
| // last chunk -- wait for processing of all the chunks to complete |
| m_chunkedResult->waitFinalize(); |
| auto ex = m_chunkedResult->getException(); |
| if (ex != nullptr) { |
| throw *ex; |
| } |
| } |
| break; |
| } |
| // fall-through for other cases |
| } |
| case TcrMessage::EXECUTE_REGION_FUNCTION_RESULT: |
| case TcrMessage::EXECUTE_FUNCTION_RESULT: |
| case TcrMessage::CQDATAERROR_MSG_TYPE: // one part |
| case TcrMessage::CQ_EXCEPTION_TYPE: // one part |
| case TcrMessage::RESPONSE_FROM_PRIMARY: { |
| if (m_chunkedResult != nullptr) { |
| LOGDEBUG("tcrmessage in case22 "); |
| TcrChunkedContext* chunkedContext = new TcrChunkedContext( |
| chunk, len, m_chunkedResult, isLastChunkAndisSecurityHeader, |
| m_tcdm->getConnectionManager().getCacheImpl()); |
| m_chunkedResult->setEndpointMemId(endpointmemId); |
| m_tcdm->queueChunk(chunkedContext); |
| if (chunk.empty()) { |
| // last chunk -- wait for processing of all the chunks to complete |
| m_chunkedResult->waitFinalize(); |
| // Throw any exception during processing here. |
| // Do not throw it immediately since we want to read the |
| // full data from socket in any case. |
| // Notice that TcrChunkedContext::handleChunk stops any |
| // further processing as soon as an exception is encountered. |
| // This can cause behaviour like partially filled cache in case |
| // of populating cache with registerAllKeys(), so that should be |
| // documented since rolling that back may not be a good idea either. |
| if (const auto& ex = m_chunkedResult->getException()) { |
| throw Exception(*ex); |
| } |
| } |
| } else if (TcrMessage::CQ_EXCEPTION_TYPE == m_msgType || |
| TcrMessage::CQDATAERROR_MSG_TYPE == m_msgType || |
| TcrMessage::GET_ALL_DATA_ERROR == m_msgType) { |
| if (!chunk.empty()) { |
| chunkSecurityHeader(1, chunk, len, isLastChunkAndisSecurityHeader); |
| } |
| } |
| break; |
| } |
| case TcrMessage::REGISTER_INTEREST_DATA_ERROR: // for register interest |
| // error |
| case EXECUTE_FUNCTION_ERROR: |
| case EXECUTE_REGION_FUNCTION_ERROR: { |
| if (!chunk.empty()) { |
| // DeleteArray<const uint8_t> delChunk(bytes); |
| // DataInput input(bytes, len); |
| // TODO: this not send two part... |
| // looks like this is our exception so only one part will come |
| // readExceptionPart(input, false); |
| // readSecureObjectPart(input, false, true, |
| // isLastChunkAndisSecurityHeader ); |
| chunkSecurityHeader(1, chunk, len, isLastChunkAndisSecurityHeader); |
| } |
| break; |
| } |
| case TcrMessage::EXCEPTION: { |
| if (!chunk.empty()) { |
| auto input = |
| m_tcdm->getConnectionManager().getCacheImpl()->createDataInput( |
| chunk.data(), len); |
| readExceptionPart(input, isLastChunkAndisSecurityHeader); |
| readSecureObjectPart(input, false, true, |
| isLastChunkAndisSecurityHeader); |
| } |
| break; |
| } |
| case TcrMessage::RESPONSE_FROM_SECONDARY: { |
| // TODO: how many parts |
| chunkSecurityHeader(1, chunk, len, isLastChunkAndisSecurityHeader); |
| if (chunk.size()) { |
| LOGFINEST("processChunk - got response from secondary, ignoring."); |
| } |
| break; |
| } |
| case TcrMessage::PUT_DATA_ERROR: { |
| chunkSecurityHeader(1, chunk, len, isLastChunkAndisSecurityHeader); |
| if (!chunk.empty()) { |
| auto input = |
| m_tcdm->getConnectionManager().getCacheImpl()->createDataInput( |
| chunk.data(), len); |
| auto errorString = readStringPart(input); |
| |
| if (!errorString.empty()) { |
| errorString.erase( |
| errorString.begin(), |
| std::find_if(errorString.begin(), errorString.end(), |
| std::not1(std::ptr_fun<int, int>(std::isspace)))); |
| LOGDEBUG( |
| "TcrMessage::%s: setting thread-local ex msg to \"%s\", %s, %d", |
| __FUNCTION__, errorString.c_str(), __FILE__, __LINE__); |
| setThreadLocalExceptionMessage(errorString.c_str()); |
| } |
| } |
| break; |
| } |
| case TcrMessage::GET_ALL_DATA_ERROR: { |
| chunkSecurityHeader(1, chunk, len, isLastChunkAndisSecurityHeader); |
| |
| break; |
| } |
| default: { |
| // TODO: how many parts what should we do here |
| if (chunk.empty()) { |
| LOGWARN( |
| "Got unhandled message type %d while processing response, possible " |
| "serialization mismatch", |
| m_msgType); |
| throw MessageException( |
| "TcrMessage::processChunk: " |
| "got unhandled message type"); |
| } |
| break; |
| } |
| } |
| } |
| |
| Pool* TcrMessage::getPool() const { |
| if (m_region) { |
| return m_region->getPool().get(); |
| } |
| return nullptr; |
| } |
| |
| void TcrMessage::chunkSecurityHeader(int skipPart, |
| const std::vector<uint8_t> bytes, |
| int32_t len, |
| uint8_t isLastChunkAndSecurityHeader) { |
| LOGDEBUG("TcrMessage::chunkSecurityHeader:: skipParts = %d", skipPart); |
| if ((isLastChunkAndSecurityHeader & 0x3) == 0x3) { |
| auto di = m_tcdm->getConnectionManager().getCacheImpl()->createDataInput( |
| bytes.data(), len); |
| skipParts(di, skipPart); |
| readSecureObjectPart(di, false, true, isLastChunkAndSecurityHeader); |
| } |
| } |
| |
| void TcrMessage::handleByteArrayResponse( |
| const char* bytearray, int32_t len, uint16_t endpointMemId, |
| const SerializationRegistry& serializationRegistry, |
| MemberListForVersionStamp& memberListForVersionStamp) { |
| auto input = m_tcdm->getConnectionManager().getCacheImpl()->createDataInput( |
| reinterpret_cast<uint8_t*>(const_cast<char*>(bytearray)), len, getPool()); |
| // TODO:: this need to make sure that pool is there |
| // if(m_tcdm == nullptr) |
| // throw IllegalArgumentException("Pool is nullptr in TcrMessage"); |
| m_msgType = input.readInt32(); |
| int32_t msglen; |
| msglen = input.readInt32(); |
| int32_t numparts; |
| numparts = input.readInt32(); |
| m_txId = input.readInt32(); |
| auto earlyack = input.read(); |
| LOGDEBUG( |
| "handleByteArrayResponse m_msgType = %d isSecurityOn = %d requesttype " |
| "=%d", |
| m_msgType, isSecurityOn, m_msgTypeRequest); |
| LOGDEBUG( |
| "Message type=%d, length=%d, parts=%d, txid=%d and eack %d with data " |
| "length=%d", |
| m_msgType, msglen, numparts, m_txId, earlyack, len); |
| |
| // LOGFINE("Message type=%d, length=%d, parts=%d, txid=%d and eack %d with |
| // data length=%d", |
| // m_msgType, msglen, numparts, m_txId, earlyack, len); |
| |
| switch (m_msgType) { |
| case TcrMessage::RESPONSE: { |
| if (m_msgTypeRequest == TcrMessage::CONTAINS_KEY) { |
| readBooleanPartAsObject(input, &m_boolValue); |
| } else if (m_msgTypeRequest == TcrMessage::USER_CREDENTIAL_MESSAGE) { |
| readUniqueIDObjectPart(input); |
| } else if (m_msgTypeRequest == TcrMessage::GET_PDX_ID_FOR_TYPE || |
| m_msgTypeRequest == TcrMessage::GET_PDX_ID_FOR_ENUM) { |
| // int will come in response |
| uint32_t typeId; |
| readIntPart(input, &typeId); |
| m_value = CacheableInt32::create(typeId); |
| } else if (m_msgTypeRequest == TcrMessage::GET_PDX_TYPE_BY_ID) { |
| // PdxType will come in response |
| input.advanceCursor(5); // part header |
| m_value = serializationRegistry.deserialize( |
| input, static_cast<int8_t>(DSCode::PdxType)); |
| } else if (m_msgTypeRequest == TcrMessage::GET_PDX_ENUM_BY_ID) { |
| // PdxType will come in response |
| input.advanceCursor(5); // part header |
| m_value = serializationRegistry.deserialize(input); |
| } else if (m_msgTypeRequest == TcrMessage::GET_FUNCTION_ATTRIBUTES) { |
| // read and ignore length |
| input.readInt32(); |
| input.advanceCursor(1); // ignore byte |
| |
| m_functionAttributes = new std::vector<int8_t>(); |
| m_functionAttributes->push_back(input.read()); |
| m_functionAttributes->push_back(input.read()); |
| m_functionAttributes->push_back(input.read()); |
| } else if (m_msgTypeRequest == TcrMessage::REQUEST) { |
| int32_t receivednumparts = 2; |
| readObjectPart(input); |
| uint32_t flag = 0; |
| readIntPart(input, &flag); |
| if (flag & 0x01) { |
| readCallbackObjectPart(input); |
| receivednumparts++; |
| } |
| |
| if ((m_value == nullptr) && (flag & 0x08 /*VALUE_IS_INVALID*/)) { |
| m_value = CacheableToken::invalid(); |
| } |
| |
| if (flag & 0x02) { |
| readVersionTag(input, endpointMemId, memberListForVersionStamp); |
| receivednumparts++; |
| } |
| |
| if (flag & 0x04 /*KEY_NOT_PRESENT*/) { |
| m_value = CacheableToken::tombstone(); |
| } |
| |
| if (numparts > receivednumparts) readPrMetaData(input); |
| |
| } else if (m_decodeAll) { |
| readObjectPart(input); |
| if (numparts == 2) { |
| if (m_isCallBackArguement) { |
| readCallbackObjectPart(input); |
| } else { |
| int32_t lenObj = input.readInt32(); |
| input.readBoolean(); |
| m_metaDataVersion = input.read(); |
| if (lenObj == 2) { |
| m_serverGroupVersion = input.read(); |
| LOGDEBUG( |
| "Single-hop m_serverGroupVersion in message response is %d", |
| m_serverGroupVersion); |
| } |
| } |
| } else if (numparts > 2) { |
| skipParts(input, 1); |
| int32_t lenObj = input.readInt32(); |
| input.readBoolean(); |
| m_metaDataVersion = input.read(); |
| LOGFINE("Single-hop metadata version in message response is %d", |
| m_metaDataVersion); |
| if (lenObj == 2) { |
| m_serverGroupVersion = input.read(); |
| LOGDEBUG( |
| "Single-hop m_serverGroupVersion in message response is %d", |
| m_serverGroupVersion); |
| } |
| } |
| } |
| break; |
| } |
| |
| case TcrMessage::EXCEPTION: { |
| uint8_t lastChunk = static_cast<uint8_t>(numparts); |
| lastChunk = (lastChunk << 5); |
| readExceptionPart(input, lastChunk); |
| // if (isSecurityOn) |
| // readSecureObjectPart( input ); |
| break; |
| } |
| |
| case TcrMessage::INVALID: { |
| // Read the string in the reply |
| LOGWARN("Received invalid message type as reply from server"); |
| readObjectPart(input, true); |
| break; |
| } |
| |
| case TcrMessage::CLIENT_REGISTER_INTEREST: |
| case TcrMessage::CLIENT_UNREGISTER_INTEREST: |
| case TcrMessage::SERVER_TO_CLIENT_PING: |
| case TcrMessage::REGISTER_INSTANTIATORS: { |
| // ignore this |
| m_shouldIgnore = true; |
| break; |
| } |
| |
| case TcrMessage::REGISTER_INTEREST_DATA_ERROR: |
| case TcrMessage::UNREGISTER_INTEREST_DATA_ERROR: |
| case TcrMessage::PUT_DATA_ERROR: |
| case TcrMessage::KEY_SET_DATA_ERROR: |
| case TcrMessage::DESTROY_REGION_DATA_ERROR: |
| case TcrMessage::CLEAR_REGION_DATA_ERROR: |
| case TcrMessage::CONTAINS_KEY_DATA_ERROR: |
| case TcrMessage::PUT_DELTA_ERROR: |
| case TcrMessage::REQUEST_DATA_ERROR: { |
| m_value = std::make_shared<CacheableString>(readStringPart(input)); |
| break; |
| } |
| |
| case TcrMessage::REPLY: { |
| switch (m_msgTypeRequest) { |
| case TcrMessage::PUT: { |
| readPrMetaData(input); |
| uint32_t flags = 0; |
| readIntPart(input, &flags); |
| if (flags & 0x01) { // has old value |
| readOldValue(input); |
| } |
| if (flags & 0x04) { |
| readVersionTag(input, endpointMemId, memberListForVersionStamp); |
| } |
| break; |
| } |
| case TcrMessage::INVALIDATE: { |
| uint32_t flags = 0; |
| readIntPart(input, &flags); |
| if (flags & 0x01) { |
| readVersionTag(input, endpointMemId, memberListForVersionStamp); |
| } |
| readPrMetaData(input); |
| |
| break; |
| } |
| case TcrMessage::DESTROY: { |
| uint32_t flags = 0; |
| readIntPart(input, &flags); |
| if (flags & 0x01) { |
| readVersionTag(input, endpointMemId, memberListForVersionStamp); |
| } |
| readPrMetaData(input); |
| // skip the Destroy65.java response entryNotFound int part so |
| // that the readSecureObjectPart() call below gets the security part |
| // skipParts(input, 1); |
| readIntPart(input, &m_entryNotFound); |
| LOGDEBUG("Inside TcrMessage::REPLY::DESTROY m_entryNotFound = %d ", |
| m_entryNotFound); |
| break; |
| } |
| case TcrMessage::PING: |
| default: { |
| readPrMetaData(input); |
| break; |
| } |
| } |
| break; |
| } |
| case TcrMessage::LOCAL_INVALIDATE: |
| case TcrMessage::LOCAL_DESTROY: { |
| int32_t regionLen = input.readInt32(); |
| input.advanceCursor(1); // ignore byte |
| char* regname = nullptr; |
| regname = new char[regionLen + 1]; |
| DeleteArray<char> delRegName(regname); |
| input.readBytesOnly(reinterpret_cast<int8_t*>(regname), regionLen); |
| regname[regionLen] = '\0'; |
| m_regionName = regname; |
| |
| readKeyPart(input); |
| |
| // skipParts(input, 1); // skip callbackarg parts |
| readCallbackObjectPart(input); |
| readVersionTag(input, endpointMemId, memberListForVersionStamp); |
| readBooleanPartAsObject(input, &m_isInterestListPassed); |
| readBooleanPartAsObject(input, &m_hasCqsPart); |
| if (m_hasCqsPart) { |
| if (m_msgType == TcrMessage::LOCAL_INVALIDATE) { |
| readIntPart(input, &m_msgTypeForCq); |
| } else { |
| m_msgTypeForCq = static_cast<uint32_t>(m_msgType); |
| } |
| // LOGINFO("got cq local local_invalidate/local_destroy read |
| // m_hasCqsPart"); |
| readCqsPart(input); |
| } |
| |
| // read eventid part |
| readEventIdPart(input, false); |
| |
| break; |
| } |
| |
| case TcrMessage::LOCAL_CREATE: |
| case TcrMessage::LOCAL_UPDATE: { |
| int32_t regionLen = input.readInt32(); |
| input.advanceCursor(1); // ignore byte |
| char* regname = nullptr; |
| regname = new char[regionLen + 1]; |
| DeleteArray<char> delRegName(regname); |
| input.readBytesOnly(reinterpret_cast<int8_t*>(regname), regionLen); |
| regname[regionLen] = '\0'; |
| m_regionName = regname; |
| |
| readKeyPart(input); |
| // Read delta flag |
| bool isDelta = false; |
| readBooleanPartAsObject(input, &isDelta); |
| if (isDelta) { |
| m_deltaBytesLen = input.readInt32(); |
| |
| input.advanceCursor(1); // ignore byte |
| m_deltaBytes = new int8_t[m_deltaBytesLen]; |
| input.readBytesOnly(m_deltaBytes, m_deltaBytesLen); |
| m_delta = std::unique_ptr<DataInput>(new DataInput( |
| m_tcdm->getConnectionManager().getCacheImpl()->createDataInput( |
| reinterpret_cast<const uint8_t*>(m_deltaBytes), |
| m_deltaBytesLen))); |
| } else { |
| readObjectPart(input); |
| } |
| |
| // skip callbackarg part |
| // skipParts(input, 1); |
| readCallbackObjectPart(input); |
| readVersionTag(input, endpointMemId, memberListForVersionStamp); |
| readBooleanPartAsObject(input, &m_isInterestListPassed); |
| readBooleanPartAsObject(input, &m_hasCqsPart); |
| |
| if (m_hasCqsPart) { |
| // LOGINFO("got cq local_create/local_create"); |
| readCqsPart(input); |
| m_msgTypeForCq = static_cast<uint32_t>(m_msgType); |
| } |
| |
| // read eventid part |
| readEventIdPart(input, false); |
| _GEODE_SAFE_DELETE_ARRAY(regname); // COVERITY ---> 30299 Resource leak |
| |
| break; |
| } |
| case TcrMessage::CLIENT_MARKER: { |
| // dont skip (non-existent) callbackarg part, just read eventid part |
| readEventIdPart(input, false); |
| break; |
| } |
| |
| case TcrMessage::LOCAL_DESTROY_REGION: |
| case TcrMessage::CLEAR_REGION: { |
| int32_t regionLen = input.readInt32(); |
| input.advanceCursor(1); // ignore byte |
| char* regname = nullptr; |
| regname = new char[regionLen + 1]; |
| DeleteArray<char> delRegName(regname); |
| input.readBytesOnly(reinterpret_cast<int8_t*>(regname), regionLen); |
| regname[regionLen] = '\0'; |
| m_regionName = regname; |
| // skip callbackarg part |
| // skipParts(input, 1); |
| readCallbackObjectPart(input); |
| readBooleanPartAsObject(input, &m_hasCqsPart); |
| if (m_hasCqsPart) { |
| // LOGINFO("got cq region_destroy read m_hasCqsPart"); |
| readCqsPart(input); |
| } |
| // read eventid part |
| readEventIdPart(input, false); |
| break; |
| } |
| |
| case TcrMessage::RESPONSE_CLIENT_PR_METADATA: { |
| if (len == 17) { |
| LOGDEBUG("RESPONSE_CLIENT_PR_METADATA len is 17"); |
| return; |
| } |
| m_metadata = |
| new std::vector<std::vector<std::shared_ptr<BucketServerLocation>>>(); |
| for (int32_t i = 0; i < numparts; i++) { |
| input.readInt32(); // ignore partlen |
| input.read(); // ignore isObj; |
| auto bits8 = input.read(); // cacheable vector typeid |
| LOGDEBUG("Expected typeID %d, got %d", DSCode::CacheableArrayList, |
| bits8); |
| |
| auto arrayLength = input.readArrayLength(); // array length |
| LOGDEBUG("Array length = %d ", arrayLength); |
| if (arrayLength > 0) { |
| std::vector<std::shared_ptr<BucketServerLocation>> |
| bucketServerLocations; |
| for (int32_t index = 0; index < arrayLength; index++) { |
| // ignore DS typeid, CLASS typeid, and string typeid |
| input.advanceCursor(3); |
| uint16_t classLen = input.readInt16(); // Read classLen |
| input.advanceCursor(classLen); |
| auto location = std::make_shared<BucketServerLocation>(); |
| location->fromData(input); |
| LOGFINE("location contains %d\t%s\t%d\t%d\t%s", |
| location->getBucketId(), location->getServerName().c_str(), |
| location->getPort(), location->getVersion(), |
| (location->isPrimary() ? "true" : "false")); |
| bucketServerLocations.push_back(location); |
| } |
| m_metadata->push_back(bucketServerLocations); |
| } |
| LOGFINER("Metadata size is %", m_metadata->size()); |
| } |
| break; |
| } |
| |
| case TcrMessage::GET_CLIENT_PR_METADATA_ERROR: { |
| LOGERROR("Failed to get single-hop meta data"); |
| break; |
| } |
| |
| case TcrMessage::RESPONSE_CLIENT_PARTITION_ATTRIBUTES: { |
| input.readInt32(); // ignore partlen; |
| input.read(); // ignore isObj; |
| |
| // PART1 = bucketCount |
| m_bucketCount = input.readNativeInt32(); |
| |
| auto partLength = input.readInt32(); // partlen; |
| input.read(); // ignore isObj; |
| if (partLength > 0) { |
| // PART2 = colocatedwith |
| m_colocatedWith = input.readString(); |
| } |
| |
| if (numparts == 4) { |
| partLength = input.readInt32(); // partlen; |
| input.read(); // ignore isObj; |
| if (partLength > 0) { |
| // PART3 = partitionresolvername |
| m_partitionResolverName = input.readString(); |
| } |
| |
| input.readInt32(); // ignore partlen; |
| input.read(); // ignore isObj; |
| input.read(); // ignore cacheable CacheableHashSet typeid |
| |
| auto arrayLength = input.readArrayLength(); // array length |
| if (arrayLength > 0) { |
| m_fpaSet = |
| new std::vector<std::shared_ptr<FixedPartitionAttributesImpl>>(); |
| for (int32_t index = 0; index < arrayLength; index++) { |
| input.advanceCursor( |
| 3); // ignore DS typeid, CLASS typeid, string typeid |
| auto classLen = input.readInt16(); // Read classLen |
| input.advanceCursor(classLen); |
| auto fpa = std::make_shared<FixedPartitionAttributesImpl>(); |
| fpa->fromData(input); // PART4 = set of FixedAttributes. |
| LOGDEBUG("fpa contains %d\t%s\t%d\t%d", fpa->getNumBuckets(), |
| fpa->getPartitionName().c_str(), fpa->isPrimary(), |
| fpa->getStartingBucketID()); |
| m_fpaSet->push_back(fpa); |
| } |
| } |
| } |
| break; |
| } |
| case TcrMessage::TOMBSTONE_OPERATION: { |
| uint32_t tombstoneOpType; |
| int32_t regionLen = input.readInt32(); |
| input.read(); |
| char* regname = nullptr; |
| |
| regname = new char[regionLen + 1]; |
| DeleteArray<char> delRegName(regname); |
| input.readBytesOnly(reinterpret_cast<int8_t*>(regname), regionLen); |
| regname[regionLen] = '\0'; |
| m_regionName = regname; |
| readIntPart(input, &tombstoneOpType); // partlen; |
| // read and ignore length |
| input.readInt32(); |
| // read and ignore isObj |
| input.read(); |
| |
| if (tombstoneOpType == 0) { |
| if (m_tombstoneVersions == nullptr) { |
| m_tombstoneVersions = CacheableHashMap::create(); |
| } |
| readHashMapForGCVersions(input, m_tombstoneVersions); |
| } else if (tombstoneOpType == 1) { |
| if (m_tombstoneKeys == nullptr) { |
| m_tombstoneKeys = CacheableHashSet::create(); |
| } |
| // input.readObject(m_tombstoneKeys); |
| readHashSetForGCVersions(input, m_tombstoneKeys); |
| } else { |
| LOGERROR("Failed to read the tombstone versions"); |
| break; |
| } |
| // readEventId Part |
| readEventIdPart(input, false); |
| break; |
| } |
| case TcrMessage::GET_CLIENT_PARTITION_ATTRIBUTES_ERROR: { |
| LOGERROR("Failed to get server partitioned region attributes"); |
| break; |
| } |
| |
| case TcrMessage::UNKNOWN_MESSAGE_TYPE_ERROR: { |
| // do nothing |
| break; |
| } |
| |
| case TcrMessage::REQUEST_EVENT_VALUE_ERROR: { |
| LOGERROR("Error while requesting full value for delta"); |
| break; |
| } |
| |
| default: |
| LOGERROR( |
| "Unknown message type %d in response, possible serialization " |
| "mismatch", |
| m_msgType); |
| std::stringstream ss; |
| ss << boost::stacktrace::stacktrace(); |
| LOGERROR(ss.str().c_str()); |
| throw MessageException("handleByteArrayResponse: unknown message type"); |
| } |
| LOGDEBUG("handleByteArrayResponse earlyack = %d ", earlyack); |
| if (earlyack & 0x2) readSecureObjectPart(input); |
| } |
| |
| TcrMessageDestroyRegion::TcrMessageDestroyRegion( |
| DataOutput* dataOutput, const Region* region, |
| const std::shared_ptr<Serializable>& aCallbackArgument, |
| std::chrono::milliseconds messageResponsetimeout, |
| ThinClientBaseDM* connectionDM) { |
| m_request.reset(dataOutput); |
| m_msgType = TcrMessage::DESTROY_REGION; |
| m_tcdm = connectionDM; |
| m_regionName = |
| region == nullptr ? "INVALID_REGION_NAME" : region->getFullPath(); |
| m_region = region; |
| m_timeout = DEFAULT_TIMEOUT_SECONDS; |
| m_messageResponseTimeout = messageResponsetimeout; |
| |
| uint32_t numOfParts = 1; |
| if (aCallbackArgument != nullptr) { |
| ++numOfParts; |
| } |
| |
| numOfParts++; |
| |
| if (m_messageResponseTimeout >= std::chrono::milliseconds::zero()) { |
| numOfParts++; |
| } |
| writeHeader(m_msgType, numOfParts); |
| writeRegionPart(m_regionName); |
| writeEventIdPart(); |
| if (aCallbackArgument != nullptr) { |
| writeObjectPart(aCallbackArgument); |
| } |
| if (m_messageResponseTimeout >= std::chrono::milliseconds::zero()) { |
| writeMillisecondsPart(m_messageResponseTimeout); |
| } |
| |
| writeMessageLength(); |
| } |
| |
| TcrMessageClearRegion::TcrMessageClearRegion( |
| DataOutput* dataOutput, const Region* region, |
| const std::shared_ptr<Serializable>& aCallbackArgument, |
| std::chrono::milliseconds messageResponsetimeout, |
| ThinClientBaseDM* connectionDM) { |
| m_request.reset(dataOutput); |
| m_msgType = TcrMessage::CLEAR_REGION; |
| m_tcdm = connectionDM; |
| m_regionName = |
| region == nullptr ? "INVALID_REGION_NAME" : region->getFullPath(); |
| m_region = region; |
| m_timeout = DEFAULT_TIMEOUT_SECONDS; |
| m_messageResponseTimeout = messageResponsetimeout; |
| |
| isSecurityOn = false; |
| m_isSecurityHeaderAdded = false; |
| |
| uint32_t numOfParts = 1; |
| if (aCallbackArgument != nullptr) { |
| ++numOfParts; |
| } |
| |
| numOfParts++; |
| |
| if (m_messageResponseTimeout >= std::chrono::milliseconds::zero()) { |
| numOfParts++; |
| } |
| writeHeader(m_msgType, numOfParts); |
| writeRegionPart(m_regionName); |
| writeEventIdPart(); |
| if (aCallbackArgument != nullptr) { |
| writeObjectPart(aCallbackArgument); |
| } |
| if (m_messageResponseTimeout >= std::chrono::milliseconds::zero()) { |
| writeMillisecondsPart(m_messageResponseTimeout); |
| } |
| |
| writeMessageLength(); |
| } |
| |
| TcrMessageQuery::TcrMessageQuery( |
| DataOutput* dataOutput, const std::string& regionName, |
| std::chrono::milliseconds messageResponsetimeout, |
| ThinClientBaseDM* connectionDM) { |
| m_request.reset(dataOutput); |
| m_msgType = TcrMessage::QUERY; |
| m_tcdm = connectionDM; |
| m_regionName = regionName; // this is querystri; |
| m_timeout = DEFAULT_TIMEOUT_SECONDS; |
| m_messageResponseTimeout = messageResponsetimeout; |
| m_region = nullptr; |
| uint32_t numOfParts = 1; |
| |
| numOfParts++; |
| |
| if (m_messageResponseTimeout >= std::chrono::milliseconds::zero()) { |
| numOfParts++; |
| } |
| writeHeader(m_msgType, numOfParts); |
| writeRegionPart(m_regionName); |
| writeEventIdPart(); |
| if (m_messageResponseTimeout >= std::chrono::milliseconds::zero()) { |
| writeMillisecondsPart(m_messageResponseTimeout); |
| } |
| writeMessageLength(); |
| } |
| |
| TcrMessageStopCQ::TcrMessageStopCQ( |
| DataOutput* dataOutput, const std::string& regionName, |
| std::chrono::milliseconds messageResponsetimeout, |
| ThinClientBaseDM* connectionDM) { |
| m_request.reset(dataOutput); |
| m_msgType = TcrMessage::STOPCQ_MSG_TYPE; |
| m_tcdm = connectionDM; |
| m_regionName = regionName; // this is querystring |
| m_timeout = DEFAULT_TIMEOUT_SECONDS; |
| m_messageResponseTimeout = messageResponsetimeout; |
| m_region = nullptr; |
| m_isSecurityHeaderAdded = false; |
| m_isMetaRegion = false; |
| |
| uint32_t numOfParts = 1; |
| |
| numOfParts++; |
| |
| if (m_messageResponseTimeout >= std::chrono::milliseconds::zero()) { |
| numOfParts++; |
| } |
| |
| writeHeader(m_msgType, numOfParts); |
| writeRegionPart(m_regionName); |
| writeEventIdPart(); |
| if (m_messageResponseTimeout >= std::chrono::milliseconds::zero()) { |
| writeMillisecondsPart(m_messageResponseTimeout); |
| } |
| writeMessageLength(); |
| } |
| |
| TcrMessageCloseCQ::TcrMessageCloseCQ( |
| DataOutput* dataOutput, const std::string& regionName, |
| std::chrono::milliseconds messageResponsetimeout, |
| ThinClientBaseDM* connectionDM) { |
| m_request.reset(dataOutput); |
| m_msgType = TcrMessage::CLOSECQ_MSG_TYPE; |
| m_tcdm = connectionDM; |
| m_regionName = regionName; // this is querystring |
| m_timeout = DEFAULT_TIMEOUT_SECONDS; |
| m_messageResponseTimeout = messageResponsetimeout; |
| m_region = nullptr; |
| uint32_t numOfParts = 1; |
| |
| numOfParts++; |
| |
| if (m_messageResponseTimeout >= std::chrono::milliseconds::zero()) { |
| numOfParts++; |
| } |
| writeHeader(m_msgType, numOfParts); |
| writeRegionPart(m_regionName); |
| writeEventIdPart(); |
| if (m_messageResponseTimeout >= std::chrono::milliseconds::zero()) { |
| writeMillisecondsPart(m_messageResponseTimeout); |
| } |
| writeMessageLength(); |
| } |
| |
| TcrMessageQueryWithParameters::TcrMessageQueryWithParameters( |
| DataOutput* dataOutput, const std::string& regionName, |
| const std::shared_ptr<Serializable>&, |
| std::shared_ptr<CacheableVector> paramList, |
| std::chrono::milliseconds messageResponsetimeout, |
| ThinClientBaseDM* connectionDM) { |
| m_request.reset(dataOutput); |
| m_msgType = TcrMessage::QUERY_WITH_PARAMETERS; |
| m_tcdm = connectionDM; |
| m_regionName = regionName; |
| m_timeout = DEFAULT_TIMEOUT_SECONDS; |
| m_messageResponseTimeout = messageResponsetimeout; |
| m_region = nullptr; |
| |
| // Find out the numOfParts |
| uint32_t numOfParts = 4 + static_cast<uint32_t>(paramList->size()); |
| writeHeader(m_msgType, numOfParts); |
| // Part-1: Query String |
| writeRegionPart(m_regionName); |
| |
| // Part-2: Number or length of the parameters |
| writeIntPart(static_cast<uint32_t>(paramList->size())); |
| |
| // Part-3: X (COMPILE_QUERY_CLEAR_TIMEOUT) parameter |
| writeIntPart(15); |
| |
| // Part-4: Request specific timeout |
| if (m_messageResponseTimeout >= std::chrono::milliseconds::zero()) { |
| writeMillisecondsPart(m_messageResponseTimeout); |
| } |
| // Part-5: Parameters |
| if (paramList != nullptr) { |
| for (const auto& value : *paramList) { |
| writeObjectPart(value); |
| } |
| } |
| writeMessageLength(); |
| } |
| |
| TcrMessageContainsKey::TcrMessageContainsKey( |
| DataOutput* dataOutput, const Region* region, |
| const std::shared_ptr<CacheableKey>& key, |
| const std::shared_ptr<Serializable>& aCallbackArgument, bool isContainsKey, |
| ThinClientBaseDM* connectionDM) { |
| m_request.reset(dataOutput); |
| m_msgType = TcrMessage::CONTAINS_KEY; |
| m_tcdm = connectionDM; |
| m_regionName = |
| region == nullptr ? "INVALID_REGION_NAME" : region->getFullPath(); |
| m_region = region; |
| m_timeout = DEFAULT_TIMEOUT_SECONDS; |
| |
| uint32_t numOfParts = 2; |
| if (aCallbackArgument != nullptr) { |
| ++numOfParts; |
| } |
| |
| numOfParts++; |
| |
| if (key == nullptr) { |
| throw IllegalArgumentException( |
| "key passed to the constructor can't be nullptr"); |
| } |
| |
| writeHeader(m_msgType, numOfParts); |
| writeRegionPart(m_regionName); |
| writeObjectPart(key); |
| // write 0 to indicate containskey (1 for containsvalueforkey) |
| writeIntPart(isContainsKey ? 0 : 1); |
| if (aCallbackArgument != nullptr) { |
| writeObjectPart(aCallbackArgument); |
| } |
| writeMessageLength(); |
| } |
| |
| TcrMessageGetDurableCqs::TcrMessageGetDurableCqs( |
| DataOutput* dataOutput, ThinClientBaseDM* connectionDM) { |
| m_request.reset(dataOutput); |
| m_msgType = TcrMessage::GETDURABLECQS_MSG_TYPE; |
| m_tcdm = connectionDM; |
| m_timeout = DEFAULT_TIMEOUT_SECONDS; |
| m_region = nullptr; |
| // wrirting msgtype with part length =1 |
| writeHeader(m_msgType, 1); |
| // the server expects at least 1 part, so writing a dummy byte part |
| writeBytePart(0); |
| writeMessageLength(); |
| } |
| |
| TcrMessageRequest::TcrMessageRequest( |
| DataOutput* dataOutput, const Region* region, |
| const std::shared_ptr<CacheableKey>& key, |
| const std::shared_ptr<Serializable>& aCallbackArgument, |
| ThinClientBaseDM* connectionDM) { |
| m_request.reset(dataOutput); |
| m_msgType = TcrMessage::REQUEST; |
| m_tcdm = connectionDM; |
| m_key = key; |
| m_regionName = |
| (region == nullptr ? "INVALID_REGION_NAME" : region->getFullPath()); |
| m_region = region; |
| m_timeout = DEFAULT_TIMEOUT_SECONDS; |
| |
| uint32_t numOfParts = 2; |
| if (aCallbackArgument != nullptr) { |
| ++numOfParts; |
| } |
| |
| numOfParts++; |
| |
| if (key == nullptr) { |
| throw IllegalArgumentException( |
| "key passed to the constructor can't be nullptr"); |
| } |
| |
| numOfParts--; // no event id for request |
| writeHeader(TcrMessage::REQUEST, numOfParts); |
| writeRegionPart(m_regionName); |
| writeObjectPart(key); |
| if (aCallbackArgument != nullptr) { |
| // set bool variable to true. |
| m_isCallBackArguement = true; |
| writeObjectPart(aCallbackArgument); |
| } |
| writeMessageLength(); |
| } |
| |
| TcrMessageInvalidate::TcrMessageInvalidate( |
| DataOutput* dataOutput, const Region* region, |
| const std::shared_ptr<CacheableKey>& key, |
| const std::shared_ptr<Serializable>& aCallbackArgument, |
| ThinClientBaseDM* connectionDM) { |
| m_request.reset(dataOutput); |
| m_msgType = TcrMessage::INVALIDATE; |
| m_tcdm = connectionDM; |
| m_key = key; |
| m_regionName = |
| (region == nullptr ? "INVALID_REGION_NAME" : region->getFullPath()); |
| m_region = region; |
| m_timeout = DEFAULT_TIMEOUT_SECONDS; |
| |
| uint32_t numOfParts = 2; |
| if (aCallbackArgument != nullptr) { |
| ++numOfParts; |
| } |
| |
| numOfParts++; |
| |
| if (key == nullptr) { |
| throw IllegalArgumentException( |
| "key passed to the constructor can't be nullptr"); |
| } |
| |
| writeHeader(TcrMessage::INVALIDATE, numOfParts); |
| writeRegionPart(m_regionName); |
| writeObjectPart(key); |
| writeEventIdPart(); |
| if (aCallbackArgument != nullptr) { |
| // set bool variable to true. |
| m_isCallBackArguement = true; |
| writeObjectPart(aCallbackArgument); |
| } |
| writeMessageLength(); |
| } |
| |
| TcrMessageDestroy::TcrMessageDestroy( |
| DataOutput* dataOutput, const Region* region, |
| const std::shared_ptr<CacheableKey>& key, |
| const std::shared_ptr<Cacheable>& value, |
| const std::shared_ptr<Serializable>& aCallbackArgument, |
| ThinClientBaseDM* connectionDM) { |
| m_request.reset(dataOutput); |
| m_msgType = TcrMessage::DESTROY; |
| m_tcdm = connectionDM; |
| m_key = key; |
| m_regionName = |
| (region == nullptr ? "INVALID_REGION_NAME" : region->getFullPath()); |
| m_region = region; |
| m_timeout = DEFAULT_TIMEOUT_SECONDS; |
| uint32_t numOfParts = 2; |
| if (aCallbackArgument != nullptr) { |
| ++numOfParts; |
| } |
| |
| numOfParts++; |
| |
| if (key == nullptr) { |
| throw IllegalArgumentException( |
| "key passed to the constructor can't be nullptr"); |
| } |
| |
| if (value != nullptr) { |
| numOfParts += 2; // for GFE Destroy65.java |
| writeHeader(TcrMessage::DESTROY, numOfParts); |
| writeRegionPart(m_regionName); |
| writeObjectPart(key); |
| writeObjectPart(value); // expectedOldValue part |
| uint8_t removeByte = 8; // OP_TYPE_DESTROY value from Operation.java |
| auto removeBytePart = CacheableByte::create(removeByte); |
| writeObjectPart(removeBytePart); // operation part |
| writeEventIdPart(); |
| if (aCallbackArgument != nullptr) { |
| writeObjectPart(aCallbackArgument); |
| } |
| writeMessageLength(); |
| } else { |
| numOfParts += 2; // for GFE Destroy65.java |
| writeHeader(TcrMessage::DESTROY, numOfParts); |
| writeRegionPart(m_regionName); |
| writeObjectPart(key); |
| writeObjectPart(nullptr); // expectedOldValue part |
| writeObjectPart(nullptr); // operation part |
| writeEventIdPart(); |
| if (aCallbackArgument != nullptr) { |
| writeObjectPart(aCallbackArgument); |
| } |
| writeMessageLength(); |
| } |
| } |
| |
| TcrMessagePut::TcrMessagePut( |
| DataOutput* dataOutput, const Region* region, |
| const std::shared_ptr<CacheableKey>& key, |
| const std::shared_ptr<Cacheable>& value, |
| const std::shared_ptr<Serializable>& aCallbackArgument, bool isDelta, |
| ThinClientBaseDM* connectionDM, bool isMetaRegion, |
| bool fullValueAfterDeltaFail, const char* regionName) { |
| m_request.reset(dataOutput); |
| // m_securityHeaderLength = 0; |
| m_isMetaRegion = isMetaRegion; |
| m_msgType = TcrMessage::PUT; |
| m_tcdm = connectionDM; |
| m_key = key; |
| m_regionName = region != nullptr ? region->getFullPath() : regionName; |
| m_region = region; |
| m_timeout = DEFAULT_TIMEOUT_SECONDS; |
| |
| // TODO check the number of parts in this constructor. doubt because in PUT |
| // value can be nullptr also. |
| uint32_t numOfParts = 5; |
| if (aCallbackArgument != nullptr) { |
| ++numOfParts; |
| } |
| |
| numOfParts++; |
| |
| if (key == nullptr) { |
| throw IllegalArgumentException( |
| "key passed to the constructor can't be nullptr"); |
| } |
| |
| numOfParts++; |
| writeHeader(m_msgType, numOfParts); |
| writeRegionPart(m_regionName); |
| writeObjectPart(nullptr); // operation = null |
| writeIntPart(0); // flags = 0 |
| writeObjectPart(key); |
| writeObjectPart(CacheableBoolean::create(isDelta)); |
| writeObjectPart(value, isDelta); |
| writeEventIdPart(0, fullValueAfterDeltaFail); |
| if (aCallbackArgument != nullptr) { |
| writeObjectPart(aCallbackArgument); |
| } |
| writeMessageLength(); |
| } |
| |
| TcrMessageReply::TcrMessageReply(bool decodeAll, |
| ThinClientBaseDM* connectionDM) { |
| m_msgType = TcrMessage::INVALID; |
| m_decodeAll = decodeAll; |
| m_tcdm = connectionDM; |
| |
| if (connectionDM != nullptr) isSecurityOn = connectionDM->isSecurityOn(); |
| } |
| |
| TcrMessagePing::TcrMessagePing(DataOutput* dataOutput, bool decodeAll) { |
| m_msgType = TcrMessage::PING; |
| m_decodeAll = decodeAll; |
| m_request.reset(dataOutput); |
| m_request->writeInt(m_msgType); |
| m_request->writeInt(static_cast<int32_t>( |
| 0)); // 17 is fixed message len ... PING only has a header. |
| m_request->writeInt(static_cast<int32_t>(0)); // Number of parts. |
| // int32_t txId = TcrMessage::m_transactionId++; |
| // Setting the txId to 0 for all ping message as it is not being used on the |
| // SERVER side or the |
| // client side. |
| m_request->writeInt(static_cast<int32_t>(0)); |
| m_request->write(static_cast<int8_t>(0)); // Early ack is '0'. |
| m_msgLength = g_headerLen; |
| m_txId = 0; |
| } |
| |
| TcrMessageCloseConnection::TcrMessageCloseConnection(DataOutput* dataOutput, |
| bool decodeAll) { |
| m_msgType = TcrMessage::CLOSE_CONNECTION; |
| m_decodeAll = decodeAll; |
| m_request.reset(dataOutput); |
| m_request->writeInt(m_msgType); |
| m_request->writeInt(static_cast<int32_t>(6)); |
| m_request->writeInt(static_cast<int32_t>(1)); // Number of parts. |
| // int32_t txId = TcrMessage::m_transactionId++; |
| m_request->writeInt(static_cast<int32_t>(0)); |
| m_request->write(static_cast<int8_t>(0)); // Early ack is '0'. |
| // last two parts are not used ... setting zero in both the parts. |
| m_request->writeInt(static_cast<int32_t>(1)); // len is 1 |
| m_request->write(static_cast<int8_t>(0)); // is obj is '0'. |
| // cast away constness here since we want to modify this |
| TcrMessage::m_keepAlive = const_cast<uint8_t*>(m_request->getCursor()); |
| m_request->write(static_cast<int8_t>(0)); // keepalive is '0'. |
| } |
| |
| TcrMessageClientMarker::TcrMessageClientMarker(DataOutput* dataOutput, |
| bool decodeAll) { |
| m_request.reset(dataOutput); |
| m_msgType = TcrMessage::CLIENT_MARKER; |
| m_decodeAll = decodeAll; |
| } |
| |
| TcrMessageRegisterInterestList::TcrMessageRegisterInterestList( |
| DataOutput* dataOutput, const Region* region, |
| const std::vector<std::shared_ptr<CacheableKey>>& keys, bool isDurable, |
| bool isCachingEnabled, bool receiveValues, |
| InterestResultPolicy interestPolicy, ThinClientBaseDM* connectionDM) { |
| m_request.reset(dataOutput); |
| m_msgType = TcrMessage::REGISTER_INTEREST_LIST; |
| m_tcdm = connectionDM; |
| m_keyList = &keys; |
| m_regionName = |
| region == nullptr ? "INVALID_REGION_NAME" : region->getFullPath(); |
| m_region = region; |
| m_timeout = DEFAULT_TIMEOUT_SECONDS; |
| m_isDurable = isDurable; |
| m_receiveValues = receiveValues; |
| |
| writeHeader(m_msgType, 6); |
| |
| // Part 1 |
| writeRegionPart(m_regionName); |
| |
| // Part 2 |
| writeInterestResultPolicyPart(interestPolicy); |
| |
| // Part 3 |
| writeBytePart(isDurable ? 1 : 0); // keepalive |
| |
| // Part 4 |
| auto cal = CacheableArrayList::create(); |
| for (auto&& key : keys) { |
| if (key == nullptr) { |
| throw IllegalArgumentException( |
| "keys in the interest list cannot be nullptr"); |
| } |
| cal->push_back(key); |
| } |
| writeObjectPart(cal); |
| |
| // Part 5 |
| int8_t bytes[2]; |
| std::shared_ptr<CacheableBytes> byteArr = nullptr; |
| bytes[0] = receiveValues ? 0 : 1; // reveive values |
| byteArr = CacheableBytes::create(std::vector<int8_t>(bytes, bytes + 1)); |
| writeObjectPart(byteArr); |
| |
| // Part 6 |
| bytes[0] = isCachingEnabled ? 1 : 0; // region policy |
| bytes[1] = 0; // serialize values |
| byteArr = CacheableBytes::create(std::vector<int8_t>(bytes, bytes + 2)); |
| writeObjectPart(byteArr); |
| |
| writeMessageLength(); |
| m_interestPolicy = interestPolicy.ordinal; |
| } |
| |
| TcrMessageUnregisterInterestList::TcrMessageUnregisterInterestList( |
| DataOutput* dataOutput, const Region* region, |
| const std::vector<std::shared_ptr<CacheableKey>>& keys, bool isDurable, |
| bool receiveValues, InterestResultPolicy interestPolicy, |
| ThinClientBaseDM* connectionDM) { |
| m_request.reset(dataOutput); |
| m_msgType = TcrMessage::UNREGISTER_INTEREST_LIST; |
| m_tcdm = connectionDM; |
| m_keyList = &keys; |
| m_regionName = |
| region == nullptr ? "INVALID_REGION_NAME" : region->getFullPath(); |
| m_region = region; |
| m_timeout = DEFAULT_TIMEOUT_SECONDS; |
| m_isDurable = isDurable; |
| m_receiveValues = receiveValues; |
| |
| auto numInItrestList = keys.size(); |
| assert(numInItrestList != 0); |
| uint32_t numOfParts = 2 + static_cast<uint32_t>(numInItrestList); |
| |
| numOfParts += 2; |
| writeHeader(m_msgType, numOfParts); |
| writeRegionPart(m_regionName); |
| writeBytePart(0); // isClosing |
| writeBytePart(isDurable ? 1 : 0); // keepalive |
| |
| writeIntPart(static_cast<int32_t>(numInItrestList)); |
| |
| for (uint32_t i = 0; i < numInItrestList; i++) { |
| if (keys[i] == nullptr) { |
| throw IllegalArgumentException( |
| "keys in the interest list cannot be nullptr"); |
| } |
| writeObjectPart(keys[i]); |
| } |
| |
| writeMessageLength(); |
| m_interestPolicy = interestPolicy.ordinal; |
| } |
| |
| TcrMessageCreateRegion::TcrMessageCreateRegion( |
| DataOutput* dataOutput, const std::string& str1, const std::string& str2, |
| bool isDurable, bool receiveValues, ThinClientBaseDM* connectionDM) { |
| m_request.reset(dataOutput); |
| m_msgType = TcrMessage::CREATE_REGION; |
| m_tcdm = connectionDM; |
| m_isDurable = isDurable; |
| m_receiveValues = receiveValues; |
| |
| uint32_t numOfParts = 2; |
| writeHeader(m_msgType, numOfParts); |
| writeRegionPart(str1); // parent region name |
| writeRegionPart(str2); // region name |
| writeMessageLength(); |
| m_regionName = str2; |
| } |
| |
| TcrMessageRegisterInterest::TcrMessageRegisterInterest( |
| DataOutput* dataOutput, const std::string& str1, const std::string& str2, |
| InterestResultPolicy interestPolicy, bool isDurable, bool isCachingEnabled, |
| bool receiveValues, ThinClientBaseDM* connectionDM) { |
| m_request.reset(dataOutput); |
| m_msgType = TcrMessage::REGISTER_INTEREST; |
| m_tcdm = connectionDM; |
| m_isDurable = isDurable; |
| m_receiveValues = receiveValues; |
| |
| uint32_t numOfParts = 7; |
| |
| writeHeader(m_msgType, numOfParts); |
| |
| writeRegionPart(str1); // region name |
| writeIntPart(REGULAR_EXPRESSION); // InterestType |
| writeInterestResultPolicyPart(interestPolicy); // InterestResultPolicy |
| writeBytePart(isDurable ? 1 : 0); |
| writeRegionPart(str2); // regexp string |
| |
| int8_t bytes[2]; |
| std::shared_ptr<CacheableBytes> byteArr = nullptr; |
| bytes[0] = receiveValues ? 0 : 1; |
| byteArr = CacheableBytes::create(std::vector<int8_t>(bytes, bytes + 1)); |
| writeObjectPart(byteArr); |
| bytes[0] = isCachingEnabled ? 1 : 0; // region data policy |
| bytes[1] = 0; // serializevalues |
| byteArr = CacheableBytes::create(std::vector<int8_t>(bytes, bytes + 2)); |
| writeObjectPart(byteArr); |
| |
| writeMessageLength(); |
| m_regionName = str1; |
| m_regex = str2; |
| m_interestPolicy = interestPolicy.ordinal; |
| } |
| |
| TcrMessageUnregisterInterest::TcrMessageUnregisterInterest( |
| DataOutput* dataOutput, const std::string& str1, const std::string& str2, |
| InterestResultPolicy interestPolicy, bool isDurable, bool receiveValues, |
| ThinClientBaseDM* connectionDM) { |
| m_request.reset(dataOutput); |
| m_msgType = TcrMessage::UNREGISTER_INTEREST; |
| m_tcdm = connectionDM; |
| m_isDurable = isDurable; |
| m_receiveValues = receiveValues; |
| |
| uint32_t numOfParts = 3; |
| numOfParts += 2; |
| writeHeader(m_msgType, numOfParts); |
| writeRegionPart(str1); // region name |
| writeIntPart(REGULAR_EXPRESSION); // InterestType |
| writeRegionPart(str2); // regexp string |
| writeBytePart(0); // isClosing |
| writeBytePart(isDurable ? 1 : 0); // keepalive |
| writeMessageLength(); |
| m_regionName = str1; |
| m_regex = str2; |
| m_interestPolicy = interestPolicy.ordinal; |
| } |
| |
| TcrMessageTxSynchronization::TcrMessageTxSynchronization(DataOutput* dataOutput, |
| int ordinal, int txid, |
| int status) { |
| m_request.reset(dataOutput); |
| m_msgType = TcrMessage::TX_SYNCHRONIZATION; |
| |
| writeHeader(m_msgType, ordinal == 1 ? 3 : 2); |
| writeIntPart(ordinal); |
| writeIntPart(txid); |
| if (ordinal == 1) { |
| writeIntPart(status); |
| } |
| |
| writeMessageLength(); |
| } |
| |
| TcrMessageClientReady::TcrMessageClientReady(DataOutput* dataOutput) { |
| m_request.reset(dataOutput); |
| m_msgType = TcrMessage::CLIENT_READY; |
| |
| writeHeader(m_msgType, 1); |
| // the server expects at least 1 part, so writing a dummy |
| writeBytePart(0); |
| writeMessageLength(); |
| } |
| |
| TcrMessageCommit::TcrMessageCommit(DataOutput* dataOutput) { |
| m_msgType = TcrMessage::COMMIT; |
| m_request.reset(dataOutput); |
| |
| writeHeader(m_msgType, 1); |
| // the server expects at least 1 part, so writing a dummy |
| writeBytePart(0); |
| writeMessageLength(); |
| } |
| |
| TcrMessageRollback::TcrMessageRollback(DataOutput* dataOutput) { |
| m_msgType = TcrMessage::ROLLBACK; |
| m_request.reset(dataOutput); |
| |
| writeHeader(m_msgType, 1); |
| // the server expects at least 1 part, so writing a dummy |
| writeBytePart(0); |
| writeMessageLength(); |
| } |
| |
| TcrMessageTxFailover::TcrMessageTxFailover(DataOutput* dataOutput) { |
| m_msgType = TcrMessage::TX_FAILOVER; |
| m_request.reset(dataOutput); |
| |
| writeHeader(m_msgType, 1); |
| // the server expects at least 1 part, so writing a dummy |
| writeBytePart(0); |
| writeMessageLength(); |
| } |
| |
| // constructor for MAKE_PRIMARY message. |
| TcrMessageMakePrimary::TcrMessageMakePrimary(DataOutput* dataOutput, |
| bool processedMarker) { |
| m_msgType = TcrMessage::MAKE_PRIMARY; |
| m_request.reset(dataOutput); |
| |
| writeHeader(m_msgType, 1); |
| writeBytePart(processedMarker ? 1 : 0); // boolean processedMarker |
| writeMessageLength(); |
| } |
| |
| // constructor for PERIODIC_ACK of notified eventids |
| TcrMessagePeriodicAck::TcrMessagePeriodicAck( |
| DataOutput* dataOutput, const EventIdMapEntryList& entries) { |
| m_msgType = TcrMessage::PERIODIC_ACK; |
| m_request.reset(dataOutput); |
| |
| uint32_t numParts = static_cast<uint32_t>(entries.size()); |
| writeHeader(m_msgType, numParts); |
| for (EventIdMapEntryList::const_iterator entry = entries.begin(); |
| entry != entries.end(); ++entry) { |
| auto src = entry->first; |
| auto seq = entry->second; |
| auto eid = EventId::create(src->getMemId(), src->getMemIdLen(), |
| src->getThrId(), seq->getSeqNum()); |
| writeObjectPart(eid); |
| } |
| writeMessageLength(); |
| } |
| |
| TcrMessagePutAll::TcrMessagePutAll( |
| DataOutput* dataOutput, const Region* region, const HashMapOfCacheable& map, |
| std::chrono::milliseconds messageResponsetimeout, |
| ThinClientBaseDM* connectionDM, |
| const std::shared_ptr<Serializable>& aCallbackArgument) { |
| m_tcdm = connectionDM; |
| m_regionName = region->getFullPath(); |
| m_region = region; |
| m_messageResponseTimeout = messageResponsetimeout; |
| m_request.reset(dataOutput); |
| |
| // TODO check the number of parts in this constructor. doubt because in PUT |
| // value can be nullptr also. |
| uint32_t numOfParts = 0; |
| // bool skipCallBacks = false; |
| |
| if (aCallbackArgument != nullptr) { |
| m_msgType = TcrMessage::PUT_ALL_WITH_CALLBACK; |
| numOfParts = 6 + static_cast<uint32_t>(map.size()) * 2; |
| // skipCallBacks = false; |
| } else { |
| m_msgType = TcrMessage::PUTALL; |
| numOfParts = 5 + static_cast<uint32_t>(map.size()) * 2; |
| // skipCallBacks = true; |
| } |
| |
| // numOfParts++; |
| |
| if (m_messageResponseTimeout >= std::chrono::milliseconds::zero()) { |
| numOfParts++; |
| } |
| |
| writeHeader(m_msgType, numOfParts); |
| writeRegionPart(m_regionName); |
| writeEventIdPart(static_cast<uint32_t>(map.size()) - 1); |
| // writeIntPart(skipCallBacks ? 0 : 1); |
| writeIntPart(0); |
| |
| // Client putAll requests now send a flags int as part #0. 1==region has |
| // datapolicy.EMPTY, 2==region has concurrency checks enabled. |
| // Version tags are not sent back if dp is EMPTY or concurrency |
| // checks are disabled. |
| int flags = 0; |
| if (!region->getAttributes().getCachingEnabled()) { |
| flags |= TcrMessage::m_flag_empty; |
| LOGDEBUG("TcrMessage::PUTALL datapolicy empty flags = %d ", flags); |
| } |
| if (region->getAttributes().getConcurrencyChecksEnabled()) { |
| flags |= TcrMessage::m_flag_concurrency_checks; |
| LOGDEBUG("TcrMessage::PUTALL ConcurrencyChecksEnabled flags = %d ", flags); |
| } |
| writeIntPart(flags); |
| |
| writeIntPart(static_cast<int32_t>(map.size())); |
| |
| if (aCallbackArgument != nullptr) { |
| writeObjectPart(aCallbackArgument); |
| } |
| |
| for (const auto& iter : map) { |
| writeObjectPart(iter.first); |
| writeObjectPart(iter.second); |
| } |
| |
| if (m_messageResponseTimeout >= std::chrono::milliseconds::zero()) { |
| writeMillisecondsPart(m_messageResponseTimeout); |
| } |
| writeMessageLength(); |
| } |
| |
| TcrMessageRemoveAll::TcrMessageRemoveAll( |
| DataOutput* dataOutput, const Region* region, |
| const std::vector<std::shared_ptr<CacheableKey>>& keys, |
| const std::shared_ptr<Serializable>& aCallbackArgument, |
| ThinClientBaseDM* connectionDM) { |
| m_msgType = TcrMessage::REMOVE_ALL; |
| m_tcdm = connectionDM; |
| m_regionName = region->getFullPath(); |
| m_region = region; |
| m_request.reset(dataOutput); |
| |
| // TODO check the number of parts in this constructor. doubt because in PUT |
| // value can be nullptr also. |
| uint32_t numOfParts = 5 + static_cast<uint32_t>(keys.size()); |
| |
| if (m_messageResponseTimeout >= std::chrono::milliseconds::zero()) { |
| numOfParts++; |
| } |
| writeHeader(m_msgType, numOfParts); |
| writeRegionPart(m_regionName); |
| writeEventIdPart(static_cast<int>(keys.size() - 1)); |
| |
| // Client removeall requests now send a flags int as part #0. 1==region has |
| // datapolicy.EMPTY, 2==region has concurrency checks enabled. |
| // Version tags are not sent back if dp is EMPTY or concurrency |
| // checks are disabled. |
| int flags = 0; |
| if (!region->getAttributes().getCachingEnabled()) { |
| flags |= TcrMessage::m_flag_empty; |
| LOGDEBUG("TcrMessage::REMOVE_ALL datapolicy empty flags = %d ", flags); |
| } |
| if (region->getAttributes().getConcurrencyChecksEnabled()) { |
| flags |= TcrMessage::m_flag_concurrency_checks; |
| LOGDEBUG("TcrMessage::REMOVE_ALL ConcurrencyChecksEnabled flags = %d ", |
| flags); |
| } |
| writeIntPart(flags); |
| writeObjectPart(aCallbackArgument); |
| writeIntPart(static_cast<int32_t>(keys.size())); |
| |
| for (const auto& key : keys) { |
| writeObjectPart(key); |
| } |
| writeMessageLength(); |
| } |
| |
| TcrMessageGetAll::TcrMessageGetAll( |
| DataOutput* dataOutput, const Region* region, |
| const std::vector<std::shared_ptr<CacheableKey>>* keys, |
| ThinClientBaseDM* connectionDM, |
| const std::shared_ptr<Serializable>& aCallbackArgument) { |
| m_msgType = TcrMessage::GET_ALL_70; |
| m_tcdm = connectionDM; |
| m_keyList = keys; |
| m_callbackArgument = aCallbackArgument; |
| m_regionName = region->getFullPath(); |
| m_region = region; |
| m_request.reset(dataOutput); |
| |
| /*CacheableObjectArrayPtr keyArr = nullptr; |
| if (keys != nullptr) { |
| keyArr = CacheableObjectArray::create(); |
| for (int32_t index = 0; index < keys->size(); ++index) { |
| keyArr->push_back(keys->operator[](index)); |
| } |
| }*/ |
| if (m_callbackArgument != nullptr) { |
| m_msgType = TcrMessage::GET_ALL_WITH_CALLBACK; |
| } else { |
| m_msgType = TcrMessage::GET_ALL_70; |
| } |
| |
| writeHeader(m_msgType, 3); |
| writeRegionPart(m_regionName); |
| /*writeHeader(m_msgType, 2); |
| writeRegionPart(regionName); |
| writeObjectPart(keyArr); |
| writeMessageLength();*/ |
| } |
| |
| void TcrMessage::InitializeGetallMsg( |
| const std::shared_ptr<Serializable>& aCallbackArgument) { |
| /*CacheableObjectArrayPtr keyArr = nullptr; |
| if (m_keyList != nullptr) { |
| keyArr = CacheableObjectArray::create(); |
| for (int32_t index = 0; index < m_keyList->size(); ++index) { |
| keyArr->push_back(m_keyList->operator[](index)); |
| } |
| }*/ |
| // LOGINFO(" in InitializeGetallMsg %s ", m_regionName.c_str()); |
| // writeHeader(m_msgType, 2); |
| // writeRegionPart(m_regionName); |
| writeObjectPart(nullptr, false, false, m_keyList); // will do manually |
| if (aCallbackArgument != nullptr) { |
| writeObjectPart(aCallbackArgument); |
| } else { |
| writeIntPart(0); |
| } |
| writeMessageLength(); |
| } |
| |
| TcrMessageExecuteCq::TcrMessageExecuteCq(DataOutput* dataOutput, |
| const std::string& str1, |
| const std::string& str2, CqState state, |
| bool isDurable, |
| ThinClientBaseDM* connectionDM) { |
| m_request.reset(dataOutput); |
| |
| m_msgType = TcrMessage::EXECUTECQ_MSG_TYPE; |
| m_tcdm = connectionDM; |
| m_isDurable = isDurable; |
| |
| uint32_t numOfParts = 5; |
| writeHeader(m_msgType, numOfParts); |
| writeStringPart(str1); // cqName |
| writeStringPart(str2); // query string |
| |
| writeIntPart(static_cast<int32_t>(state)); // cq state |
| writeBytePart(isDurable ? 1 : 0); |
| // hard-coding region data policy to 1 |
| // This Part will be removed when server-side changes are made to remove |
| // CQ dependency on region data policy. After the changes, set numOfParts |
| // to 4 (currently 5). |
| writeBytePart(1); |
| writeMessageLength(); |
| m_regionName = str1; |
| m_regex = str2; |
| } |
| |
| TcrMessageExecuteCqWithIr::TcrMessageExecuteCqWithIr( |
| DataOutput* dataOutput, const std::string& str1, const std::string& str2, |
| CqState state, bool isDurable, ThinClientBaseDM* connectionDM) { |
| m_request.reset(dataOutput); |
| |
| m_msgType = TcrMessage::EXECUTECQ_WITH_IR_MSG_TYPE; |
| m_tcdm = connectionDM; |
| m_isDurable = isDurable; |
| |
| uint32_t numOfParts = 5; |
| writeHeader(m_msgType, numOfParts); |
| writeStringPart(str1); // cqName |
| writeStringPart(str2); // query string |
| |
| writeIntPart(static_cast<int32_t>(state)); // cq state |
| writeBytePart(isDurable ? 1 : 0); |
| // hard-coding region data policy to 1 |
| // This Part will be removed when server-side changes are made to remove |
| // CQ dependency on region data policy. After the changes, set numOfParts |
| // to 4 (currently 5). |
| writeBytePart(1); |
| writeMessageLength(); |
| m_regionName = str1; |
| m_regex = str2; |
| } |
| |
| TcrMessageExecuteFunction::TcrMessageExecuteFunction( |
| DataOutput* dataOutput, const std::string& funcName, |
| const std::shared_ptr<Cacheable>& args, uint8_t getResult, |
| ThinClientBaseDM* connectionDM, std::chrono::milliseconds timeout) { |
| m_request.reset(dataOutput); |
| |
| m_msgType = TcrMessage::EXECUTE_FUNCTION; |
| m_tcdm = connectionDM; |
| m_hasResult = getResult; |
| |
| uint32_t numOfParts = 3; |
| writeHeader(m_msgType, numOfParts); |
| writeByteAndTimeOutPart(getResult, timeout); |
| writeRegionPart(funcName); // function name string |
| writeObjectPart(args); |
| writeMessageLength(); |
| } |
| |
| TcrMessageExecuteRegionFunction::TcrMessageExecuteRegionFunction( |
| DataOutput* dataOutput, const std::string& funcName, const Region* region, |
| const std::shared_ptr<Cacheable>& args, |
| std::shared_ptr<CacheableVector> routingObj, uint8_t getResult, |
| std::shared_ptr<CacheableHashSet> failedNodes, |
| std::chrono::milliseconds timeout, ThinClientBaseDM* connectionDM, |
| int8_t reExecute) { |
| m_request.reset(dataOutput); |
| |
| m_msgType = TcrMessage::EXECUTE_REGION_FUNCTION; |
| m_tcdm = connectionDM; |
| m_regionName = |
| region == nullptr ? "INVALID_REGION_NAME" : region->getFullPath(); |
| m_region = region; |
| m_hasResult = getResult; |
| |
| if (routingObj && routingObj->size() == 1) { |
| LOGDEBUG("setting up key"); |
| m_key = std::dynamic_pointer_cast<CacheableKey>(routingObj->at(0)); |
| } |
| |
| uint32_t numOfParts = |
| 6 + (!routingObj ? 0 : static_cast<uint32_t>(routingObj->size())); |
| numOfParts += |
| 2; // for the FunctionHA isReExecute and removedNodesSize parts. |
| if (failedNodes != nullptr) { |
| numOfParts++; |
| } |
| writeHeader(m_msgType, numOfParts); |
| writeByteAndTimeOutPart(getResult, timeout); |
| writeRegionPart(m_regionName); |
| writeRegionPart(funcName); // function name string |
| writeObjectPart(args); |
| // klug for MemberMappedArgs |
| writeObjectPart(nullptr); |
| writeBytePart(reExecute); // FunctionHA isReExecute = false |
| if (routingObj) { |
| writeIntPart(static_cast<int32_t>(routingObj->size())); |
| for (const auto& value : *routingObj) { |
| writeObjectPart(value); |
| } |
| } else { |
| writeIntPart(0); |
| } |
| if (failedNodes) { |
| writeIntPart(static_cast<int32_t>(failedNodes->size())); |
| writeObjectPart(failedNodes); |
| } else { |
| writeIntPart(0); // FunctionHA removedNodesSize = 0 |
| } |
| writeMessageLength(); |
| } |
| |
| TcrMessageExecuteRegionFunctionSingleHop:: |
| TcrMessageExecuteRegionFunctionSingleHop( |
| DataOutput* dataOutput, const std::string& funcName, |
| const Region* region, const std::shared_ptr<Cacheable>& args, |
| std::shared_ptr<CacheableHashSet> routingObj, uint8_t getResult, |
| std::shared_ptr<CacheableHashSet> failedNodes, bool allBuckets, |
| std::chrono::milliseconds timeout, ThinClientBaseDM* connectionDM) { |
| m_request.reset(dataOutput); |
| |
| m_msgType = TcrMessage::EXECUTE_REGION_FUNCTION_SINGLE_HOP; |
| m_tcdm = connectionDM; |
| m_regionName = |
| region == nullptr ? "INVALID_REGION_NAME" : region->getFullPath(); |
| m_region = region; |
| m_hasResult = getResult; |
| |
| uint32_t numOfParts = |
| 6 + (routingObj ? static_cast<int32_t>(routingObj->size()) : 0); |
| numOfParts += |
| 2; // for the FunctionHA isReExecute and removedNodesSize parts. |
| if (failedNodes != nullptr) { |
| numOfParts++; |
| } |
| writeHeader(m_msgType, numOfParts); |
| writeByteAndTimeOutPart(getResult, timeout); |
| writeRegionPart(m_regionName); |
| writeRegionPart(funcName); // function name string |
| writeObjectPart(args); |
| // klug for MemberMappedArgs |
| writeObjectPart(nullptr); |
| writeBytePart(allBuckets ? 1 : 0); |
| if (routingObj) { |
| writeIntPart(static_cast<int32_t>(routingObj->size())); |
| if (allBuckets) { |
| LOGDEBUG("All Buckets so putting IntPart for buckets = %zu", |
| routingObj->size()); |
| for (const auto& itr : *routingObj) { |
| writeIntPart(std::dynamic_pointer_cast<CacheableInt32>(itr)->value()); |
| } |
| } else { |
| LOGDEBUG("putting keys as withFilter called, routing Keys size = %zu", |
| routingObj->size()); |
| for (const auto& itr : *routingObj) { |
| writeObjectPart(itr); |
| } |
| } |
| } else { |
| writeIntPart(0); |
| } |
| if (failedNodes) { |
| writeIntPart(static_cast<int32_t>(failedNodes->size())); |
| writeObjectPart(failedNodes); |
| } else { |
| writeIntPart(0); // FunctionHA removedNodesSize = 0 |
| } |
| writeMessageLength(); |
| } |
| |
| TcrMessageGetClientPartitionAttributes::TcrMessageGetClientPartitionAttributes( |
| DataOutput* dataOutput, const char* regionName) { |
| m_request.reset(dataOutput); |
| |
| m_msgType = TcrMessage::GET_CLIENT_PARTITION_ATTRIBUTES; |
| writeHeader(m_msgType, 1); |
| writeRegionPart(regionName); |
| writeMessageLength(); |
| } |
| |
| TcrMessageGetClientPrMetadata::TcrMessageGetClientPrMetadata( |
| DataOutput* dataOutput, const char* regionName) { |
| m_request.reset(dataOutput); |
| |
| m_msgType = TcrMessage::GET_CLIENT_PR_METADATA; |
| writeHeader(m_msgType, 1); |
| writeRegionPart(regionName); |
| writeMessageLength(); |
| } |
| |
| TcrMessageSize::TcrMessageSize(DataOutput* dataOutput, const char* regionName) { |
| m_request.reset(dataOutput); |
| |
| m_msgType = TcrMessage::SIZE; |
| writeHeader(m_msgType, 1); |
| writeRegionPart(regionName); |
| writeMessageLength(); |
| } |
| |
| TcrMessageUserCredential::TcrMessageUserCredential( |
| DataOutput* dataOutput, std::shared_ptr<Properties> creds, |
| ThinClientBaseDM* connectionDM) { |
| m_request.reset(dataOutput); |
| |
| m_msgType = TcrMessage::USER_CREDENTIAL_MESSAGE; |
| m_tcdm = connectionDM; |
| |
| /* |
| * First part will be connection-id ( may be in encrypted form) to avoid |
| * replay attack |
| * Second part will be credentails (may be in encrypted form) |
| */ |
| m_creds = creds; |
| /*LOGDEBUG("Tcrmessage sending creds to server"); |
| writeHeader(msgType, numOfParts); |
| writeObjectPart(creds); |
| writeMessageLength(); |
| LOGDEBUG("TcrMessage addsp = %s ", |
| Utils::convertBytesToString(m_request->getBuffer(), |
| m_request->getBufferLength())->value().c_str());*/ |
| } |
| |
| TcrMessageRemoveUserAuth::TcrMessageRemoveUserAuth( |
| DataOutput* dataOutput, bool keepAlive, ThinClientBaseDM* connectionDM) { |
| m_request.reset(dataOutput); |
| |
| m_msgType = TcrMessage::REMOVE_USER_AUTH; |
| m_tcdm = connectionDM; |
| LOGDEBUG("Tcrmessage sending REMOVE_USER_AUTH message to server"); |
| writeHeader(m_msgType, 1); |
| // adding dummy part as server has check for numberofparts > 0 |
| int8_t dummy = 0; |
| if (keepAlive) dummy = 1; |
| auto cbp = CacheableBytes::create(std::vector<int8_t>(&dummy, &dummy + 1)); |
| writeObjectPart(cbp, false); |
| writeMessageLength(); |
| LOGDEBUG("TcrMessage REMOVE_USER_AUTH = %s ", |
| Utils::convertBytesToString(m_request->getBuffer(), |
| m_request->getBufferLength()) |
| .c_str()); |
| } |
| |
| void TcrMessage::createUserCredentialMessage(TcrConnection* conn) { |
| m_request->reset(); |
| m_isSecurityHeaderAdded = false; |
| writeHeader(m_msgType, 1); |
| |
| auto dOut = m_tcdm->getConnectionManager().getCacheImpl()->createDataOutput( |
| getPool()); |
| |
| if (m_creds != nullptr) m_creds->toData(dOut); |
| |
| auto credBytes = CacheableBytes::create(std::vector<int8_t>( |
| dOut.getBuffer(), dOut.getBuffer() + dOut.getBufferLength())); |
| auto encryptBytes = conn->encryptBytes(credBytes); |
| writeObjectPart(encryptBytes); |
| |
| writeMessageLength(); |
| LOGDEBUG("TcrMessage::createUserCredentialMessage msg = %s ", |
| Utils::convertBytesToString(m_request->getBuffer(), |
| m_request->getBufferLength()) |
| .c_str()); |
| } |
| |
| void TcrMessage::addSecurityPart(int64_t connectionId, int64_t unique_id, |
| TcrConnection* conn) { |
| LOGDEBUG("TcrMessage::addSecurityPart m_isSecurityHeaderAdded = %d ", |
| m_isSecurityHeaderAdded); |
| if (m_isSecurityHeaderAdded) { |
| m_request->rewindCursor(m_securityHeaderLength); |
| writeMessageLength(); |
| m_securityHeaderLength = 0; |
| m_isSecurityHeaderAdded = false; |
| } |
| m_isSecurityHeaderAdded = true; |
| LOGDEBUG("addSecurityPart( , ) "); |
| auto dOutput = |
| m_tcdm->getConnectionManager().getCacheImpl()->createDataOutput( |
| getPool()); |
| |
| dOutput.writeInt(connectionId); |
| dOutput.writeInt(unique_id); |
| |
| auto bytes = CacheableBytes::create(std::vector<int8_t>( |
| dOutput.getBuffer(), dOutput.getBuffer() + dOutput.getBufferLength())); |
| |
| auto encryptBytes = conn->encryptBytes(bytes); |
| |
| LOGDEBUG("TcrMessage::addSecurityPart [%p] length = %" PRId32 |
| ", encrypted ID = %s ", |
| conn, encryptBytes->length(), |
| Utils::convertBytesToString(encryptBytes->value().data(), |
| encryptBytes->length()) |
| .c_str()); |
| |
| writeObjectPart(encryptBytes); |
| writeMessageLength(); |
| m_securityHeaderLength = 4 + 1 + encryptBytes->length(); |
| } |
| |
| void TcrMessage::addSecurityPart(int64_t connectionId, TcrConnection* conn) { |
| LOGDEBUG("TcrMessage::addSecurityPart m_isSecurityHeaderAdded = %d ", |
| m_isSecurityHeaderAdded); |
| if (m_isSecurityHeaderAdded) { |
| m_request->rewindCursor(m_securityHeaderLength); |
| writeMessageLength(); |
| m_securityHeaderLength = 0; |
| m_isSecurityHeaderAdded = false; |
| } |
| m_isSecurityHeaderAdded = true; |
| LOGDEBUG("TcrMessage::addSecurityPart only connid"); |
| auto dOutput = |
| m_tcdm->getConnectionManager().getCacheImpl()->createDataOutput( |
| getPool()); |
| |
| dOutput.writeInt(connectionId); |
| |
| auto bytes = CacheableBytes::create(std::vector<int8_t>( |
| dOutput.getBuffer(), dOutput.getBuffer() + dOutput.getBufferLength())); |
| |
| auto encryptBytes = conn->encryptBytes(bytes); |
| |
| writeObjectPart(encryptBytes); |
| writeMessageLength(); |
| m_securityHeaderLength = 4 + 1 + encryptBytes->length(); |
| LOGDEBUG("TcrMessage addspCC = %s ", |
| Utils::convertBytesToString(m_request->getBuffer(), |
| m_request->getBufferLength()) |
| .c_str()); |
| } |
| |
| TcrMessageRequestEventValue::TcrMessageRequestEventValue( |
| DataOutput* dataOutput, std::shared_ptr<EventId> eventId) { |
| m_request.reset(dataOutput); |
| m_msgType = TcrMessage::REQUEST_EVENT_VALUE; |
| |
| uint32_t numOfParts = 1; |
| writeHeader(m_msgType, numOfParts); |
| writeObjectPart(eventId); |
| writeMessageLength(); |
| } |
| |
| TcrMessageGetPdxIdForType::TcrMessageGetPdxIdForType( |
| DataOutput* dataOutput, const std::shared_ptr<Cacheable>& pdxType, |
| ThinClientBaseDM* connectionDM) { |
| m_request.reset(dataOutput); |
| m_msgType = TcrMessage::GET_PDX_ID_FOR_TYPE; |
| m_tcdm = connectionDM; |
| |
| LOGDEBUG("Tcrmessage sending GET_PDX_ID_FOR_TYPE message to server"); |
| writeHeader(m_msgType, 1); |
| writeObjectPart(pdxType, false, true); |
| writeMessageLength(); |
| LOGDEBUG("TcrMessage GET_PDX_ID_FOR_TYPE = %s ", |
| Utils::convertBytesToString(m_request->getBuffer(), |
| m_request->getBufferLength()) |
| .c_str()); |
| } |
| |
| TcrMessageAddPdxType::TcrMessageAddPdxType( |
| DataOutput* dataOutput, const std::shared_ptr<Cacheable>& pdxType, |
| ThinClientBaseDM* connectionDM, int32_t pdxTypeId) { |
| m_request.reset(dataOutput); |
| m_msgType = TcrMessage::ADD_PDX_TYPE; |
| m_tcdm = connectionDM; |
| |
| LOGDEBUG("Tcrmessage sending ADD_PDX_TYPE message to server"); |
| writeHeader(m_msgType, 2); |
| writeObjectPart(pdxType, false, true); |
| writeIntPart(pdxTypeId); |
| writeMessageLength(); |
| LOGDEBUG("TcrMessage ADD_PDX_TYPE id = %d = %s ", pdxTypeId, |
| Utils::convertBytesToString(m_request->getBuffer(), |
| m_request->getBufferLength()) |
| .c_str()); |
| } |
| |
| TcrMessageGetPdxIdForEnum::TcrMessageGetPdxIdForEnum( |
| DataOutput* dataOutput, const std::shared_ptr<Cacheable>& pdxType, |
| ThinClientBaseDM* connectionDM) { |
| m_request.reset(dataOutput); |
| m_msgType = TcrMessage::GET_PDX_ID_FOR_ENUM; |
| m_tcdm = connectionDM; |
| |
| LOGDEBUG("Tcrmessage sending GET_PDX_ID_FOR_ENUM message to server"); |
| writeHeader(m_msgType, 1); |
| writeObjectPart(pdxType, false, false); |
| writeMessageLength(); |
| LOGDEBUG("TcrMessage GET_PDX_ID_FOR_ENUM = %s ", |
| Utils::convertBytesToString(m_request->getBuffer(), |
| m_request->getBufferLength()) |
| .c_str()); |
| } |
| |
| TcrMessageAddPdxEnum::TcrMessageAddPdxEnum( |
| DataOutput* dataOutput, const std::shared_ptr<Cacheable>& pdxType, |
| ThinClientBaseDM* connectionDM, int32_t pdxTypeId) { |
| m_request.reset(dataOutput); |
| m_msgType = TcrMessage::ADD_PDX_ENUM; |
| m_tcdm = connectionDM; |
| |
| LOGDEBUG("Tcrmessage sending ADD_PDX_ENUM message to server"); |
| writeHeader(m_msgType, 2); |
| writeObjectPart(pdxType, false, false); |
| writeIntPart(pdxTypeId); |
| writeMessageLength(); |
| LOGDEBUG("TcrMessage ADD_PDX_ENUM id = %d = %s ", pdxTypeId, |
| Utils::convertBytesToString(m_request->getBuffer(), |
| m_request->getBufferLength()) |
| .c_str()); |
| } |
| |
| TcrMessageGetPdxTypeById::TcrMessageGetPdxTypeById( |
| DataOutput* dataOutput, int32_t typeId, ThinClientBaseDM* connectionDM) { |
| m_request.reset(dataOutput); |
| m_msgType = TcrMessage::GET_PDX_TYPE_BY_ID; |
| m_tcdm = connectionDM; |
| |
| LOGDEBUG("Tcrmessage sending GET_PDX_TYPE_BY_ID message to server"); |
| writeHeader(m_msgType, 1); |
| m_request->writeInt(4); |
| m_request->writeBoolean(false); |
| m_request->writeInt(typeId); |
| writeMessageLength(); |
| LOGDEBUG("TcrMessage GET_PDX_TYPE_BY_ID = %s ", |
| Utils::convertBytesToString(m_request->getBuffer(), |
| m_request->getBufferLength()) |
| .c_str()); |
| } |
| |
| TcrMessageGetPdxEnumById::TcrMessageGetPdxEnumById( |
| DataOutput* dataOutput, int32_t typeId, ThinClientBaseDM* connectionDM) { |
| m_request.reset(dataOutput); |
| m_msgType = TcrMessage::GET_PDX_ENUM_BY_ID; |
| m_tcdm = connectionDM; |
| |
| LOGDEBUG("Tcrmessage sending GET_PDX_ENUM_BY_ID message to server"); |
| writeHeader(m_msgType, 1); |
| m_request->writeInt(4); |
| m_request->writeBoolean(false); |
| m_request->writeInt(typeId); |
| writeMessageLength(); |
| LOGDEBUG("TcrMessage GET_PDX_ENUM_BY_ID = %s ", |
| Utils::convertBytesToString(m_request->getBuffer(), |
| m_request->getBufferLength()) |
| .c_str()); |
| } |
| |
| TcrMessageGetFunctionAttributes::TcrMessageGetFunctionAttributes( |
| DataOutput* dataOutput, const std::string& funcName, |
| ThinClientBaseDM* connectionDM) { |
| m_request.reset(dataOutput); |
| m_msgType = TcrMessage::GET_FUNCTION_ATTRIBUTES; |
| m_tcdm = connectionDM; |
| |
| uint32_t numOfParts = 1; |
| writeHeader(m_msgType, numOfParts); |
| writeRegionPart(funcName); // function name string |
| writeMessageLength(); |
| } |
| |
| TcrMessageKeySet::TcrMessageKeySet(DataOutput* dataOutput, |
| const std::string& funcName, |
| ThinClientBaseDM* connectionDM) { |
| m_request.reset(dataOutput); |
| m_msgType = TcrMessage::KEY_SET; |
| m_tcdm = connectionDM; |
| |
| uint32_t numOfParts = 1; |
| writeHeader(m_msgType, numOfParts); |
| writeRegionPart(funcName); // function name string |
| writeMessageLength(); |
| } |
| |
| void TcrMessage::setData(const char* bytearray, int32_t len, uint16_t memId, |
| const SerializationRegistry& serializationRegistry, |
| MemberListForVersionStamp& memberListForVersionStamp) { |
| if (m_request == nullptr) { |
| m_request = std::unique_ptr<DataOutput>(new DataOutput( |
| m_tcdm->getConnectionManager().getCacheImpl()->createDataOutput( |
| getPool()))); |
| } |
| if (bytearray) { |
| DeleteArray<const char> delByteArr(bytearray); |
| handleByteArrayResponse(bytearray, len, memId, serializationRegistry, |
| memberListForVersionStamp); |
| } |
| } |
| |
| TcrMessage::~TcrMessage() { |
| _GEODE_SAFE_DELETE(m_cqs); |
| /* adongre |
| * CID 29167: Non-array delete for scalars (DELETE_ARRAY) |
| * Coverity - II |
| */ |
| // _GEODE_SAFE_DELETE( m_deltaBytes ); |
| _GEODE_SAFE_DELETE_ARRAY(m_deltaBytes); |
| } |
| |
| const std::string& TcrMessage::getRegionName() const { return m_regionName; } |
| |
| Region* TcrMessage::getRegion() const { return const_cast<Region*>(m_region); } |
| |
| int32_t TcrMessage::getMessageType() const { return m_msgType; } |
| |
| void TcrMessage::setMessageType(int32_t msgType) { m_msgType = msgType; } |
| |
| void TcrMessage::setMessageTypeRequest(int32_t msgType) { |
| m_msgTypeRequest = msgType; |
| } |
| int32_t TcrMessage::getMessageTypeRequest() const { return m_msgTypeRequest; } |
| |
| const std::map<std::string, int>* TcrMessage::getCqs() const { return m_cqs; } |
| std::shared_ptr<CacheableKey> TcrMessage::getKey() const { return m_key; } |
| |
| const std::shared_ptr<CacheableKey>& TcrMessage::getKeyRef() const { |
| return m_key; |
| } |
| std::shared_ptr<Cacheable> TcrMessage::getValue() const { return m_value; } |
| |
| const std::shared_ptr<Cacheable>& TcrMessage::getValueRef() const { |
| return m_value; |
| } |
| std::shared_ptr<Cacheable> TcrMessage::getCallbackArgument() const { |
| return m_callbackArgument; |
| } |
| |
| const std::shared_ptr<Cacheable>& TcrMessage::getCallbackArgumentRef() const { |
| return m_callbackArgument; |
| } |
| |
| const char* TcrMessage::getMsgData() const { |
| return reinterpret_cast<const char*>(m_request->getBuffer()); |
| } |
| |
| const char* TcrMessage::getMsgHeader() const { |
| return reinterpret_cast<const char*>(m_request->getBuffer()); |
| } |
| |
| const char* TcrMessage::getMsgBody() const { |
| return reinterpret_cast<const char*>(m_request->getBuffer() + g_headerLen); |
| } |
| |
| size_t TcrMessage::getMsgLength() const { return m_request->getBufferLength(); } |
| |
| size_t TcrMessage::getMsgBodyLength() const { |
| return m_request->getBufferLength() - g_headerLen; |
| } |
| std::shared_ptr<EventId> TcrMessage::getEventId() const { return m_eventid; } |
| |
| int32_t TcrMessage::getTransId() const { return m_txId; } |
| |
| void TcrMessage::setTransId(int32_t txId) { m_txId = txId; } |
| |
| std::chrono::milliseconds TcrMessage::getTimeout() const { return m_timeout; } |
| |
| void TcrMessage::setTimeout(std::chrono::milliseconds timeout) { |
| m_timeout = timeout; |
| } |
| |
| void TcrMessage::skipParts(DataInput& input, int32_t numParts) { |
| while (numParts > 0) { |
| numParts--; |
| int32_t partLen = input.readInt32(); |
| LOGDEBUG("TcrMessage::skipParts partLen= %d ", partLen); |
| input.advanceCursor(partLen + 1); // Skip the whole part including "isObj" |
| } |
| } |
| |
| void TcrMessage::readEventIdPart(DataInput& input, bool skip, int32_t parts) { |
| // skip requested number of parts |
| if (skip) { |
| skipParts(input, parts); |
| } |
| |
| // read the eventid part |
| // read and ignore length |
| input.readInt32(); |
| // read and ignore isObj |
| input.read(); |
| |
| m_eventid = std::dynamic_pointer_cast<EventId>(input.readObject()); |
| } |
| std::shared_ptr<DSMemberForVersionStamp> TcrMessage::readDSMember( |
| apache::geode::client::DataInput& input) { |
| uint8_t typeidLen = input.read(); |
| if (typeidLen == 1) { |
| auto typeidofMember = static_cast<DSCode>(input.read()); |
| if (typeidofMember != DSCode::InternalDistributedMember) { |
| throw Exception( |
| "Reading DSMember. Expecting type id 92 for " |
| "InternalDistributedMember. "); |
| } |
| |
| auto memId = |
| std::shared_ptr<ClientProxyMembershipID>(new ClientProxyMembershipID()); |
| memId->fromData(input); |
| return std::shared_ptr<DSMemberForVersionStamp>(memId); |
| } else if (typeidLen == 2) { |
| auto typeidofMember = input.readInt16(); |
| if (typeidofMember != static_cast<int16_t>(DSFid::DiskStoreId)) { |
| throw Exception( |
| "Reading DSMember. Expecting type id 2133 for DiskStoreId. "); |
| } |
| DiskStoreId* diskStore = new DiskStoreId(); |
| diskStore->fromData(input); |
| return std::shared_ptr<DSMemberForVersionStamp>(diskStore); |
| } else { |
| throw Exception( |
| "Reading DSMember. Expecting type id length as either one or two " |
| "byte."); |
| } |
| } |
| void TcrMessage::readHashMapForGCVersions( |
| apache::geode::client::DataInput& input, |
| std::shared_ptr<CacheableHashMap>& value) { |
| uint8_t hashmaptypeid = input.read(); |
| if (hashmaptypeid != static_cast<int8_t>(DSCode::CacheableHashMap)) { |
| throw Exception( |
| "Reading HashMap For GC versions. Expecting type id of hash map. "); |
| } |
| int32_t len = input.readArrayLength(); |
| |
| if (len > 0) { |
| std::shared_ptr<CacheableKey> key; |
| std::shared_ptr<Cacheable> val; |
| for (int32_t index = 0; index < len; index++) { |
| key = readDSMember(input); |
| // read and ignore versionType |
| input.read(); |
| |
| auto valVersion = CacheableInt64::create(input.readInt64()); |
| auto keyPtr = std::dynamic_pointer_cast<CacheableKey>(key); |
| auto valVersionPtr = std::dynamic_pointer_cast<Cacheable>(valVersion); |
| |
| if (value) { |
| value->emplace(keyPtr, valVersionPtr); |
| } else { |
| throw Exception( |
| "Inserting values in HashMap For GC versions. value must not be " |
| "nullptr. "); |
| } |
| } |
| } |
| } |
| |
| void TcrMessage::readHashSetForGCVersions( |
| apache::geode::client::DataInput& input, |
| std::shared_ptr<CacheableHashSet>& value) { |
| auto hashsettypeid = input.read(); |
| if (hashsettypeid != static_cast<int8_t>(DSCode::CacheableHashSet)) { |
| throw Exception( |
| "Reading HashSet For GC versions. Expecting type id of hash set. "); |
| } |
| |
| auto len = input.readArrayLength(); |
| for (decltype(len) index = 0; index < len; index++) { |
| auto keyPtr = std::dynamic_pointer_cast<CacheableKey>(input.readObject()); |
| value->insert(keyPtr); |
| } |
| } |
| |
| bool TcrMessageHelper::readExceptionPart(TcrMessage& msg, DataInput& input, |
| uint8_t isLastChunk) { |
| return msg.readExceptionPart(input, isLastChunk); |
| } |
| |
| void TcrMessageHelper::skipParts(TcrMessage& msg, DataInput& input, |
| int32_t numParts) { |
| msg.skipParts(input, numParts); |
| } |
| |
| TcrMessageHelper::ChunkObjectType TcrMessageHelper::readChunkPartHeader( |
| TcrMessage& msg, DataInput& input, DSCode expectedFirstType, |
| int32_t expectedPartType, const char* methodName, uint32_t& partLen, |
| uint8_t isLastChunk) { |
| partLen = input.readInt32(); |
| const auto isObj = input.readBoolean(); |
| |
| if (partLen == 0) { |
| // special null object is case for scalar query result |
| return ChunkObjectType::NULL_OBJECT; |
| } else if (!isObj) { |
| // otherwise we're currently always expecting an object |
| char exMsg[256]; |
| std::snprintf(exMsg, sizeof(exMsg), |
| "TcrMessageHelper::readChunkPartHeader: " |
| "%s: part is not object", |
| methodName); |
| LOGDEBUG("%s ", exMsg); |
| return ChunkObjectType::EXCEPTION; |
| } |
| |
| auto rawByte = input.read(); |
| auto partType = static_cast<DSCode>(rawByte); |
| auto compId = static_cast<int32_t>(partType); |
| |
| // ugly hack to check for exception chunk |
| if (partType == DSCode::JavaSerializable) { |
| input.reset(); |
| if (TcrMessageHelper::readExceptionPart(msg, input, isLastChunk)) { |
| msg.setMessageType(TcrMessage::EXCEPTION); |
| return ChunkObjectType::EXCEPTION; |
| } else { |
| char exMsg[256]; |
| std::snprintf(exMsg, sizeof(exMsg), |
| "TcrMessageHelper::readChunkPartHeader: %s: cannot handle " |
| "java serializable object from server", |
| methodName); |
| throw MessageException(exMsg); |
| } |
| } else if (partType == DSCode::NullObj) { |
| // special null object is case for scalar query result |
| return ChunkObjectType::NULL_OBJECT; |
| } |
| |
| // TODO enum - wtf? |
| if (expectedFirstType > DSCode::FixedIDDefault) { |
| if (partType != expectedFirstType) { |
| char exMsg[256]; |
| std::snprintf(exMsg, sizeof(exMsg), |
| "TcrMessageHelper::readChunkPartHeader: " |
| "%s: got unhandled object class = %" PRId8, |
| methodName, static_cast<int8_t>(partType)); |
| throw MessageException(exMsg); |
| } |
| // This is for GETALL |
| if (expectedFirstType == DSCode::FixedIDShort) { |
| compId = input.readInt16(); |
| } // This is for QUERY or REGISTER INTEREST. |
| else if (expectedFirstType == DSCode::FixedIDByte) { |
| compId = input.read(); |
| } |
| } |
| if (compId != expectedPartType) { |
| char exMsg[256]; |
| std::snprintf(exMsg, sizeof(exMsg), |
| "TcrMessageHelper::readChunkPartHeader: " |
| "%s: got unhandled object type = %d, expected = %d, raw = %d", |
| methodName, compId, expectedPartType, rawByte); |
| throw MessageException(exMsg); |
| } |
| return ChunkObjectType::OBJECT; |
| } |
| |
| TcrMessageHelper::ChunkObjectType TcrMessageHelper::readChunkPartHeader( |
| TcrMessage& msg, DataInput& input, const char* methodName, |
| uint32_t& partLen, uint8_t isLastChunk) { |
| partLen = input.readInt32(); |
| const auto isObj = input.readBoolean(); |
| |
| if (partLen == 0) { |
| // special null object is case for scalar query result |
| return ChunkObjectType::NULL_OBJECT; |
| } else if (!isObj) { |
| // otherwise we're currently always expecting an object |
| char exMsg[256]; |
| std::snprintf(exMsg, 255, |
| "TcrMessageHelper::readChunkPartHeader: " |
| "%s: part is not object", |
| methodName); |
| throw MessageException(exMsg); |
| } |
| |
| const auto partType = static_cast<const DSCode>(input.read()); |
| // ugly hack to check for exception chunk |
| if (partType == DSCode::JavaSerializable) { |
| input.reset(); |
| if (TcrMessageHelper::readExceptionPart(msg, input, isLastChunk)) { |
| msg.setMessageType(TcrMessage::EXCEPTION); |
| return ChunkObjectType::EXCEPTION; |
| } else { |
| char exMsg[256]; |
| std::snprintf(exMsg, 255, |
| "TcrMessageHelper::readChunkPartHeader: %s: cannot handle " |
| "java serializable object from server", |
| methodName); |
| throw MessageException(exMsg); |
| } |
| } else if (partType == DSCode::NullObj) { |
| // special null object is case for scalar query result |
| return ChunkObjectType::NULL_OBJECT; |
| } |
| return ChunkObjectType::OBJECT; |
| } |
| |
| #pragma error_messages(default, SEC_UNINITIALIZED_MEM_READ) |
| |
| } // namespace client |
| } // namespace geode |
| } // namespace apache |