MINIFICPP-1062 - Decrease memory and CPU usage of serialize operations
diff --git a/libminifi/include/io/Serializable.h b/libminifi/include/io/Serializable.h
index aa987b5..c569c75 100644
--- a/libminifi/include/io/Serializable.h
+++ b/libminifi/include/io/Serializable.h
@@ -22,11 +22,41 @@
 #include <string>
 #include "EndianCheck.h"
 #include "DataStream.h"
+#ifdef WIN32
+#include "Winsock2.h"
+#else
+#include <arpa/inet.h>
+#endif
+
+namespace {
+  template<typename Integral, typename std::enable_if<
+      std::is_integral<Integral>::value && (sizeof(Integral) == 2),Integral>::type* = nullptr>
+  Integral byteSwap(Integral i) {
+    return htons(i);
+  }
+  template<typename Integral, typename std::enable_if<
+      std::is_integral<Integral>::value &&(sizeof(Integral) == 4),Integral>::type* = nullptr>
+  Integral byteSwap(Integral i) {
+    return htonl(i);
+  }
+  template<typename Integral, typename std::enable_if<
+      std::is_integral<Integral>::value && (sizeof(Integral) == 8),Integral>::type* = nullptr>
+  Integral byteSwap(Integral i) {
+#ifdef htonll
+    return htonll(i);
+#else
+    #define htonll_r(x) ((((uint64_t)htonl(x)) << 32) + htonl((x) >> 32))
+    return htonll_r(i);
+#endif
+  }
+}
+
 namespace org {
 namespace apache {
 namespace nifi {
 namespace minifi {
 namespace io {
+
 /**
  * Serializable instances provide base functionality to
  * write certain objects/primitives to a data stream.
@@ -37,24 +67,6 @@
  public:
 
   /**
-   * Inline function to write T to stream
-   **/
-  template<typename T>
-  inline int writeData(const T &t, DataStream *stream);
-
-  /**
-   * Inline function to write T to to_vec
-   **/
-  template<typename T>
-  inline int writeData(const T &t, uint8_t *to_vec);
-
-  /**
-   * Inline function to write T to to_vec
-   **/
-  template<typename T>
-  inline int writeData(const T &t, std::vector<uint8_t> &to_vec);
-
-  /**
    * write byte to stream
    * @return resulting write size
    **/
@@ -67,40 +79,13 @@
   int write(char value, DataStream *stream);
 
   /**
-   * write 4 bytes to stream
-   * @param base_value non encoded value
-   * @param stream output stream
-   * @param is_little_endian endianness determination
-   * @return resulting write size
-   **/
-  int write(uint32_t base_value, DataStream *stream, bool is_little_endian = EndiannessCheck::IS_LITTLE);
-
-  /**
-   * write 2 bytes to stream
-   * @param base_value non encoded value
-   * @param stream output stream
-   * @param is_little_endian endianness determination
-   * @return resulting write size
-   **/
-  int write(uint16_t base_value, DataStream *stream, bool is_little_endian = EndiannessCheck::IS_LITTLE);
-
-  /**
    * write valueto stream
    * @param value non encoded value
    * @param len length of value
    * @param strema output stream
    * @return resulting write size
    **/
-  int write(uint8_t *value, int len, DataStream *stream);
-
-  /**
-   * write 8 bytes to stream
-   * @param base_value non encoded value
-   * @param stream output stream
-   * @param is_little_endian endianness determination
-   * @return resulting write size
-   **/
-  int write(uint64_t base_value, DataStream *stream, bool is_little_endian = EndiannessCheck::IS_LITTLE);
+  int write(const uint8_t * const value, int len, DataStream *stream);
 
   /**
    * write bool to stream
@@ -117,6 +102,24 @@
   int writeUTF(std::string str, DataStream *stream, bool widen = false);
 
   /**
+  * writes 2-8 bytes to stream
+  * @param base_value non encoded value
+  * @param stream output stream
+  * @param is_little_endian endianness determination
+  * @return resulting write size
+  **/
+  template<typename Integral, typename std::enable_if<
+      (sizeof(Integral) > 1) &&
+      std::is_integral<Integral>::value &&
+      !std::is_signed<Integral>::value
+      ,Integral>::type* = nullptr>
+  int write(Integral const & base_value, DataStream *stream, bool is_little_endian = EndiannessCheck::IS_LITTLE) {
+    const Integral value = is_little_endian ? byteSwap(base_value) : base_value;
+
+    return stream->writeData(reinterpret_cast<uint8_t *>(const_cast<Integral*>(&value)), sizeof(Integral));
+  }
+
+  /**
    * reads a byte from the stream
    * @param value reference in which will set the result
    * @param stream stream from which we will read
@@ -125,14 +128,6 @@
   int read(uint8_t &value, DataStream *stream);
 
   /**
-   * reads two bytes from the stream
-   * @param value reference in which will set the result
-   * @param stream stream from which we will read
-   * @return resulting read size
-   **/
-  int read(uint16_t &base_value, DataStream *stream, bool is_little_endian = EndiannessCheck::IS_LITTLE);
-
-  /**
    * reads a byte from the stream
    * @param value reference in which will set the result
    * @param stream stream from which we will read
@@ -150,22 +145,6 @@
   int read(uint8_t *value, int len, DataStream *stream);
 
   /**
-   * reads four bytes from the stream
-   * @param value reference in which will set the result
-   * @param stream stream from which we will read
-   * @return resulting read size
-   **/
-  int read(uint32_t &value, DataStream *stream, bool is_little_endian = EndiannessCheck::IS_LITTLE);
-
-  /**
-   * reads eight byte from the stream
-   * @param value reference in which will set the result
-   * @param stream stream from which we will read
-   * @return resulting read size
-   **/
-  int read(uint64_t &value, DataStream *stream, bool is_little_endian = EndiannessCheck::IS_LITTLE);
-
-  /**
    * read UTF from stream
    * @param str reference string
    * @param stream stream from which we will read
@@ -173,8 +152,22 @@
    **/
   int readUTF(std::string &str, DataStream *stream, bool widen = false);
 
- protected:
+  /**
+  * reads 2-8 bytes from the stream
+  * @param value reference in which will set the result
+  * @param stream stream from which we will read
+  * @return resulting read size
+  **/
+  template<typename Integral, typename std::enable_if<
+      (sizeof(Integral) > 1) &&
+      std::is_integral<Integral>::value &&
+      !std::is_signed<Integral>::value
+      ,Integral>::type* = nullptr>
+  int read(Integral &value, DataStream *stream, bool is_little_endian = EndiannessCheck::IS_LITTLE) {
+    return stream->read(value, is_little_endian);
+  }
 
+ protected:
 };
 
 } /* namespace io */
diff --git a/libminifi/src/io/Serializable.cpp b/libminifi/src/io/Serializable.cpp
index 081f3db..a12b410 100644
--- a/libminifi/src/io/Serializable.cpp
+++ b/libminifi/src/io/Serializable.cpp
@@ -22,41 +22,15 @@
 #include <string>
 #include <algorithm>
 #include "io/DataStream.h"
-#ifdef WIN32
-#include "Winsock2.h"
-#else
-#include <arpa/inet.h>
-#endif
+
 namespace org {
 namespace apache {
 namespace nifi {
 namespace minifi {
 namespace io {
 
-#define htonll_r(x) ((((uint64_t)htonl(x)) << 32) + htonl((x) >> 32))
 #define IS_ASCII(c) __builtin_expect(!!((c >= 1) && (c <= 127)), 1)
 
-template<typename T>
-int Serializable::writeData(const T &t, DataStream *stream) {
-  uint8_t bytes[sizeof t];
-  std::copy(static_cast<const char*>(static_cast<const void*>(&t)), static_cast<const char*>(static_cast<const void*>(&t)) + sizeof t, bytes);
-  return stream->writeData(bytes, sizeof t);
-}
-
-template<typename T>
-int Serializable::writeData(const T &t, uint8_t *to_vec) {
-  std::copy(static_cast<const char*>(static_cast<const void*>(&t)), static_cast<const char*>(static_cast<const void*>(&t)) + sizeof t, to_vec);
-  return sizeof t;
-}
-
-template<typename T>
-int Serializable::writeData(const T &t, std::vector<uint8_t> &to_vec) {
-  uint8_t bytes[sizeof t];
-  std::copy(static_cast<const char*>(static_cast<const void*>(&t)), static_cast<const char*>(static_cast<const void*>(&t)) + sizeof t, bytes);
-  to_vec.insert(to_vec.end(), &bytes[0], &bytes[sizeof t]);
-  return sizeof t;
-}
-
 int Serializable::write(uint8_t value, DataStream *stream) {
   return stream->writeData(&value, 1);
 }
@@ -64,8 +38,8 @@
   return stream->writeData(reinterpret_cast<uint8_t *>(&value), 1);
 }
 
-int Serializable::write(uint8_t *value, int len, DataStream *stream) {
-  return stream->writeData(value, len);
+int Serializable::write(const uint8_t * const value, int len, DataStream *stream) {
+  return stream->writeData(const_cast<uint8_t *>(value), len);
 }
 
 int Serializable::write(bool value, DataStream *stream) {
@@ -95,34 +69,6 @@
   return stream->readData(value, len);
 }
 
-int Serializable::read(uint16_t &value, DataStream *stream, bool is_little_endian) {
-  return stream->read(value, is_little_endian);
-}
-
-int Serializable::read(uint32_t &value, DataStream *stream, bool is_little_endian) {
-  return stream->read(value, is_little_endian);
-}
-int Serializable::read(uint64_t &value, DataStream *stream, bool is_little_endian) {
-  return stream->read(value, is_little_endian);
-}
-
-int Serializable::write(uint32_t base_value, DataStream *stream, bool is_little_endian) {
-  const uint32_t value = is_little_endian ? htonl(base_value) : base_value;
-
-  return writeData(value, stream);
-}
-
-int Serializable::write(uint64_t base_value, DataStream *stream, bool is_little_endian) {
-  const uint64_t value = is_little_endian == 1 ? htonll_r(base_value) : base_value;
-  return writeData(value, stream);
-}
-
-int Serializable::write(uint16_t base_value, DataStream *stream, bool is_little_endian) {
-  const uint16_t value = is_little_endian == 1 ? htons(base_value) : base_value;
-
-  return writeData(value, stream);
-}
-
 int Serializable::readUTF(std::string &str, DataStream *stream, bool widen) {
   uint32_t utflen = 0;
   int ret = 1;
@@ -130,16 +76,13 @@
     uint16_t shortLength = 0;
     ret = read(shortLength, stream);
     utflen = shortLength;
-    if (ret <= 0)
-      return ret;
   } else {
-    uint32_t len;
-    ret = read(len, stream);
-    if (ret <= 0)
-      return ret;
-    utflen = len;
+    ret = read(utflen, stream);
   }
 
+  if (ret <= 0)
+    return ret;
+
   if (utflen == 0) {
     str = "";
     return 1;
@@ -161,38 +104,18 @@
   if (utflen > 65535)
     return -1;
 
+  if (!widen) {
+    uint16_t shortLen = utflen;
+    write(shortLen, stream);
+  } else {
+    write(utflen, stream);
+  }
+
   if (utflen == 0) {
-    if (!widen) {
-      uint16_t shortLen = utflen;
-      write(shortLen, stream);
-    } else {
-      write(utflen, stream);
-    }
     return 1;
   }
 
-  std::vector<uint8_t> utf_to_write;
-  if (!widen) {
-    utf_to_write.resize(utflen);
-  } else {
-    utf_to_write.resize(utflen);
-  }
-
-  uint8_t *underlyingPtr = &utf_to_write[0];
-  for (auto c : str) {
-    writeData(c, underlyingPtr++);
-  }
-  int ret;
-
-  if (!widen) {
-    uint16_t short_length = utflen;
-    write(short_length, stream);
-    ret = stream->writeData(utf_to_write.data(), utflen);
-  } else {
-    write(utflen, stream);
-    ret = stream->writeData(utf_to_write.data(), utflen);
-  }
-  return ret;
+  return stream->writeData(reinterpret_cast<uint8_t *>(const_cast<char*>(str.c_str())), utflen);
 }
 
 } /* namespace io */