MINIFICPP-1062 - Decrease memory and CPU usage of serialize operations
diff --git a/libminifi/include/io/Serializable.h b/libminifi/include/io/Serializable.h
index aa987b5..c569c75 100644
--- a/libminifi/include/io/Serializable.h
+++ b/libminifi/include/io/Serializable.h
@@ -22,11 +22,41 @@
#include <string>
#include "EndianCheck.h"
#include "DataStream.h"
+#ifdef WIN32
+#include "Winsock2.h"
+#else
+#include <arpa/inet.h>
+#endif
+
+namespace {
+ template<typename Integral, typename std::enable_if<
+ std::is_integral<Integral>::value && (sizeof(Integral) == 2),Integral>::type* = nullptr>
+ Integral byteSwap(Integral i) {
+ return htons(i);
+ }
+ template<typename Integral, typename std::enable_if<
+ std::is_integral<Integral>::value &&(sizeof(Integral) == 4),Integral>::type* = nullptr>
+ Integral byteSwap(Integral i) {
+ return htonl(i);
+ }
+ template<typename Integral, typename std::enable_if<
+ std::is_integral<Integral>::value && (sizeof(Integral) == 8),Integral>::type* = nullptr>
+ Integral byteSwap(Integral i) {
+#ifdef htonll
+ return htonll(i);
+#else
+ #define htonll_r(x) ((((uint64_t)htonl(x)) << 32) + htonl((x) >> 32))
+ return htonll_r(i);
+#endif
+ }
+}
+
namespace org {
namespace apache {
namespace nifi {
namespace minifi {
namespace io {
+
/**
* Serializable instances provide base functionality to
* write certain objects/primitives to a data stream.
@@ -37,24 +67,6 @@
public:
/**
- * Inline function to write T to stream
- **/
- template<typename T>
- inline int writeData(const T &t, DataStream *stream);
-
- /**
- * Inline function to write T to to_vec
- **/
- template<typename T>
- inline int writeData(const T &t, uint8_t *to_vec);
-
- /**
- * Inline function to write T to to_vec
- **/
- template<typename T>
- inline int writeData(const T &t, std::vector<uint8_t> &to_vec);
-
- /**
* write byte to stream
* @return resulting write size
**/
@@ -67,40 +79,13 @@
int write(char value, DataStream *stream);
/**
- * write 4 bytes to stream
- * @param base_value non encoded value
- * @param stream output stream
- * @param is_little_endian endianness determination
- * @return resulting write size
- **/
- int write(uint32_t base_value, DataStream *stream, bool is_little_endian = EndiannessCheck::IS_LITTLE);
-
- /**
- * write 2 bytes to stream
- * @param base_value non encoded value
- * @param stream output stream
- * @param is_little_endian endianness determination
- * @return resulting write size
- **/
- int write(uint16_t base_value, DataStream *stream, bool is_little_endian = EndiannessCheck::IS_LITTLE);
-
- /**
* write valueto stream
* @param value non encoded value
* @param len length of value
* @param strema output stream
* @return resulting write size
**/
- int write(uint8_t *value, int len, DataStream *stream);
-
- /**
- * write 8 bytes to stream
- * @param base_value non encoded value
- * @param stream output stream
- * @param is_little_endian endianness determination
- * @return resulting write size
- **/
- int write(uint64_t base_value, DataStream *stream, bool is_little_endian = EndiannessCheck::IS_LITTLE);
+ int write(const uint8_t * const value, int len, DataStream *stream);
/**
* write bool to stream
@@ -117,6 +102,24 @@
int writeUTF(std::string str, DataStream *stream, bool widen = false);
/**
+ * writes 2-8 bytes to stream
+ * @param base_value non encoded value
+ * @param stream output stream
+ * @param is_little_endian endianness determination
+ * @return resulting write size
+ **/
+ template<typename Integral, typename std::enable_if<
+ (sizeof(Integral) > 1) &&
+ std::is_integral<Integral>::value &&
+ !std::is_signed<Integral>::value
+ ,Integral>::type* = nullptr>
+ int write(Integral const & base_value, DataStream *stream, bool is_little_endian = EndiannessCheck::IS_LITTLE) {
+ const Integral value = is_little_endian ? byteSwap(base_value) : base_value;
+
+ return stream->writeData(reinterpret_cast<uint8_t *>(const_cast<Integral*>(&value)), sizeof(Integral));
+ }
+
+ /**
* reads a byte from the stream
* @param value reference in which will set the result
* @param stream stream from which we will read
@@ -125,14 +128,6 @@
int read(uint8_t &value, DataStream *stream);
/**
- * reads two bytes from the stream
- * @param value reference in which will set the result
- * @param stream stream from which we will read
- * @return resulting read size
- **/
- int read(uint16_t &base_value, DataStream *stream, bool is_little_endian = EndiannessCheck::IS_LITTLE);
-
- /**
* reads a byte from the stream
* @param value reference in which will set the result
* @param stream stream from which we will read
@@ -150,22 +145,6 @@
int read(uint8_t *value, int len, DataStream *stream);
/**
- * reads four bytes from the stream
- * @param value reference in which will set the result
- * @param stream stream from which we will read
- * @return resulting read size
- **/
- int read(uint32_t &value, DataStream *stream, bool is_little_endian = EndiannessCheck::IS_LITTLE);
-
- /**
- * reads eight byte from the stream
- * @param value reference in which will set the result
- * @param stream stream from which we will read
- * @return resulting read size
- **/
- int read(uint64_t &value, DataStream *stream, bool is_little_endian = EndiannessCheck::IS_LITTLE);
-
- /**
* read UTF from stream
* @param str reference string
* @param stream stream from which we will read
@@ -173,8 +152,22 @@
**/
int readUTF(std::string &str, DataStream *stream, bool widen = false);
- protected:
+ /**
+ * reads 2-8 bytes from the stream
+ * @param value reference in which will set the result
+ * @param stream stream from which we will read
+ * @return resulting read size
+ **/
+ template<typename Integral, typename std::enable_if<
+ (sizeof(Integral) > 1) &&
+ std::is_integral<Integral>::value &&
+ !std::is_signed<Integral>::value
+ ,Integral>::type* = nullptr>
+ int read(Integral &value, DataStream *stream, bool is_little_endian = EndiannessCheck::IS_LITTLE) {
+ return stream->read(value, is_little_endian);
+ }
+ protected:
};
} /* namespace io */
diff --git a/libminifi/src/io/Serializable.cpp b/libminifi/src/io/Serializable.cpp
index 081f3db..a12b410 100644
--- a/libminifi/src/io/Serializable.cpp
+++ b/libminifi/src/io/Serializable.cpp
@@ -22,41 +22,15 @@
#include <string>
#include <algorithm>
#include "io/DataStream.h"
-#ifdef WIN32
-#include "Winsock2.h"
-#else
-#include <arpa/inet.h>
-#endif
+
namespace org {
namespace apache {
namespace nifi {
namespace minifi {
namespace io {
-#define htonll_r(x) ((((uint64_t)htonl(x)) << 32) + htonl((x) >> 32))
#define IS_ASCII(c) __builtin_expect(!!((c >= 1) && (c <= 127)), 1)
-template<typename T>
-int Serializable::writeData(const T &t, DataStream *stream) {
- uint8_t bytes[sizeof t];
- std::copy(static_cast<const char*>(static_cast<const void*>(&t)), static_cast<const char*>(static_cast<const void*>(&t)) + sizeof t, bytes);
- return stream->writeData(bytes, sizeof t);
-}
-
-template<typename T>
-int Serializable::writeData(const T &t, uint8_t *to_vec) {
- std::copy(static_cast<const char*>(static_cast<const void*>(&t)), static_cast<const char*>(static_cast<const void*>(&t)) + sizeof t, to_vec);
- return sizeof t;
-}
-
-template<typename T>
-int Serializable::writeData(const T &t, std::vector<uint8_t> &to_vec) {
- uint8_t bytes[sizeof t];
- std::copy(static_cast<const char*>(static_cast<const void*>(&t)), static_cast<const char*>(static_cast<const void*>(&t)) + sizeof t, bytes);
- to_vec.insert(to_vec.end(), &bytes[0], &bytes[sizeof t]);
- return sizeof t;
-}
-
int Serializable::write(uint8_t value, DataStream *stream) {
return stream->writeData(&value, 1);
}
@@ -64,8 +38,8 @@
return stream->writeData(reinterpret_cast<uint8_t *>(&value), 1);
}
-int Serializable::write(uint8_t *value, int len, DataStream *stream) {
- return stream->writeData(value, len);
+int Serializable::write(const uint8_t * const value, int len, DataStream *stream) {
+ return stream->writeData(const_cast<uint8_t *>(value), len);
}
int Serializable::write(bool value, DataStream *stream) {
@@ -95,34 +69,6 @@
return stream->readData(value, len);
}
-int Serializable::read(uint16_t &value, DataStream *stream, bool is_little_endian) {
- return stream->read(value, is_little_endian);
-}
-
-int Serializable::read(uint32_t &value, DataStream *stream, bool is_little_endian) {
- return stream->read(value, is_little_endian);
-}
-int Serializable::read(uint64_t &value, DataStream *stream, bool is_little_endian) {
- return stream->read(value, is_little_endian);
-}
-
-int Serializable::write(uint32_t base_value, DataStream *stream, bool is_little_endian) {
- const uint32_t value = is_little_endian ? htonl(base_value) : base_value;
-
- return writeData(value, stream);
-}
-
-int Serializable::write(uint64_t base_value, DataStream *stream, bool is_little_endian) {
- const uint64_t value = is_little_endian == 1 ? htonll_r(base_value) : base_value;
- return writeData(value, stream);
-}
-
-int Serializable::write(uint16_t base_value, DataStream *stream, bool is_little_endian) {
- const uint16_t value = is_little_endian == 1 ? htons(base_value) : base_value;
-
- return writeData(value, stream);
-}
-
int Serializable::readUTF(std::string &str, DataStream *stream, bool widen) {
uint32_t utflen = 0;
int ret = 1;
@@ -130,16 +76,13 @@
uint16_t shortLength = 0;
ret = read(shortLength, stream);
utflen = shortLength;
- if (ret <= 0)
- return ret;
} else {
- uint32_t len;
- ret = read(len, stream);
- if (ret <= 0)
- return ret;
- utflen = len;
+ ret = read(utflen, stream);
}
+ if (ret <= 0)
+ return ret;
+
if (utflen == 0) {
str = "";
return 1;
@@ -161,38 +104,18 @@
if (utflen > 65535)
return -1;
+ if (!widen) {
+ uint16_t shortLen = utflen;
+ write(shortLen, stream);
+ } else {
+ write(utflen, stream);
+ }
+
if (utflen == 0) {
- if (!widen) {
- uint16_t shortLen = utflen;
- write(shortLen, stream);
- } else {
- write(utflen, stream);
- }
return 1;
}
- std::vector<uint8_t> utf_to_write;
- if (!widen) {
- utf_to_write.resize(utflen);
- } else {
- utf_to_write.resize(utflen);
- }
-
- uint8_t *underlyingPtr = &utf_to_write[0];
- for (auto c : str) {
- writeData(c, underlyingPtr++);
- }
- int ret;
-
- if (!widen) {
- uint16_t short_length = utflen;
- write(short_length, stream);
- ret = stream->writeData(utf_to_write.data(), utflen);
- } else {
- write(utflen, stream);
- ret = stream->writeData(utf_to_write.data(), utflen);
- }
- return ret;
+ return stream->writeData(reinterpret_cast<uint8_t *>(const_cast<char*>(str.c_str())), utflen);
}
} /* namespace io */