GEODE-6826: Refactor chunk processing (#490)
* Move struct definition out of class method
* Move timeout calculation into helper method
* eliminate unnecessary variables
* Move response header reading code into separate method
* Move chunk header reading code into separate method
* Move chunk body reading code into separate method
* switch from raw buffer to std::vector in chunk body code
* do loop now becomes a while loop
Co-authored-by: Steve Sienkowski <ssienkowski@pivotal.io>
diff --git a/cppcache/src/TcrChunkedContext.hpp b/cppcache/src/TcrChunkedContext.hpp
index ab8614f..17b42c6 100644
--- a/cppcache/src/TcrChunkedContext.hpp
+++ b/cppcache/src/TcrChunkedContext.hpp
@@ -121,37 +121,37 @@
*/
class TcrChunkedContext {
private:
- const uint8_t* m_bytes;
+ const std::vector<uint8_t> m_chunk;
const int32_t m_len;
const uint8_t m_isLastChunkWithSecurity;
const CacheImpl* m_cache;
TcrChunkedResult* m_result;
public:
- inline TcrChunkedContext(const uint8_t* bytes, int32_t len,
+ inline TcrChunkedContext(const std::vector<uint8_t> chunk, int32_t len,
TcrChunkedResult* result,
uint8_t isLastChunkWithSecurity,
const CacheImpl* cacheImpl)
- : m_bytes(bytes),
+ : m_chunk(chunk),
m_len(len),
m_isLastChunkWithSecurity(isLastChunkWithSecurity),
m_cache(cacheImpl),
m_result(result) {}
- inline ~TcrChunkedContext() { _GEODE_SAFE_DELETE_ARRAY(m_bytes); }
+ inline ~TcrChunkedContext() = default;
- inline const uint8_t* getBytes() const { return m_bytes; }
+ inline const uint8_t* getBytes() const { return m_chunk.data(); }
- inline int32_t getLen() const { return m_len; }
+ inline size_t getLen() const { return m_chunk.size(); }
void handleChunk(bool inSameThread) {
- if (m_bytes == nullptr) {
+ if (m_chunk.empty()) {
// this is the last chunk for some set of chunks
m_result->finalize(inSameThread);
} else if (!m_result->exceptionOccurred()) {
try {
- m_result->fireHandleChunk(m_bytes, m_len, m_isLastChunkWithSecurity,
- m_cache);
+ m_result->fireHandleChunk(m_chunk.data(), m_len,
+ m_isLastChunkWithSecurity, m_cache);
} catch (Exception& ex) {
LOGERROR("HandleChunk error message %s, name = %s", ex.what(),
ex.getName().c_str());
diff --git a/cppcache/src/TcrConnection.cpp b/cppcache/src/TcrConnection.cpp
index 941e8b2..d7a0f57 100644
--- a/cppcache/src/TcrConnection.cpp
+++ b/cppcache/src/TcrConnection.cpp
@@ -17,7 +17,7 @@
#include "TcrConnection.hpp"
-#include <memory.h>
+#include <cinttypes>
#include <ace/INET_Addr.h>
#include <ace/OS.h>
@@ -42,6 +42,8 @@
namespace client {
const int HEADER_LENGTH = 17;
+const int CHUNK_HEADER_LENGTH = 5;
+const int8_t LAST_CHUNK_MASK = 0x1;
const int64_t INITIAL_CONNECTION_ID = 26739;
#define throwException(ex) \
@@ -49,6 +51,21 @@
LOGFINEST(ex.getName() + ": " + ex.getMessage()); \
throw ex; \
}
+
+struct FinalizeProcessChunk {
+ private:
+ TcrMessage& m_reply;
+ uint16_t m_endpointMemId;
+
+ public:
+ FinalizeProcessChunk(TcrMessageReply& reply, uint16_t endpointMemId)
+ : m_reply(reply), m_endpointMemId(endpointMemId) {}
+ ~FinalizeProcessChunk() noexcept(false) {
+ // Enqueue a nullptr chunk indicating a wait for processing to complete.
+ m_reply.processChunk(std::vector<uint8_t>(), 0, m_endpointMemId);
+ }
+};
+
bool TcrConnection::InitTcrConnection(
TcrEndpoint* endpointObj, const char* endpoint,
synchronized_set<std::unordered_set<uint16_t>>& ports,
@@ -122,7 +139,6 @@
m_port = m_conn->getPort();
ports.insert(m_port);
} else {
- // add the local ports to message
auto&& lock = ports.make_lock();
handShakeMsg.writeInt(static_cast<int32_t>(ports.size()));
for (const auto& port : ports) {
@@ -896,146 +912,36 @@
return fullMessage;
}
-void TcrConnection::readMessageChunked(
- TcrMessageReply& reply, std::chrono::microseconds receiveTimeoutSec,
- bool doHeaderTimeoutRetries) {
- const int HDR_LEN = 5;
- const int HDR_LEN_12 = 12;
- uint8_t msg_header[HDR_LEN_12 + HDR_LEN];
- ConnErrType error;
-
- std::chrono::microseconds headerTimeout = receiveTimeoutSec;
- if (doHeaderTimeoutRetries &&
- receiveTimeoutSec == DEFAULT_READ_TIMEOUT_SECS) {
- headerTimeout = DEFAULT_READ_TIMEOUT_SECS * DEFAULT_TIMEOUT_RETRIES;
- }
+void TcrConnection::readMessageChunked(TcrMessageReply& reply,
+ std::chrono::microseconds receiveTimeout,
+ bool doHeaderTimeoutRetries) {
+ auto headerTimeout =
+ calculateHeaderTimeout(receiveTimeout, doHeaderTimeoutRetries);
LOGFINER(
"TcrConnection::readMessageChunked: receiving reply from "
"endpoint %s",
m_endpoint);
- error = receiveData(reinterpret_cast<char*>(msg_header), HDR_LEN_12 + HDR_LEN,
- headerTimeout, true, false);
- if (error != CONN_NOERR) {
- if (error & CONN_TIMEOUT) {
- throwException(TimeoutException(
- "TcrConnection::readMessageChunked: "
- "connection timed out while receiving message header"));
- } else {
- throwException(GeodeIOException(
- "TcrConnection::readMessageChunked: "
- "connection failure while receiving message header"));
- }
- }
+ auto responseHeader = readResponseHeader(headerTimeout);
- LOGDEBUG(
- "TcrConnection::readMessageChunked: received header from "
- "endpoint %s; bytes: %s",
- m_endpoint, Utils::convertBytesToString(msg_header, HDR_LEN_12).c_str());
-
- auto input = m_connectionManager->getCacheImpl()->createDataInput(msg_header,
- HDR_LEN_12);
- int32_t msgType = input.readInt32();
- reply.setMessageType(msgType);
- int32_t txId;
- int32_t numOfParts = input.readInt32();
- LOGDEBUG("TcrConnection::readMessageChunked numberof parts = %d ",
- numOfParts);
- // input->advanceCursor(4);
- txId = input.readInt32();
- reply.setTransId(txId);
-
- // bool isLastChunk = false;
- uint8_t isLastChunk = 0x0;
-
- int chunkNum = 0;
+ reply.setMessageType(responseHeader.messageType);
+ reply.setTransId(responseHeader.transactionId);
// Initialize the chunk processing
reply.startProcessChunk(m_chunksProcessSema);
- // indicate an end to chunk processing and wait for processing
+ // indicate an end to chunk processing and wait for processing
// to end even if reading the chunks fails in middle
- struct FinalizeProcessChunk {
- private:
- TcrMessage& m_reply;
- uint16_t m_endpointmemId;
+ FinalizeProcessChunk endProcessChunk(reply,
+ m_endpointObj->getDistributedMemberID());
- public:
- FinalizeProcessChunk(TcrMessageReply& reply, uint16_t endpointmemId)
- : m_reply(reply), m_endpointmemId(endpointmemId) {}
- ~FinalizeProcessChunk() noexcept(false) {
- // Enqueue a nullptr chunk indicating a wait for processing to complete.
- m_reply.processChunk(nullptr, 0, m_endpointmemId);
- }
- } endProcessChunk(reply, m_endpointObj->getDistributedMemberID());
-
+ auto header = responseHeader.header;
try {
- bool first = true;
- do {
- // uint8_t chunk_header[HDR_LEN];
- if (!first) {
- error = receiveData(reinterpret_cast<char*>(msg_header + HDR_LEN_12),
- HDR_LEN, headerTimeout, true, false);
- if (error != CONN_NOERR) {
- if (error & CONN_TIMEOUT) {
- throwException(TimeoutException(
- "TcrConnection::readMessageChunked: "
- "connection timed out while receiving chunk header"));
- } else {
- throwException(GeodeIOException(
- "TcrConnection::readMessageChunked: "
- "connection failure while receiving chunk header"));
- }
- }
- } else {
- first = false;
- }
- ++chunkNum;
-
- LOGDEBUG(
- "TcrConnection::readMessageChunked: received chunk header %d "
- "from endpoint %s; bytes: %s",
- chunkNum, m_endpoint,
- Utils::convertBytesToString((msg_header + HDR_LEN_12), HDR_LEN)
-
- .c_str());
-
- auto input = m_connectionManager->getCacheImpl()->createDataInput(
- msg_header + HDR_LEN_12, HDR_LEN);
- int32_t chunkLen;
- chunkLen = input.readInt32();
- // check that chunk length is valid.
- isLastChunk = input.read();
-
- uint8_t* chunk_body;
- _GEODE_NEW(chunk_body, uint8_t[chunkLen]);
- error = receiveData(reinterpret_cast<char*>(chunk_body), chunkLen,
- receiveTimeoutSec, true, false);
- if (error != CONN_NOERR) {
- delete[] chunk_body;
- if (error & CONN_TIMEOUT) {
- throwException(TimeoutException(
- "TcrConnection::readMessageChunked: "
- "connection timed out while receiving chunk body"));
- } else {
- throwException(GeodeIOException(
- "TcrConnection::readMessageChunked: "
- "connection failure while receiving chunk body"));
- }
- }
-
- LOGDEBUG(
- "TcrConnection::readMessageChunked: received chunk body %d "
- "from endpoint %s; bytes: %s",
- chunkNum, m_endpoint,
- Utils::convertBytesToString(chunk_body, chunkLen).c_str());
- // Process the chunk; the actual processing is done by a separate thread
- // ThinClientBaseDM::m_chunkProcessor.
-
- reply.processChunk(chunk_body, chunkLen,
- m_endpointObj->getDistributedMemberID(), isLastChunk);
- } while (!(isLastChunk & 0x1));
+ while (
+ processChunk(reply, receiveTimeout, header.chunkLength, header.flags)) {
+ header = readChunkHeader(headerTimeout);
+ }
} catch (const Exception&) {
auto ex = reply.getChunkedResultHandler()->getException();
LOGDEBUG("Found existing exception ", ex->what());
@@ -1049,6 +955,138 @@
m_endpoint);
}
+std::chrono::microseconds TcrConnection::calculateHeaderTimeout(
+ std::chrono::microseconds receiveTimeout, bool retry) {
+ auto headerTimeout = receiveTimeout;
+ if (retry && receiveTimeout == DEFAULT_READ_TIMEOUT_SECS) {
+ headerTimeout *= DEFAULT_TIMEOUT_RETRIES;
+ }
+ return headerTimeout;
+}
+
+chunkedResponseHeader TcrConnection::readResponseHeader(
+ std::chrono::microseconds timeout) {
+ uint8_t receiveBuffer[HEADER_LENGTH];
+ chunkedResponseHeader header;
+
+ auto error = receiveData(reinterpret_cast<char*>(receiveBuffer),
+ HEADER_LENGTH, timeout, true, false);
+ if (error != CONN_NOERR) {
+ if (error & CONN_TIMEOUT) {
+ throwException(TimeoutException(
+ "TcrConnection::readResponseHeader: "
+ "connection timed out while receiving message header"));
+ } else {
+ throwException(GeodeIOException(
+ "TcrConnection::readResponseHeader: "
+ "connection failure while receiving message header"));
+ }
+ }
+
+ LOGDEBUG(
+ "TcrConnection::readResponseHeader: received header from "
+ "endpoint %s; bytes: %s",
+ m_endpoint,
+ Utils::convertBytesToString(receiveBuffer, HEADER_LENGTH).c_str());
+
+ auto input = m_connectionManager->getCacheImpl()->createDataInput(
+ receiveBuffer, HEADER_LENGTH);
+ header.messageType = input.readInt32();
+ header.numberOfParts = input.readInt32();
+ header.transactionId = input.readInt32();
+ header.header.chunkLength = input.readInt32();
+ header.header.flags = input.read();
+ LOGDEBUG(
+ "TcrConnection::readResponseHeader: "
+ "messageType=%" PRId32 ", numberOfParts=%" PRId32
+ ", transactionId=%" PRId32 ", chunkLength=%" PRId32
+ ", lastChunkAndSecurityFlags=0x%" PRIx8,
+ header.messageType, header.numberOfParts, header.transactionId,
+ header.header.chunkLength, header.header.flags);
+
+ return header;
+} // namespace client
+
+chunkHeader TcrConnection::readChunkHeader(std::chrono::microseconds timeout) {
+ uint8_t receiveBuffer[CHUNK_HEADER_LENGTH];
+ chunkHeader header;
+
+ auto error = receiveData(reinterpret_cast<char*>(receiveBuffer),
+ CHUNK_HEADER_LENGTH, timeout, true, false);
+ if (error != CONN_NOERR) {
+ if (error & CONN_TIMEOUT) {
+ throwException(TimeoutException(
+ "TcrConnection::readChunkHeader: "
+ "connection timed out while receiving message header"));
+ } else {
+ throwException(GeodeIOException(
+ "TcrConnection::readChunkHeader: "
+ "connection failure while receiving message header"));
+ }
+ }
+
+ LOGDEBUG(
+ "TcrConnection::readChunkHeader: received header from "
+ "endpoint %s; bytes: %s",
+ m_endpoint,
+ Utils::convertBytesToString(receiveBuffer, CHUNK_HEADER_LENGTH).c_str());
+
+ auto input = m_connectionManager->getCacheImpl()->createDataInput(
+ receiveBuffer, CHUNK_HEADER_LENGTH);
+ header.chunkLength = input.readInt32();
+ header.flags = input.read();
+ LOGDEBUG(
+ "TcrConnection::readChunkHeader: "
+ ", chunkLen=%" PRId32 ", lastChunkAndSecurityFlags=0x%" PRIx8,
+ header.chunkLength, header.flags);
+
+ return header;
+}
+
+std::vector<uint8_t> TcrConnection::readChunkBody(
+ std::chrono::microseconds timeout, int32_t chunkLength) {
+ std::vector<uint8_t> chunkBody(chunkLength);
+ auto error = receiveData(reinterpret_cast<char*>(chunkBody.data()),
+ chunkLength, timeout, true, false);
+ if (error != CONN_NOERR) {
+ if (error & CONN_TIMEOUT) {
+ throwException(
+ TimeoutException("TcrConnection::readChunkBody: "
+ "connection timed out while receiving chunk body"));
+ } else {
+ throwException(
+ GeodeIOException("TcrConnection::readChunkBody: "
+ "connection failure while receiving chunk body"));
+ }
+ }
+
+ LOGDEBUG(
+ "TcrConnection::readChunkBody: received chunk body from endpoint "
+ "%s; bytes: %s",
+ m_endpoint,
+ Utils::convertBytesToString(chunkBody.data(), chunkLength).c_str());
+ return chunkBody;
+}
+
+bool TcrConnection::processChunk(TcrMessageReply& reply,
+ std::chrono::microseconds timeout,
+ int32_t chunkLength,
+ int8_t lastChunkAndSecurityFlags) {
+ // NOTE: this buffer is allocated by readChunkBody, and reply.processChunk
+ // takes ownership, so we don't delete it here on failure
+ std::vector<uint8_t> chunkBody = readChunkBody(timeout, chunkLength);
+
+ // Process the chunk; the actual processing is done by a separate thread
+ // ThinClientBaseDM::m_chunkProcessor.
+ reply.processChunk(chunkBody, chunkLength,
+ m_endpointObj->getDistributedMemberID(),
+ lastChunkAndSecurityFlags);
+ // Return boolean indicating whether or not there are more chunks, i.e.
+ // the *inverse* of the flag indicating this is the last chunk. It's a
+ // little confusing here, but makes calling code clearer.
+ return (lastChunkAndSecurityFlags & LAST_CHUNK_MASK) ? false : true;
+}
+
void TcrConnection::close() {
TcrMessage* closeMsg = TcrMessage::getCloseConnMessage(
m_poolDM->getConnectionManager().getCacheImpl());
diff --git a/cppcache/src/TcrConnection.hpp b/cppcache/src/TcrConnection.hpp
index b026062..4374a96 100644
--- a/cppcache/src/TcrConnection.hpp
+++ b/cppcache/src/TcrConnection.hpp
@@ -66,6 +66,18 @@
namespace geode {
namespace client {
+struct chunkHeader {
+ int32_t chunkLength;
+ int8_t flags;
+};
+
+struct chunkedResponseHeader {
+ int32_t messageType;
+ int32_t numberOfParts;
+ int32_t transactionId;
+ chunkHeader header;
+};
+
enum ConnErrType {
CONN_NOERR = 0x0,
CONN_NODATA = 0x1,
@@ -247,13 +259,13 @@
* connection and sets the reply message
* parameter.
* @param reply response message
- * @param receiveTimeoutSec read timeout in sec
+ * @param receiveTimeout read timeout
* @param doHeaderTimeoutRetries retry when header receive times out
* @exception GeodeIOException if an I/O error occurs (socket failure).
* @exception TimeoutException if timeout happens during read
*/
void readMessageChunked(TcrMessageReply& reply,
- std::chrono::microseconds receiveTimeoutSec,
+ std::chrono::microseconds receiveTimeout,
bool doHeaderTimeoutRetries);
/**
@@ -318,6 +330,20 @@
int64_t connectionId;
const TcrConnectionManager* m_connectionManager;
DiffieHellman* m_dh;
+
+ std::chrono::microseconds calculateHeaderTimeout(
+ std::chrono::microseconds receiveTimeout, bool retry);
+
+ chunkedResponseHeader readResponseHeader(std::chrono::microseconds timeout);
+
+ chunkHeader readChunkHeader(std::chrono::microseconds timeout);
+
+ std::vector<uint8_t> readChunkBody(std::chrono::microseconds timeout,
+ int32_t chunkLength);
+
+ bool processChunk(TcrMessageReply& reply, std::chrono::microseconds timeout,
+ int32_t chunkLength, int8_t lastChunkAndSecurityFlags);
+
/**
* To read Intantiator message(which meant for java client), here we are
* ignoring it
diff --git a/cppcache/src/TcrMessage.cpp b/cppcache/src/TcrMessage.cpp
index efd1f18..220f00c 100644
--- a/cppcache/src/TcrMessage.cpp
+++ b/cppcache/src/TcrMessage.cpp
@@ -713,7 +713,7 @@
}
}
-void TcrMessage::processChunk(const uint8_t* bytes, int32_t len,
+void TcrMessage::processChunk(const std::vector<uint8_t>& chunk, int32_t len,
uint16_t endpointmemId,
const uint8_t isLastChunkAndisSecurityHeader) {
// TODO: see if security header is there
@@ -732,7 +732,7 @@
switch (m_msgType) {
case TcrMessage::REPLY: {
LOGDEBUG("processChunk - got reply for request %d", m_msgTypeRequest);
- chunkSecurityHeader(1, bytes, len, isLastChunkAndisSecurityHeader);
+ chunkSecurityHeader(1, chunk, len, isLastChunkAndisSecurityHeader);
break;
}
case TcrMessage::RESPONSE: {
@@ -748,12 +748,12 @@
break;
} else if (m_msgTypeRequest == TcrMessage::PUTALL ||
m_msgTypeRequest == TcrMessage::PUT_ALL_WITH_CALLBACK) {
- TcrChunkedContext* chunk = new TcrChunkedContext(
- bytes, len, m_chunkedResult, isLastChunkAndisSecurityHeader,
+ TcrChunkedContext* chunkedContext = new TcrChunkedContext(
+ chunk, len, m_chunkedResult, isLastChunkAndisSecurityHeader,
m_tcdm->getConnectionManager().getCacheImpl());
m_chunkedResult->setEndpointMemId(endpointmemId);
- m_tcdm->queueChunk(chunk);
- if (bytes == nullptr) {
+ m_tcdm->queueChunk(chunkedContext);
+ if (chunk.empty()) {
// last chunk -- wait for processing of all the chunks to complete
m_chunkedResult->waitFinalize();
auto ex = m_chunkedResult->getException();
@@ -772,12 +772,12 @@
case TcrMessage::RESPONSE_FROM_PRIMARY: {
if (m_chunkedResult != nullptr) {
LOGDEBUG("tcrmessage in case22 ");
- TcrChunkedContext* chunk = new TcrChunkedContext(
- bytes, len, m_chunkedResult, isLastChunkAndisSecurityHeader,
+ TcrChunkedContext* chunkedContext = new TcrChunkedContext(
+ chunk, len, m_chunkedResult, isLastChunkAndisSecurityHeader,
m_tcdm->getConnectionManager().getCacheImpl());
m_chunkedResult->setEndpointMemId(endpointmemId);
- m_tcdm->queueChunk(chunk);
- if (bytes == nullptr) {
+ m_tcdm->queueChunk(chunkedContext);
+ if (chunk.empty()) {
// last chunk -- wait for processing of all the chunks to complete
m_chunkedResult->waitFinalize();
// Throw any exception during processing here.
@@ -795,9 +795,8 @@
} else if (TcrMessage::CQ_EXCEPTION_TYPE == m_msgType ||
TcrMessage::CQDATAERROR_MSG_TYPE == m_msgType ||
TcrMessage::GET_ALL_DATA_ERROR == m_msgType) {
- if (bytes != nullptr) {
- chunkSecurityHeader(1, bytes, len, isLastChunkAndisSecurityHeader);
- _GEODE_SAFE_DELETE_ARRAY(bytes);
+ if (!chunk.empty()) {
+ chunkSecurityHeader(1, chunk, len, isLastChunkAndisSecurityHeader);
}
}
break;
@@ -806,7 +805,7 @@
// error
case EXECUTE_FUNCTION_ERROR:
case EXECUTE_REGION_FUNCTION_ERROR: {
- if (bytes != nullptr) {
+ if (!chunk.empty()) {
// DeleteArray<const uint8_t> delChunk(bytes);
// DataInput input(bytes, len);
// TODO: this not send two part...
@@ -814,17 +813,15 @@
// readExceptionPart(input, false);
// readSecureObjectPart(input, false, true,
// isLastChunkAndisSecurityHeader );
- chunkSecurityHeader(1, bytes, len, isLastChunkAndisSecurityHeader);
- _GEODE_SAFE_DELETE_ARRAY(bytes);
+ chunkSecurityHeader(1, chunk, len, isLastChunkAndisSecurityHeader);
}
break;
}
case TcrMessage::EXCEPTION: {
- if (bytes != nullptr) {
- DeleteArray<const uint8_t> delChunk(bytes);
+ if (!chunk.empty()) {
auto input =
m_tcdm->getConnectionManager().getCacheImpl()->createDataInput(
- bytes, len);
+ chunk.data(), len);
readExceptionPart(input, isLastChunkAndisSecurityHeader);
readSecureObjectPart(input, false, true,
isLastChunkAndisSecurityHeader);
@@ -833,42 +830,34 @@
}
case TcrMessage::RESPONSE_FROM_SECONDARY: {
// TODO: how many parts
- chunkSecurityHeader(1, bytes, len, isLastChunkAndisSecurityHeader);
- if (bytes != nullptr) {
- DeleteArray<const uint8_t> delChunk(bytes);
+ chunkSecurityHeader(1, chunk, len, isLastChunkAndisSecurityHeader);
+ if (chunk.size()) {
LOGFINEST("processChunk - got response from secondary, ignoring.");
}
break;
}
case TcrMessage::PUT_DATA_ERROR: {
- chunkSecurityHeader(1, bytes, len, isLastChunkAndisSecurityHeader);
- if (nullptr != bytes) {
+ chunkSecurityHeader(1, chunk, len, isLastChunkAndisSecurityHeader);
+ if (!chunk.empty()) {
auto input =
m_tcdm->getConnectionManager().getCacheImpl()->createDataInput(
- bytes, len);
+ chunk.data(), len);
auto errorString = readStringPart(input);
if (!errorString.empty()) {
setThreadLocalExceptionMessage(errorString.c_str());
}
-
- _GEODE_SAFE_DELETE_ARRAY(bytes);
}
break;
}
case TcrMessage::GET_ALL_DATA_ERROR: {
- chunkSecurityHeader(1, bytes, len, isLastChunkAndisSecurityHeader);
- if (bytes != nullptr) {
- _GEODE_SAFE_DELETE_ARRAY(bytes);
- }
+ chunkSecurityHeader(1, chunk, len, isLastChunkAndisSecurityHeader);
break;
}
default: {
// TODO: how many parts what should we do here
- if (bytes != nullptr) {
- _GEODE_SAFE_DELETE_ARRAY(bytes);
- } else {
+ if (chunk.empty()) {
LOGWARN(
"Got unhandled message type %d while processing response, possible "
"serialization mismatch",
@@ -889,13 +878,14 @@
return nullptr;
}
-void TcrMessage::chunkSecurityHeader(int skipPart, const uint8_t* bytes,
+void TcrMessage::chunkSecurityHeader(int skipPart,
+ const std::vector<uint8_t> bytes,
int32_t len,
uint8_t isLastChunkAndSecurityHeader) {
LOGDEBUG("TcrMessage::chunkSecurityHeader:: skipParts = %d", skipPart);
if ((isLastChunkAndSecurityHeader & 0x3) == 0x3) {
auto di = m_tcdm->getConnectionManager().getCacheImpl()->createDataInput(
- bytes, len);
+ bytes.data(), len);
skipParts(di, skipPart);
readSecureObjectPart(di, false, true, isLastChunkAndSecurityHeader);
}
diff --git a/cppcache/src/TcrMessage.hpp b/cppcache/src/TcrMessage.hpp
index 46665c0..7244d75 100644
--- a/cppcache/src/TcrMessage.hpp
+++ b/cppcache/src/TcrMessage.hpp
@@ -221,7 +221,7 @@
void startProcessChunk(ACE_Semaphore& finalizeSema);
// nullptr chunk means that this is the last chunk
- void processChunk(const uint8_t* chunk, int32_t chunkLen,
+ void processChunk(const std::vector<uint8_t>& chunk, int32_t chunkLen,
uint16_t endpointmemId,
const uint8_t isLastChunkAndisSecurityHeader = 0x00);
/* For creating a region on the java server */
@@ -569,8 +569,8 @@
void writeMillisecondsPart(std::chrono::milliseconds millis);
void writeByteAndTimeOutPart(uint8_t byteValue,
std::chrono::milliseconds timeout);
- void chunkSecurityHeader(int skipParts, const uint8_t* bytes, int32_t len,
- uint8_t isLastChunkAndSecurityHeader);
+ void chunkSecurityHeader(int skipParts, const std::vector<uint8_t> bytes,
+ int32_t len, uint8_t isLastChunkAndSecurityHeader);
void readEventIdPart(DataInput& input, bool skip = false,
int32_t parts = 1); // skip num parts then read eventid