GEODE-6826: Refactor chunk processing (#490)

* Move struct definition out of class method
* Move timeout calculation into helper method
* eliminate unnecessary variables
* Move response header reading code into separate method
* Move chunk header reading code into separate method
* Move chunk body reading code into separate method
* switch from raw buffer to std::vector in chunk body code
* do loop now becomes a while loop

Co-authored-by: Steve Sienkowski <ssienkowski@pivotal.io>
diff --git a/cppcache/src/TcrChunkedContext.hpp b/cppcache/src/TcrChunkedContext.hpp
index ab8614f..17b42c6 100644
--- a/cppcache/src/TcrChunkedContext.hpp
+++ b/cppcache/src/TcrChunkedContext.hpp
@@ -121,37 +121,37 @@
  */
 class TcrChunkedContext {
  private:
-  const uint8_t* m_bytes;
+  const std::vector<uint8_t> m_chunk;
   const int32_t m_len;
   const uint8_t m_isLastChunkWithSecurity;
   const CacheImpl* m_cache;
   TcrChunkedResult* m_result;
 
  public:
-  inline TcrChunkedContext(const uint8_t* bytes, int32_t len,
+  inline TcrChunkedContext(const std::vector<uint8_t> chunk, int32_t len,
                            TcrChunkedResult* result,
                            uint8_t isLastChunkWithSecurity,
                            const CacheImpl* cacheImpl)
-      : m_bytes(bytes),
+      : m_chunk(chunk),
         m_len(len),
         m_isLastChunkWithSecurity(isLastChunkWithSecurity),
         m_cache(cacheImpl),
         m_result(result) {}
 
-  inline ~TcrChunkedContext() { _GEODE_SAFE_DELETE_ARRAY(m_bytes); }
+  inline ~TcrChunkedContext() = default;
 
-  inline const uint8_t* getBytes() const { return m_bytes; }
+  inline const uint8_t* getBytes() const { return m_chunk.data(); }
 
-  inline int32_t getLen() const { return m_len; }
+  inline size_t getLen() const { return m_chunk.size(); }
 
   void handleChunk(bool inSameThread) {
-    if (m_bytes == nullptr) {
+    if (m_chunk.empty()) {
       // this is the last chunk for some set of chunks
       m_result->finalize(inSameThread);
     } else if (!m_result->exceptionOccurred()) {
       try {
-        m_result->fireHandleChunk(m_bytes, m_len, m_isLastChunkWithSecurity,
-                                  m_cache);
+        m_result->fireHandleChunk(m_chunk.data(), m_len,
+                                  m_isLastChunkWithSecurity, m_cache);
       } catch (Exception& ex) {
         LOGERROR("HandleChunk error message %s, name = %s", ex.what(),
                  ex.getName().c_str());
diff --git a/cppcache/src/TcrConnection.cpp b/cppcache/src/TcrConnection.cpp
index 941e8b2..d7a0f57 100644
--- a/cppcache/src/TcrConnection.cpp
+++ b/cppcache/src/TcrConnection.cpp
@@ -17,7 +17,7 @@
 
 #include "TcrConnection.hpp"
 
-#include <memory.h>
+#include <cinttypes>
 
 #include <ace/INET_Addr.h>
 #include <ace/OS.h>
@@ -42,6 +42,8 @@
 namespace client {
 
 const int HEADER_LENGTH = 17;
+const int CHUNK_HEADER_LENGTH = 5;
+const int8_t LAST_CHUNK_MASK = 0x1;
 const int64_t INITIAL_CONNECTION_ID = 26739;
 
 #define throwException(ex)                            \
@@ -49,6 +51,21 @@
     LOGFINEST(ex.getName() + ": " + ex.getMessage()); \
     throw ex;                                         \
   }
+
+struct FinalizeProcessChunk {
+ private:
+  TcrMessage& m_reply;
+  uint16_t m_endpointMemId;
+
+ public:
+  FinalizeProcessChunk(TcrMessageReply& reply, uint16_t endpointMemId)
+      : m_reply(reply), m_endpointMemId(endpointMemId) {}
+  ~FinalizeProcessChunk() noexcept(false) {
+    // Enqueue a nullptr chunk indicating a wait for processing to complete.
+    m_reply.processChunk(std::vector<uint8_t>(), 0, m_endpointMemId);
+  }
+};
+
 bool TcrConnection::InitTcrConnection(
     TcrEndpoint* endpointObj, const char* endpoint,
     synchronized_set<std::unordered_set<uint16_t>>& ports,
@@ -122,7 +139,6 @@
     m_port = m_conn->getPort();
     ports.insert(m_port);
   } else {
-    // add the local ports to message
     auto&& lock = ports.make_lock();
     handShakeMsg.writeInt(static_cast<int32_t>(ports.size()));
     for (const auto& port : ports) {
@@ -896,146 +912,36 @@
   return fullMessage;
 }
 
-void TcrConnection::readMessageChunked(
-    TcrMessageReply& reply, std::chrono::microseconds receiveTimeoutSec,
-    bool doHeaderTimeoutRetries) {
-  const int HDR_LEN = 5;
-  const int HDR_LEN_12 = 12;
-  uint8_t msg_header[HDR_LEN_12 + HDR_LEN];
-  ConnErrType error;
-
-  std::chrono::microseconds headerTimeout = receiveTimeoutSec;
-  if (doHeaderTimeoutRetries &&
-      receiveTimeoutSec == DEFAULT_READ_TIMEOUT_SECS) {
-    headerTimeout = DEFAULT_READ_TIMEOUT_SECS * DEFAULT_TIMEOUT_RETRIES;
-  }
+void TcrConnection::readMessageChunked(TcrMessageReply& reply,
+                                       std::chrono::microseconds receiveTimeout,
+                                       bool doHeaderTimeoutRetries) {
+  auto headerTimeout =
+      calculateHeaderTimeout(receiveTimeout, doHeaderTimeoutRetries);
 
   LOGFINER(
       "TcrConnection::readMessageChunked: receiving reply from "
       "endpoint %s",
       m_endpoint);
 
-  error = receiveData(reinterpret_cast<char*>(msg_header), HDR_LEN_12 + HDR_LEN,
-                      headerTimeout, true, false);
-  if (error != CONN_NOERR) {
-    if (error & CONN_TIMEOUT) {
-      throwException(TimeoutException(
-          "TcrConnection::readMessageChunked: "
-          "connection timed out while receiving message header"));
-    } else {
-      throwException(GeodeIOException(
-          "TcrConnection::readMessageChunked: "
-          "connection failure while receiving message header"));
-    }
-  }
+  auto responseHeader = readResponseHeader(headerTimeout);
 
-  LOGDEBUG(
-      "TcrConnection::readMessageChunked: received header from "
-      "endpoint %s; bytes: %s",
-      m_endpoint, Utils::convertBytesToString(msg_header, HDR_LEN_12).c_str());
-
-  auto input = m_connectionManager->getCacheImpl()->createDataInput(msg_header,
-                                                                    HDR_LEN_12);
-  int32_t msgType = input.readInt32();
-  reply.setMessageType(msgType);
-  int32_t txId;
-  int32_t numOfParts = input.readInt32();
-  LOGDEBUG("TcrConnection::readMessageChunked numberof parts = %d ",
-           numOfParts);
-  // input->advanceCursor(4);
-  txId = input.readInt32();
-  reply.setTransId(txId);
-
-  // bool isLastChunk = false;
-  uint8_t isLastChunk = 0x0;
-
-  int chunkNum = 0;
+  reply.setMessageType(responseHeader.messageType);
+  reply.setTransId(responseHeader.transactionId);
 
   // Initialize the chunk processing
   reply.startProcessChunk(m_chunksProcessSema);
 
-  //  indicate an end to chunk processing and wait for processing
+  // indicate an end to chunk processing and wait for processing
   // to end even if reading the chunks fails in middle
-  struct FinalizeProcessChunk {
-   private:
-    TcrMessage& m_reply;
-    uint16_t m_endpointmemId;
+  FinalizeProcessChunk endProcessChunk(reply,
+                                       m_endpointObj->getDistributedMemberID());
 
-   public:
-    FinalizeProcessChunk(TcrMessageReply& reply, uint16_t endpointmemId)
-        : m_reply(reply), m_endpointmemId(endpointmemId) {}
-    ~FinalizeProcessChunk() noexcept(false) {
-      // Enqueue a nullptr chunk indicating a wait for processing to complete.
-      m_reply.processChunk(nullptr, 0, m_endpointmemId);
-    }
-  } endProcessChunk(reply, m_endpointObj->getDistributedMemberID());
-
+  auto header = responseHeader.header;
   try {
-    bool first = true;
-    do {
-      // uint8_t chunk_header[HDR_LEN];
-      if (!first) {
-        error = receiveData(reinterpret_cast<char*>(msg_header + HDR_LEN_12),
-                            HDR_LEN, headerTimeout, true, false);
-        if (error != CONN_NOERR) {
-          if (error & CONN_TIMEOUT) {
-            throwException(TimeoutException(
-                "TcrConnection::readMessageChunked: "
-                "connection timed out while receiving chunk header"));
-          } else {
-            throwException(GeodeIOException(
-                "TcrConnection::readMessageChunked: "
-                "connection failure while receiving chunk header"));
-          }
-        }
-      } else {
-        first = false;
-      }
-      ++chunkNum;
-
-      LOGDEBUG(
-          "TcrConnection::readMessageChunked: received chunk header %d "
-          "from endpoint %s; bytes: %s",
-          chunkNum, m_endpoint,
-          Utils::convertBytesToString((msg_header + HDR_LEN_12), HDR_LEN)
-
-              .c_str());
-
-      auto input = m_connectionManager->getCacheImpl()->createDataInput(
-          msg_header + HDR_LEN_12, HDR_LEN);
-      int32_t chunkLen;
-      chunkLen = input.readInt32();
-      //  check that chunk length is valid.
-      isLastChunk = input.read();
-
-      uint8_t* chunk_body;
-      _GEODE_NEW(chunk_body, uint8_t[chunkLen]);
-      error = receiveData(reinterpret_cast<char*>(chunk_body), chunkLen,
-                          receiveTimeoutSec, true, false);
-      if (error != CONN_NOERR) {
-        delete[] chunk_body;
-        if (error & CONN_TIMEOUT) {
-          throwException(TimeoutException(
-              "TcrConnection::readMessageChunked: "
-              "connection timed out while receiving chunk body"));
-        } else {
-          throwException(GeodeIOException(
-              "TcrConnection::readMessageChunked: "
-              "connection failure while receiving chunk body"));
-        }
-      }
-
-      LOGDEBUG(
-          "TcrConnection::readMessageChunked: received chunk body %d "
-          "from endpoint %s; bytes: %s",
-          chunkNum, m_endpoint,
-          Utils::convertBytesToString(chunk_body, chunkLen).c_str());
-      // Process the chunk; the actual processing is done by a separate thread
-      // ThinClientBaseDM::m_chunkProcessor.
-
-      reply.processChunk(chunk_body, chunkLen,
-                         m_endpointObj->getDistributedMemberID(), isLastChunk);
-    } while (!(isLastChunk & 0x1));
+    while (
+        processChunk(reply, receiveTimeout, header.chunkLength, header.flags)) {
+      header = readChunkHeader(headerTimeout);
+    }
   } catch (const Exception&) {
     auto ex = reply.getChunkedResultHandler()->getException();
     LOGDEBUG("Found existing exception ", ex->what());
@@ -1049,6 +955,138 @@
       m_endpoint);
 }
 
+std::chrono::microseconds TcrConnection::calculateHeaderTimeout(
+    std::chrono::microseconds receiveTimeout, bool retry) {
+  auto headerTimeout = receiveTimeout;
+  if (retry && receiveTimeout == DEFAULT_READ_TIMEOUT_SECS) {
+    headerTimeout *= DEFAULT_TIMEOUT_RETRIES;
+  }
+  return headerTimeout;
+}
+
+chunkedResponseHeader TcrConnection::readResponseHeader(
+    std::chrono::microseconds timeout) {
+  uint8_t receiveBuffer[HEADER_LENGTH];
+  chunkedResponseHeader header;
+
+  auto error = receiveData(reinterpret_cast<char*>(receiveBuffer),
+                           HEADER_LENGTH, timeout, true, false);
+  if (error != CONN_NOERR) {
+    if (error & CONN_TIMEOUT) {
+      throwException(TimeoutException(
+          "TcrConnection::readResponseHeader: "
+          "connection timed out while receiving message header"));
+    } else {
+      throwException(GeodeIOException(
+          "TcrConnection::readResponseHeader: "
+          "connection failure while receiving message header"));
+    }
+  }
+
+  LOGDEBUG(
+      "TcrConnection::readResponseHeader: received header from "
+      "endpoint %s; bytes: %s",
+      m_endpoint,
+      Utils::convertBytesToString(receiveBuffer, HEADER_LENGTH).c_str());
+
+  auto input = m_connectionManager->getCacheImpl()->createDataInput(
+      receiveBuffer, HEADER_LENGTH);
+  header.messageType = input.readInt32();
+  header.numberOfParts = input.readInt32();
+  header.transactionId = input.readInt32();
+  header.header.chunkLength = input.readInt32();
+  header.header.flags = input.read();
+  LOGDEBUG(
+      "TcrConnection::readResponseHeader: "
+      "messageType=%" PRId32 ", numberOfParts=%" PRId32
+      ", transactionId=%" PRId32 ", chunkLength=%" PRId32
+      ", lastChunkAndSecurityFlags=0x%" PRIx8,
+      header.messageType, header.numberOfParts, header.transactionId,
+      header.header.chunkLength, header.header.flags);
+
+  return header;
+}  // namespace client
+
+chunkHeader TcrConnection::readChunkHeader(std::chrono::microseconds timeout) {
+  uint8_t receiveBuffer[CHUNK_HEADER_LENGTH];
+  chunkHeader header;
+
+  auto error = receiveData(reinterpret_cast<char*>(receiveBuffer),
+                           CHUNK_HEADER_LENGTH, timeout, true, false);
+  if (error != CONN_NOERR) {
+    if (error & CONN_TIMEOUT) {
+      throwException(TimeoutException(
+          "TcrConnection::readChunkHeader: "
+          "connection timed out while receiving message header"));
+    } else {
+      throwException(GeodeIOException(
+          "TcrConnection::readChunkHeader: "
+          "connection failure while receiving message header"));
+    }
+  }
+
+  LOGDEBUG(
+      "TcrConnection::readChunkHeader: received header from "
+      "endpoint %s; bytes: %s",
+      m_endpoint,
+      Utils::convertBytesToString(receiveBuffer, CHUNK_HEADER_LENGTH).c_str());
+
+  auto input = m_connectionManager->getCacheImpl()->createDataInput(
+      receiveBuffer, CHUNK_HEADER_LENGTH);
+  header.chunkLength = input.readInt32();
+  header.flags = input.read();
+  LOGDEBUG(
+      "TcrConnection::readChunkHeader: "
+      ", chunkLen=%" PRId32 ", lastChunkAndSecurityFlags=0x%" PRIx8,
+      header.chunkLength, header.flags);
+
+  return header;
+}
+
+std::vector<uint8_t> TcrConnection::readChunkBody(
+    std::chrono::microseconds timeout, int32_t chunkLength) {
+  std::vector<uint8_t> chunkBody(chunkLength);
+  auto error = receiveData(reinterpret_cast<char*>(chunkBody.data()),
+                           chunkLength, timeout, true, false);
+  if (error != CONN_NOERR) {
+    if (error & CONN_TIMEOUT) {
+      throwException(
+          TimeoutException("TcrConnection::readChunkBody: "
+                           "connection timed out while receiving chunk body"));
+    } else {
+      throwException(
+          GeodeIOException("TcrConnection::readChunkBody: "
+                           "connection failure while receiving chunk body"));
+    }
+  }
+
+  LOGDEBUG(
+      "TcrConnection::readChunkBody: received chunk body from endpoint "
+      "%s; bytes: %s",
+      m_endpoint,
+      Utils::convertBytesToString(chunkBody.data(), chunkLength).c_str());
+  return chunkBody;
+}
+
+bool TcrConnection::processChunk(TcrMessageReply& reply,
+                                 std::chrono::microseconds timeout,
+                                 int32_t chunkLength,
+                                 int8_t lastChunkAndSecurityFlags) {
+  // NOTE: this buffer is allocated by readChunkBody, and reply.processChunk
+  // takes ownership, so we don't delete it here on failure
+  std::vector<uint8_t> chunkBody = readChunkBody(timeout, chunkLength);
+
+  // Process the chunk; the actual processing is done by a separate thread
+  // ThinClientBaseDM::m_chunkProcessor.
+  reply.processChunk(chunkBody, chunkLength,
+                     m_endpointObj->getDistributedMemberID(),
+                     lastChunkAndSecurityFlags);
+  // Return boolean indicating whether or not there are more chunks, i.e.
+  // the *inverse* of the flag indicating this is the last chunk.  It's a
+  // little confusing here, but makes calling code clearer.
+  return (lastChunkAndSecurityFlags & LAST_CHUNK_MASK) ? false : true;
+}
+
 void TcrConnection::close() {
   TcrMessage* closeMsg = TcrMessage::getCloseConnMessage(
       m_poolDM->getConnectionManager().getCacheImpl());
diff --git a/cppcache/src/TcrConnection.hpp b/cppcache/src/TcrConnection.hpp
index b026062..4374a96 100644
--- a/cppcache/src/TcrConnection.hpp
+++ b/cppcache/src/TcrConnection.hpp
@@ -66,6 +66,18 @@
 namespace geode {
 namespace client {
 
+struct chunkHeader {
+  int32_t chunkLength;
+  int8_t flags;
+};
+
+struct chunkedResponseHeader {
+  int32_t messageType;
+  int32_t numberOfParts;
+  int32_t transactionId;
+  chunkHeader header;
+};
+
 enum ConnErrType {
   CONN_NOERR = 0x0,
   CONN_NODATA = 0x1,
@@ -247,13 +259,13 @@
    * connection and sets the reply message
    * parameter.
    * @param      reply response message
-   * @param      receiveTimeoutSec read timeout in sec
+   * @param      receiveTimeout read timeout
    * @param      doHeaderTimeoutRetries retry when header receive times out
    * @exception  GeodeIOException  if an I/O error occurs (socket failure).
    * @exception  TimeoutException  if timeout happens during read
    */
   void readMessageChunked(TcrMessageReply& reply,
-                          std::chrono::microseconds receiveTimeoutSec,
+                          std::chrono::microseconds receiveTimeout,
                           bool doHeaderTimeoutRetries);
 
   /**
@@ -318,6 +330,20 @@
   int64_t connectionId;
   const TcrConnectionManager* m_connectionManager;
   DiffieHellman* m_dh;
+
+  std::chrono::microseconds calculateHeaderTimeout(
+      std::chrono::microseconds receiveTimeout, bool retry);
+
+  chunkedResponseHeader readResponseHeader(std::chrono::microseconds timeout);
+
+  chunkHeader readChunkHeader(std::chrono::microseconds timeout);
+
+  std::vector<uint8_t> readChunkBody(std::chrono::microseconds timeout,
+                                     int32_t chunkLength);
+
+  bool processChunk(TcrMessageReply& reply, std::chrono::microseconds timeout,
+                    int32_t chunkLength, int8_t lastChunkAndSecurityFlags);
+
   /**
    * To read Intantiator message(which meant for java client), here we are
    * ignoring it
diff --git a/cppcache/src/TcrMessage.cpp b/cppcache/src/TcrMessage.cpp
index efd1f18..220f00c 100644
--- a/cppcache/src/TcrMessage.cpp
+++ b/cppcache/src/TcrMessage.cpp
@@ -713,7 +713,7 @@
   }
 }
 
-void TcrMessage::processChunk(const uint8_t* bytes, int32_t len,
+void TcrMessage::processChunk(const std::vector<uint8_t>& chunk, int32_t len,
                               uint16_t endpointmemId,
                               const uint8_t isLastChunkAndisSecurityHeader) {
   // TODO: see if security header is there
@@ -732,7 +732,7 @@
   switch (m_msgType) {
     case TcrMessage::REPLY: {
       LOGDEBUG("processChunk - got reply for request %d", m_msgTypeRequest);
-      chunkSecurityHeader(1, bytes, len, isLastChunkAndisSecurityHeader);
+      chunkSecurityHeader(1, chunk, len, isLastChunkAndisSecurityHeader);
       break;
     }
     case TcrMessage::RESPONSE: {
@@ -748,12 +748,12 @@
         break;
       } else if (m_msgTypeRequest == TcrMessage::PUTALL ||
                  m_msgTypeRequest == TcrMessage::PUT_ALL_WITH_CALLBACK) {
-        TcrChunkedContext* chunk = new TcrChunkedContext(
-            bytes, len, m_chunkedResult, isLastChunkAndisSecurityHeader,
+        TcrChunkedContext* chunkedContext = new TcrChunkedContext(
+            chunk, len, m_chunkedResult, isLastChunkAndisSecurityHeader,
             m_tcdm->getConnectionManager().getCacheImpl());
         m_chunkedResult->setEndpointMemId(endpointmemId);
-        m_tcdm->queueChunk(chunk);
-        if (bytes == nullptr) {
+        m_tcdm->queueChunk(chunkedContext);
+        if (chunk.empty()) {
           // last chunk -- wait for processing of all the chunks to complete
           m_chunkedResult->waitFinalize();
           auto ex = m_chunkedResult->getException();
@@ -772,12 +772,12 @@
     case TcrMessage::RESPONSE_FROM_PRIMARY: {
       if (m_chunkedResult != nullptr) {
         LOGDEBUG("tcrmessage in case22 ");
-        TcrChunkedContext* chunk = new TcrChunkedContext(
-            bytes, len, m_chunkedResult, isLastChunkAndisSecurityHeader,
+        TcrChunkedContext* chunkedContext = new TcrChunkedContext(
+            chunk, len, m_chunkedResult, isLastChunkAndisSecurityHeader,
             m_tcdm->getConnectionManager().getCacheImpl());
         m_chunkedResult->setEndpointMemId(endpointmemId);
-        m_tcdm->queueChunk(chunk);
-        if (bytes == nullptr) {
+        m_tcdm->queueChunk(chunkedContext);
+        if (chunk.empty()) {
           // last chunk -- wait for processing of all the chunks to complete
           m_chunkedResult->waitFinalize();
           //  Throw any exception during processing here.
@@ -795,9 +795,8 @@
       } else if (TcrMessage::CQ_EXCEPTION_TYPE == m_msgType ||
                  TcrMessage::CQDATAERROR_MSG_TYPE == m_msgType ||
                  TcrMessage::GET_ALL_DATA_ERROR == m_msgType) {
-        if (bytes != nullptr) {
-          chunkSecurityHeader(1, bytes, len, isLastChunkAndisSecurityHeader);
-          _GEODE_SAFE_DELETE_ARRAY(bytes);
+        if (!chunk.empty()) {
+          chunkSecurityHeader(1, chunk, len, isLastChunkAndisSecurityHeader);
         }
       }
       break;
@@ -806,7 +805,7 @@
                                                     // error
     case EXECUTE_FUNCTION_ERROR:
     case EXECUTE_REGION_FUNCTION_ERROR: {
-      if (bytes != nullptr) {
+      if (!chunk.empty()) {
         // DeleteArray<const uint8_t> delChunk(bytes);
         //  DataInput input(bytes, len);
         // TODO: this not send two part...
@@ -814,17 +813,15 @@
         // readExceptionPart(input, false);
         // readSecureObjectPart(input, false, true,
         // isLastChunkAndisSecurityHeader );
-        chunkSecurityHeader(1, bytes, len, isLastChunkAndisSecurityHeader);
-        _GEODE_SAFE_DELETE_ARRAY(bytes);
+        chunkSecurityHeader(1, chunk, len, isLastChunkAndisSecurityHeader);
       }
       break;
     }
     case TcrMessage::EXCEPTION: {
-      if (bytes != nullptr) {
-        DeleteArray<const uint8_t> delChunk(bytes);
+      if (!chunk.empty()) {
         auto input =
             m_tcdm->getConnectionManager().getCacheImpl()->createDataInput(
-                bytes, len);
+                chunk.data(), len);
         readExceptionPart(input, isLastChunkAndisSecurityHeader);
         readSecureObjectPart(input, false, true,
                              isLastChunkAndisSecurityHeader);
@@ -833,42 +830,34 @@
     }
     case TcrMessage::RESPONSE_FROM_SECONDARY: {
       // TODO: how many parts
-      chunkSecurityHeader(1, bytes, len, isLastChunkAndisSecurityHeader);
-      if (bytes != nullptr) {
-        DeleteArray<const uint8_t> delChunk(bytes);
+      chunkSecurityHeader(1, chunk, len, isLastChunkAndisSecurityHeader);
+      if (chunk.size()) {
         LOGFINEST("processChunk - got response from secondary, ignoring.");
       }
       break;
     }
     case TcrMessage::PUT_DATA_ERROR: {
-      chunkSecurityHeader(1, bytes, len, isLastChunkAndisSecurityHeader);
-      if (nullptr != bytes) {
+      chunkSecurityHeader(1, chunk, len, isLastChunkAndisSecurityHeader);
+      if (!chunk.empty()) {
         auto input =
             m_tcdm->getConnectionManager().getCacheImpl()->createDataInput(
-                bytes, len);
+                chunk.data(), len);
         auto errorString = readStringPart(input);
 
         if (!errorString.empty()) {
           setThreadLocalExceptionMessage(errorString.c_str());
         }
-
-        _GEODE_SAFE_DELETE_ARRAY(bytes);
       }
       break;
     }
     case TcrMessage::GET_ALL_DATA_ERROR: {
-      chunkSecurityHeader(1, bytes, len, isLastChunkAndisSecurityHeader);
-      if (bytes != nullptr) {
-        _GEODE_SAFE_DELETE_ARRAY(bytes);
-      }
+      chunkSecurityHeader(1, chunk, len, isLastChunkAndisSecurityHeader);
 
       break;
     }
     default: {
       // TODO: how many parts what should we do here
-      if (bytes != nullptr) {
-        _GEODE_SAFE_DELETE_ARRAY(bytes);
-      } else {
+      if (chunk.empty()) {
         LOGWARN(
             "Got unhandled message type %d while processing response, possible "
             "serialization mismatch",
@@ -889,13 +878,14 @@
   return nullptr;
 }
 
-void TcrMessage::chunkSecurityHeader(int skipPart, const uint8_t* bytes,
+void TcrMessage::chunkSecurityHeader(int skipPart,
+                                     const std::vector<uint8_t> bytes,
                                      int32_t len,
                                      uint8_t isLastChunkAndSecurityHeader) {
   LOGDEBUG("TcrMessage::chunkSecurityHeader:: skipParts = %d", skipPart);
   if ((isLastChunkAndSecurityHeader & 0x3) == 0x3) {
     auto di = m_tcdm->getConnectionManager().getCacheImpl()->createDataInput(
-        bytes, len);
+        bytes.data(), len);
     skipParts(di, skipPart);
     readSecureObjectPart(di, false, true, isLastChunkAndSecurityHeader);
   }
diff --git a/cppcache/src/TcrMessage.hpp b/cppcache/src/TcrMessage.hpp
index 46665c0..7244d75 100644
--- a/cppcache/src/TcrMessage.hpp
+++ b/cppcache/src/TcrMessage.hpp
@@ -221,7 +221,7 @@
 
   void startProcessChunk(ACE_Semaphore& finalizeSema);
   // nullptr chunk means that this is the last chunk
-  void processChunk(const uint8_t* chunk, int32_t chunkLen,
+  void processChunk(const std::vector<uint8_t>& chunk, int32_t chunkLen,
                     uint16_t endpointmemId,
                     const uint8_t isLastChunkAndisSecurityHeader = 0x00);
   /* For creating a region on the java server */
@@ -569,8 +569,8 @@
   void writeMillisecondsPart(std::chrono::milliseconds millis);
   void writeByteAndTimeOutPart(uint8_t byteValue,
                                std::chrono::milliseconds timeout);
-  void chunkSecurityHeader(int skipParts, const uint8_t* bytes, int32_t len,
-                           uint8_t isLastChunkAndSecurityHeader);
+  void chunkSecurityHeader(int skipParts, const std::vector<uint8_t> bytes,
+                           int32_t len, uint8_t isLastChunkAndSecurityHeader);
 
   void readEventIdPart(DataInput& input, bool skip = false,
                        int32_t parts = 1);  // skip num parts then read eventid