Merge pull request #664 from arpadboda/MINIFICPP-1062

Signed-off-by: Daniel Bakai <bakaid@apache.org>
diff --git a/extensions/rocksdb-repos/FlowFileRepository.cpp b/extensions/rocksdb-repos/FlowFileRepository.cpp
index f4920bc..56df86c 100644
--- a/extensions/rocksdb-repos/FlowFileRepository.cpp
+++ b/extensions/rocksdb-repos/FlowFileRepository.cpp
@@ -17,10 +17,12 @@
  */
 #include "FlowFileRepository.h"
 #include "rocksdb/write_batch.h"
+#include "rocksdb/slice.h"
 #include <memory>
 #include <string>
 #include <utility>
 #include <vector>
+#include <list>
 #include "FlowFileRecord.h"
 
 namespace org {
@@ -32,25 +34,41 @@
 
 void FlowFileRepository::flush() {
   rocksdb::WriteBatch batch;
-  std::string key;
-  std::string value;
+  uint64_t decrement_total = 0;
   rocksdb::ReadOptions options;
 
   std::vector<std::shared_ptr<FlowFileRecord>> purgeList;
 
-  uint64_t decrement_total = 0;
+  std::vector<rocksdb::Slice> keys;
+  std::list<std::string> keystrings;
+  std::vector<std::string> values;
+
   while (keys_to_delete.size_approx() > 0) {
+    std::string key;
     if (keys_to_delete.try_dequeue(key)) {
-      db_->Get(options, key, &value);
-      decrement_total += value.size();
-      std::shared_ptr<FlowFileRecord> eventRead = std::make_shared<FlowFileRecord>(shared_from_this(), content_repo_);
-      if (eventRead->DeSerialize(reinterpret_cast<const uint8_t *>(value.data()), value.size())) {
-        purgeList.push_back(eventRead);
-      }
-      logger_->log_debug("Issuing batch delete, including %s, Content path %s", eventRead->getUUIDStr(), eventRead->getContentFullPath());
-      batch.Delete(key);
+      keystrings.push_back(std::move(key));  // rocksdb::Slice doesn't copy the string, only grabs ptrs. Hacky, but have to ensure the required lifetime of the strings.
+      keys.push_back(keystrings.back());
     }
   }
+
+  auto multistatus = db_->MultiGet(options, keys, &values);
+
+  for(size_t i=0; i<keys.size() && i<values.size() && i<multistatus.size(); ++i) {
+    if(!multistatus[i].ok()) {
+      logger_->log_error("Failed to read key from rocksdb: %s! DB is most probably in an inconsistent state!", keys[i].data());
+      continue;
+    }
+
+    decrement_total += values[i].size();
+    std::shared_ptr<FlowFileRecord> eventRead = std::make_shared<FlowFileRecord>(shared_from_this(), content_repo_);
+    if (eventRead->DeSerialize(reinterpret_cast<const uint8_t *>(values[i].data()), values[i].size())) {
+      purgeList.push_back(eventRead);
+    }
+    logger_->log_debug("Issuing batch delete, including %s, Content path %s", eventRead->getUUIDStr(), eventRead->getContentFullPath());
+    batch.Delete(keys[i]);
+  }
+
+
   if (db_->Write(rocksdb::WriteOptions(), &batch).ok()) {
     logger_->log_trace("Decrementing %u from a repo size of %u", decrement_total, repo_size_.load());
     if (decrement_total > repo_size_.load()) {
diff --git a/extensions/rocksdb-repos/FlowFileRepository.h b/extensions/rocksdb-repos/FlowFileRepository.h
index 6d0972a..3943c85 100644
--- a/extensions/rocksdb-repos/FlowFileRepository.h
+++ b/extensions/rocksdb-repos/FlowFileRepository.h
@@ -75,6 +75,10 @@
       delete db_;
   }
 
+  virtual bool isNoop() {
+    return false;
+  }
+
   virtual void flush();
 
   // initialize
@@ -104,9 +108,8 @@
       logger_->log_debug("NiFi FlowFile Repository database open %s success", directory_);
     } else {
       logger_->log_error("NiFi FlowFile Repository database open %s fail", directory_);
-      return false;
     }
-    return true;
+    return status.ok();
   }
 
   virtual void run();
@@ -114,14 +117,22 @@
   virtual bool Put(std::string key, const uint8_t *buf, size_t bufLen) {
     // persistent to the DB
     rocksdb::Slice value((const char *) buf, bufLen);
-    rocksdb::Status status;
     repo_size_ += bufLen;
-    status = db_->Put(rocksdb::WriteOptions(), key, value);
-    if (status.ok())
-      return true;
-    else
-      return false;
+    return db_->Put(rocksdb::WriteOptions(), key, value).ok();
   }
+
+  virtual bool MultiPut(const std::vector<std::pair<std::string, std::unique_ptr<minifi::io::DataStream>>>& data) {
+    rocksdb::WriteBatch batch;
+    for (const auto &item: data) {
+      rocksdb::Slice value((const char *) item.second->getBuffer(), item.second->getSize());
+      if (!batch.Put(item.first, value).ok()) {
+        return false;
+      }
+    }
+    return db_->Write(rocksdb::WriteOptions(), &batch).ok();
+  }
+
+
   /**
    * 
    * Deletes the key
@@ -138,12 +149,7 @@
   virtual bool Get(const std::string &key, std::string &value) {
     if (db_ == nullptr)
       return false;
-    rocksdb::Status status;
-    status = db_->Get(rocksdb::ReadOptions(), key, &value);
-    if (status.ok())
-      return true;
-    else
-      return false;
+    return db_->Get(rocksdb::ReadOptions(), key, &value).ok();
   }
 
   virtual void loadComponent(const std::shared_ptr<core::ContentRepository> &content_repo);
diff --git a/extensions/rocksdb-repos/ProvenanceRepository.h b/extensions/rocksdb-repos/ProvenanceRepository.h
index 62438d7..ef446be 100644
--- a/extensions/rocksdb-repos/ProvenanceRepository.h
+++ b/extensions/rocksdb-repos/ProvenanceRepository.h
@@ -64,6 +64,10 @@
 
   virtual void flush();
 
+  virtual bool isNoop() {
+    return false;
+  }
+
   void start() {
     if (this->purge_period_ <= 0)
       return;
@@ -80,26 +84,26 @@
     if (config->get(Configure::nifi_provenance_repository_directory_default, value)) {
       directory_ = value;
     }
-    logger_->log_debug("NiFi Provenance Repository Directory %s", directory_);
+    logger_->log_debug("MiNiFi Provenance Repository Directory %s", directory_);
     if (config->get(Configure::nifi_provenance_repository_max_storage_size, value)) {
       core::Property::StringToInt(value, max_partition_bytes_);
     }
-    logger_->log_debug("NiFi Provenance Max Partition Bytes %d", max_partition_bytes_);
+    logger_->log_debug("MiNiFi Provenance Max Partition Bytes %d", max_partition_bytes_);
     if (config->get(Configure::nifi_provenance_repository_max_storage_time, value)) {
       core::TimeUnit unit;
       if (core::Property::StringToTime(value, max_partition_millis_, unit) && core::Property::ConvertTimeUnitToMS(max_partition_millis_, unit, max_partition_millis_)) {
       }
     }
-    logger_->log_debug("NiFi Provenance Max Storage Time: [%d] ms", max_partition_millis_);
+    logger_->log_debug("MiNiFi Provenance Max Storage Time: [%d] ms", max_partition_millis_);
     rocksdb::Options options;
     options.create_if_missing = true;
     options.use_direct_io_for_flush_and_compaction = true;
     options.use_direct_reads = true;
     rocksdb::Status status = rocksdb::DB::Open(options, directory_, &db_);
     if (status.ok()) {
-      logger_->log_debug("NiFi Provenance Repository database open %s success", directory_);
+      logger_->log_debug("MiNiFi Provenance Repository database open %s success", directory_);
     } else {
-      logger_->log_error("NiFi Provenance Repository database open %s fail", directory_);
+      logger_->log_error("MiNiFi Provenance Repository database open %s failed: %s", directory_, status.ToString());
       return false;
     }
 
@@ -113,13 +117,24 @@
 
     // persist to the DB
     rocksdb::Slice value((const char *) buf, bufLen);
-    rocksdb::Status status;
-    status = db_->Put(rocksdb::WriteOptions(), key, value);
-    if (status.ok())
-      return true;
-    else
-      return false;
+    return db_->Put(rocksdb::WriteOptions(), key, value).ok();
   }
+
+  virtual bool MultiPut(const std::vector<std::pair<std::string, std::unique_ptr<minifi::io::DataStream>>>& data) {
+    if (repo_full_) {
+      return false;
+    }
+
+    rocksdb::WriteBatch batch;
+    for (const auto &item: data) {
+      rocksdb::Slice value((const char *) item.second->getBuffer(), item.second->getSize());
+      if (!batch.Put(item.first, value).ok()) {
+        return false;
+      }
+    }
+    return db_->Write(rocksdb::WriteOptions(), &batch).ok();
+  }
+
   // Delete
   virtual bool Delete(std::string key) {
     keys_to_delete.enqueue(key);
@@ -127,12 +142,7 @@
   }
   // Get
   virtual bool Get(const std::string &key, std::string &value) {
-    rocksdb::Status status;
-    status = db_->Get(rocksdb::ReadOptions(), key, &value);
-    if (status.ok())
-      return true;
-    else
-      return false;
+    return db_->Get(rocksdb::ReadOptions(), key, &value).ok();
   }
 
   // Remove event
@@ -177,12 +187,9 @@
     }
     delete it;
 
-    if (max_size > 0) {
-      return true;
-    } else {
-      return false;
-    }
+    return max_size > 0;
   }
+
   //! get record
   void getProvenanceRecord(std::vector<std::shared_ptr<ProvenanceEventRecord>> &records, int maxSize) {
     rocksdb::Iterator* it = db_->NewIterator(rocksdb::ReadOptions());
@@ -212,12 +219,9 @@
         break;
     }
     delete it;
-    if (max_size > 0) {
-      return true;
-    } else {
-      return false;
-    }
+    return max_size > 0;
   }
+
   //! purge record
   void purgeProvenanceRecord(std::vector<std::shared_ptr<ProvenanceEventRecord>> &records) {
     for (auto record : records) {
diff --git a/libminifi/include/Connection.h b/libminifi/include/Connection.h
index c2d30df..ed452e4 100644
--- a/libminifi/include/Connection.h
+++ b/libminifi/include/Connection.h
@@ -158,8 +158,12 @@
       put(ff);
     }
   }
+
   // Put the flow file into queue
   void put(std::shared_ptr<core::FlowFile> flow);
+
+  // Put multiple flowfiles into the queue
+  void multiPut(std::vector<std::shared_ptr<core::FlowFile>>& flows);
   // Poll the flow file from queue, the expired flow file record also being returned
   std::shared_ptr<core::FlowFile> poll(std::set<std::shared_ptr<core::FlowFile>> &expiredFlowRecords);
   // Drain the flow records
diff --git a/libminifi/include/FlowFileRecord.h b/libminifi/include/FlowFileRecord.h
index f233627..578f556 100644
--- a/libminifi/include/FlowFileRecord.h
+++ b/libminifi/include/FlowFileRecord.h
@@ -131,6 +131,8 @@
   // getAttribute key is enum
   bool getKeyedAttribute(FlowAttribute key, std::string &value);
 
+  bool Serialize(io::DataStream &outStream);
+
   //! Serialize and Persistent to the repository
   bool Serialize();
   //! DeSerialize
diff --git a/libminifi/include/core/Repository.h b/libminifi/include/core/Repository.h
index ca40ef5..f71bf1a 100644
--- a/libminifi/include/core/Repository.h
+++ b/libminifi/include/core/Repository.h
@@ -79,6 +79,10 @@
     stop();
   }
 
+  virtual bool isNoop() {
+    return true;
+  }
+
   virtual void flush();
 
   // initialize
@@ -89,6 +93,11 @@
   virtual bool Put(std::string key, const uint8_t *buf, size_t bufLen) {
     return true;
   }
+
+  virtual bool MultiPut(const std::vector<std::pair<std::string, std::unique_ptr<io::DataStream>>>& data) {
+    return true;
+  }
+
   // Delete
   virtual bool Delete(std::string key) {
     return true;
diff --git a/libminifi/include/io/Serializable.h b/libminifi/include/io/Serializable.h
index aa987b5..c569c75 100644
--- a/libminifi/include/io/Serializable.h
+++ b/libminifi/include/io/Serializable.h
@@ -22,11 +22,41 @@
 #include <string>
 #include "EndianCheck.h"
 #include "DataStream.h"
+#ifdef WIN32
+#include "Winsock2.h"
+#else
+#include <arpa/inet.h>
+#endif
+
+namespace {
+  template<typename Integral, typename std::enable_if<
+      std::is_integral<Integral>::value && (sizeof(Integral) == 2),Integral>::type* = nullptr>
+  Integral byteSwap(Integral i) {
+    return htons(i);
+  }
+  template<typename Integral, typename std::enable_if<
+      std::is_integral<Integral>::value &&(sizeof(Integral) == 4),Integral>::type* = nullptr>
+  Integral byteSwap(Integral i) {
+    return htonl(i);
+  }
+  template<typename Integral, typename std::enable_if<
+      std::is_integral<Integral>::value && (sizeof(Integral) == 8),Integral>::type* = nullptr>
+  Integral byteSwap(Integral i) {
+#ifdef htonll
+    return htonll(i);
+#else
+    #define htonll_r(x) ((((uint64_t)htonl(x)) << 32) + htonl((x) >> 32))
+    return htonll_r(i);
+#endif
+  }
+}
+
 namespace org {
 namespace apache {
 namespace nifi {
 namespace minifi {
 namespace io {
+
 /**
  * Serializable instances provide base functionality to
  * write certain objects/primitives to a data stream.
@@ -37,24 +67,6 @@
  public:
 
   /**
-   * Inline function to write T to stream
-   **/
-  template<typename T>
-  inline int writeData(const T &t, DataStream *stream);
-
-  /**
-   * Inline function to write T to to_vec
-   **/
-  template<typename T>
-  inline int writeData(const T &t, uint8_t *to_vec);
-
-  /**
-   * Inline function to write T to to_vec
-   **/
-  template<typename T>
-  inline int writeData(const T &t, std::vector<uint8_t> &to_vec);
-
-  /**
    * write byte to stream
    * @return resulting write size
    **/
@@ -67,40 +79,13 @@
   int write(char value, DataStream *stream);
 
   /**
-   * write 4 bytes to stream
-   * @param base_value non encoded value
-   * @param stream output stream
-   * @param is_little_endian endianness determination
-   * @return resulting write size
-   **/
-  int write(uint32_t base_value, DataStream *stream, bool is_little_endian = EndiannessCheck::IS_LITTLE);
-
-  /**
-   * write 2 bytes to stream
-   * @param base_value non encoded value
-   * @param stream output stream
-   * @param is_little_endian endianness determination
-   * @return resulting write size
-   **/
-  int write(uint16_t base_value, DataStream *stream, bool is_little_endian = EndiannessCheck::IS_LITTLE);
-
-  /**
    * write valueto stream
    * @param value non encoded value
    * @param len length of value
    * @param strema output stream
    * @return resulting write size
    **/
-  int write(uint8_t *value, int len, DataStream *stream);
-
-  /**
-   * write 8 bytes to stream
-   * @param base_value non encoded value
-   * @param stream output stream
-   * @param is_little_endian endianness determination
-   * @return resulting write size
-   **/
-  int write(uint64_t base_value, DataStream *stream, bool is_little_endian = EndiannessCheck::IS_LITTLE);
+  int write(const uint8_t * const value, int len, DataStream *stream);
 
   /**
    * write bool to stream
@@ -117,6 +102,24 @@
   int writeUTF(std::string str, DataStream *stream, bool widen = false);
 
   /**
+  * writes 2-8 bytes to stream
+  * @param base_value non encoded value
+  * @param stream output stream
+  * @param is_little_endian endianness determination
+  * @return resulting write size
+  **/
+  template<typename Integral, typename std::enable_if<
+      (sizeof(Integral) > 1) &&
+      std::is_integral<Integral>::value &&
+      !std::is_signed<Integral>::value
+      ,Integral>::type* = nullptr>
+  int write(Integral const & base_value, DataStream *stream, bool is_little_endian = EndiannessCheck::IS_LITTLE) {
+    const Integral value = is_little_endian ? byteSwap(base_value) : base_value;
+
+    return stream->writeData(reinterpret_cast<uint8_t *>(const_cast<Integral*>(&value)), sizeof(Integral));
+  }
+
+  /**
    * reads a byte from the stream
    * @param value reference in which will set the result
    * @param stream stream from which we will read
@@ -125,14 +128,6 @@
   int read(uint8_t &value, DataStream *stream);
 
   /**
-   * reads two bytes from the stream
-   * @param value reference in which will set the result
-   * @param stream stream from which we will read
-   * @return resulting read size
-   **/
-  int read(uint16_t &base_value, DataStream *stream, bool is_little_endian = EndiannessCheck::IS_LITTLE);
-
-  /**
    * reads a byte from the stream
    * @param value reference in which will set the result
    * @param stream stream from which we will read
@@ -150,22 +145,6 @@
   int read(uint8_t *value, int len, DataStream *stream);
 
   /**
-   * reads four bytes from the stream
-   * @param value reference in which will set the result
-   * @param stream stream from which we will read
-   * @return resulting read size
-   **/
-  int read(uint32_t &value, DataStream *stream, bool is_little_endian = EndiannessCheck::IS_LITTLE);
-
-  /**
-   * reads eight byte from the stream
-   * @param value reference in which will set the result
-   * @param stream stream from which we will read
-   * @return resulting read size
-   **/
-  int read(uint64_t &value, DataStream *stream, bool is_little_endian = EndiannessCheck::IS_LITTLE);
-
-  /**
    * read UTF from stream
    * @param str reference string
    * @param stream stream from which we will read
@@ -173,8 +152,22 @@
    **/
   int readUTF(std::string &str, DataStream *stream, bool widen = false);
 
- protected:
+  /**
+  * reads 2-8 bytes from the stream
+  * @param value reference in which will set the result
+  * @param stream stream from which we will read
+  * @return resulting read size
+  **/
+  template<typename Integral, typename std::enable_if<
+      (sizeof(Integral) > 1) &&
+      std::is_integral<Integral>::value &&
+      !std::is_signed<Integral>::value
+      ,Integral>::type* = nullptr>
+  int read(Integral &value, DataStream *stream, bool is_little_endian = EndiannessCheck::IS_LITTLE) {
+    return stream->read(value, is_little_endian);
+  }
 
+ protected:
 };
 
 } /* namespace io */
diff --git a/libminifi/include/provenance/Provenance.h b/libminifi/include/provenance/Provenance.h
index 2804009..ea7bebd 100644
--- a/libminifi/include/provenance/Provenance.h
+++ b/libminifi/include/provenance/Provenance.h
@@ -347,6 +347,11 @@
       _contentFullPath = flow->getResourceClaim()->getContentFullPath();
     }
   }
+  using SerializableComponent::Serialize;
+
+  // Serialize the event to a stream
+  bool Serialize(org::apache::nifi::minifi::io::DataStream& outStream);
+
   // Serialize and Persistent to the repository
   bool Serialize(const std::shared_ptr<core::SerializableComponent> &repo);
   // DeSerialize
@@ -508,6 +513,10 @@
 
   // allocate
   std::shared_ptr<ProvenanceEventRecord> allocate(ProvenanceEventRecord::ProvenanceEventType eventType, std::shared_ptr<core::FlowFile> flow) {
+    if(repo_->isNoop()) {
+      return nullptr;
+    }
+
     auto event = std::make_shared<ProvenanceEventRecord>(eventType, _componentId, _componentType);
     if (event)
       event->fromFlowFile(flow);
diff --git a/libminifi/src/Connection.cpp b/libminifi/src/Connection.cpp
index be03180..ef609de 100644
--- a/libminifi/src/Connection.cpp
+++ b/libminifi/src/Connection.cpp
@@ -28,6 +28,7 @@
 #include <chrono>
 #include <thread>
 #include <iostream>
+#include <list>
 #include "core/FlowFile.h"
 #include "Connection.h"
 #include "core/Processor.h"
@@ -136,7 +137,7 @@
 void Connection::put(std::shared_ptr<core::FlowFile> flow) {
   if (drop_empty_ && flow->getSize() == 0) {
     logger_->log_info("Dropping empty flow file: %s", flow->getUUIDStr());
-    return;;
+    return;
   }
   {
     std::lock_guard<std::mutex> lock(mutex_);
@@ -163,6 +164,53 @@
   }
 }
 
+void Connection::multiPut(std::vector<std::shared_ptr<core::FlowFile>>& flows) {
+  std::vector<std::pair<std::string, std::unique_ptr<io::DataStream>>> flowData;
+
+  {
+    std::lock_guard<std::mutex> lock(mutex_);
+
+    for (auto &ff : flows) {
+      if (drop_empty_ && ff->getSize() == 0) {
+        logger_->log_info("Dropping empty flow file: %s", ff->getUUIDStr());
+        continue;
+      }
+
+      queue_.push(ff);
+      queued_data_size_ += ff->getSize();
+
+      logger_->log_debug("Enqueue flow file UUID %s to connection %s", ff->getUUIDStr(), name_);
+
+      if (!ff->isStored()) {
+        // Save to the flowfile repo
+        FlowFileRecord event(flow_repository_, content_repo_, ff, this->uuidStr_);
+
+        std::unique_ptr<io::DataStream> stramptr(new io::DataStream());
+        event.Serialize(*stramptr.get());
+
+        flowData.emplace_back(event.getUUIDStr(), std::move(stramptr));
+      }
+    }
+  }
+
+  if (!flow_repository_->MultiPut(flowData)) {
+    return;
+  }
+
+  for (auto& ff : flows) {
+    if (drop_empty_ && ff->getSize() == 0) {
+      continue;
+    }
+
+    ff->setStoredToRepository(true);
+
+    if (dest_connectable_) {
+      logger_->log_debug("Notifying %s that flowfiles were inserted", dest_connectable_->getName());
+      dest_connectable_->notifyWork();
+    }
+  }
+}
+
 std::shared_ptr<core::FlowFile> Connection::poll(std::set<std::shared_ptr<core::FlowFile>> &expiredFlowRecords) {
   std::lock_guard<std::mutex> lock(mutex_);
 
diff --git a/libminifi/src/FlowFileRecord.cpp b/libminifi/src/FlowFileRecord.cpp
index 8868c9a..6dbbd75 100644
--- a/libminifi/src/FlowFileRecord.cpp
+++ b/libminifi/src/FlowFileRecord.cpp
@@ -215,9 +215,7 @@
   return ret;
 }
 
-bool FlowFileRecord::Serialize() {
-  io::DataStream outStream;
-
+bool FlowFileRecord::Serialize(io::DataStream &outStream) {
   int ret;
 
   ret = write(this->event_time_, &outStream);
@@ -251,7 +249,7 @@
     return false;
   }
 
-  for (auto itAttribute : attributes_) {
+  for (auto& itAttribute : attributes_) {
     ret = writeUTF(itAttribute.first, &outStream, true);
     if (ret <= 0) {
       return false;
@@ -277,6 +275,20 @@
     return false;
   }
 
+  return true;
+}
+
+bool FlowFileRecord::Serialize() {
+  if (flow_repository_->isNoop()) {
+    return true;
+  }
+
+  io::DataStream outStream;
+
+  if (!Serialize(outStream)) {
+    return false;
+  }
+
   if (flow_repository_->Put(uuidStr_, const_cast<uint8_t*>(outStream.getBuffer()), outStream.getSize())) {
     logger_->log_debug("NiFi FlowFile Store event %s size %llu success", uuidStr_, outStream.getSize());
     return true;
diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp
index 5ecfcd0..6cf388c 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -818,6 +818,8 @@
       }
     }
 
+    std::map<std::shared_ptr<Connection>, std::vector<std::shared_ptr<FlowFile>>> connectionQueues;
+
     std::shared_ptr<Connection> connection = nullptr;
     // Complete process the added and update flow files for the session, send the flow file to its queue
     for (const auto &it : _updatedFlowFiles) {
@@ -828,8 +830,9 @@
       }
 
       connection = std::static_pointer_cast<Connection>(record->getConnection());
-      if ((connection) != nullptr)
-        connection->put(record);
+      if ((connection) != nullptr) {
+        connectionQueues[connection].push_back(record);
+      }
     }
     for (const auto &it : _addedFlowFiles) {
       std::shared_ptr<core::FlowFile> record = it.second;
@@ -838,8 +841,9 @@
         continue;
       }
       connection = std::static_pointer_cast<Connection>(record->getConnection());
-      if ((connection) != nullptr)
-        connection->put(record);
+      if ((connection) != nullptr) {
+        connectionQueues[connection].push_back(record);
+      }
     }
     // Process the clone flow files
     for (const auto &it : _clonedFlowFiles) {
@@ -849,8 +853,13 @@
         continue;
       }
       connection = std::static_pointer_cast<Connection>(record->getConnection());
-      if ((connection) != nullptr)
-        connection->put(record);
+      if ((connection) != nullptr) {
+        connectionQueues[connection].push_back(record);
+      }
+    }
+
+    for (auto& cq : connectionQueues) {
+      cq.first->multiPut(cq.second);
     }
 
     // All done
@@ -872,6 +881,8 @@
 }
 
 void ProcessSession::rollback() {
+  std::map<std::shared_ptr<Connection>, std::vector<std::shared_ptr<FlowFile>>> connectionQueues;
+
   try {
     std::shared_ptr<Connection> connection = nullptr;
     // Requeue the snapshot of the flowfile back
@@ -882,9 +893,14 @@
         std::shared_ptr<FlowFileRecord> flowf = std::static_pointer_cast<FlowFileRecord>(record);
         flowf->setSnapShot(false);
         logger_->log_debug("ProcessSession rollback for %s, record %s, to connection %s", process_context_->getProcessorNode()->getName(), record->getUUIDStr(), connection->getName());
-        connection->put(record);
+        connectionQueues[connection].push_back(record);
       }
     }
+
+    for (auto& cq : connectionQueues) {
+      cq.first->multiPut(cq.second);
+    }
+
     _originalFlowFiles.clear();
 
     _clonedFlowFiles.clear();
diff --git a/libminifi/src/io/Serializable.cpp b/libminifi/src/io/Serializable.cpp
index 081f3db..a12b410 100644
--- a/libminifi/src/io/Serializable.cpp
+++ b/libminifi/src/io/Serializable.cpp
@@ -22,41 +22,15 @@
 #include <string>
 #include <algorithm>
 #include "io/DataStream.h"
-#ifdef WIN32
-#include "Winsock2.h"
-#else
-#include <arpa/inet.h>
-#endif
+
 namespace org {
 namespace apache {
 namespace nifi {
 namespace minifi {
 namespace io {
 
-#define htonll_r(x) ((((uint64_t)htonl(x)) << 32) + htonl((x) >> 32))
 #define IS_ASCII(c) __builtin_expect(!!((c >= 1) && (c <= 127)), 1)
 
-template<typename T>
-int Serializable::writeData(const T &t, DataStream *stream) {
-  uint8_t bytes[sizeof t];
-  std::copy(static_cast<const char*>(static_cast<const void*>(&t)), static_cast<const char*>(static_cast<const void*>(&t)) + sizeof t, bytes);
-  return stream->writeData(bytes, sizeof t);
-}
-
-template<typename T>
-int Serializable::writeData(const T &t, uint8_t *to_vec) {
-  std::copy(static_cast<const char*>(static_cast<const void*>(&t)), static_cast<const char*>(static_cast<const void*>(&t)) + sizeof t, to_vec);
-  return sizeof t;
-}
-
-template<typename T>
-int Serializable::writeData(const T &t, std::vector<uint8_t> &to_vec) {
-  uint8_t bytes[sizeof t];
-  std::copy(static_cast<const char*>(static_cast<const void*>(&t)), static_cast<const char*>(static_cast<const void*>(&t)) + sizeof t, bytes);
-  to_vec.insert(to_vec.end(), &bytes[0], &bytes[sizeof t]);
-  return sizeof t;
-}
-
 int Serializable::write(uint8_t value, DataStream *stream) {
   return stream->writeData(&value, 1);
 }
@@ -64,8 +38,8 @@
   return stream->writeData(reinterpret_cast<uint8_t *>(&value), 1);
 }
 
-int Serializable::write(uint8_t *value, int len, DataStream *stream) {
-  return stream->writeData(value, len);
+int Serializable::write(const uint8_t * const value, int len, DataStream *stream) {
+  return stream->writeData(const_cast<uint8_t *>(value), len);
 }
 
 int Serializable::write(bool value, DataStream *stream) {
@@ -95,34 +69,6 @@
   return stream->readData(value, len);
 }
 
-int Serializable::read(uint16_t &value, DataStream *stream, bool is_little_endian) {
-  return stream->read(value, is_little_endian);
-}
-
-int Serializable::read(uint32_t &value, DataStream *stream, bool is_little_endian) {
-  return stream->read(value, is_little_endian);
-}
-int Serializable::read(uint64_t &value, DataStream *stream, bool is_little_endian) {
-  return stream->read(value, is_little_endian);
-}
-
-int Serializable::write(uint32_t base_value, DataStream *stream, bool is_little_endian) {
-  const uint32_t value = is_little_endian ? htonl(base_value) : base_value;
-
-  return writeData(value, stream);
-}
-
-int Serializable::write(uint64_t base_value, DataStream *stream, bool is_little_endian) {
-  const uint64_t value = is_little_endian == 1 ? htonll_r(base_value) : base_value;
-  return writeData(value, stream);
-}
-
-int Serializable::write(uint16_t base_value, DataStream *stream, bool is_little_endian) {
-  const uint16_t value = is_little_endian == 1 ? htons(base_value) : base_value;
-
-  return writeData(value, stream);
-}
-
 int Serializable::readUTF(std::string &str, DataStream *stream, bool widen) {
   uint32_t utflen = 0;
   int ret = 1;
@@ -130,16 +76,13 @@
     uint16_t shortLength = 0;
     ret = read(shortLength, stream);
     utflen = shortLength;
-    if (ret <= 0)
-      return ret;
   } else {
-    uint32_t len;
-    ret = read(len, stream);
-    if (ret <= 0)
-      return ret;
-    utflen = len;
+    ret = read(utflen, stream);
   }
 
+  if (ret <= 0)
+    return ret;
+
   if (utflen == 0) {
     str = "";
     return 1;
@@ -161,38 +104,18 @@
   if (utflen > 65535)
     return -1;
 
+  if (!widen) {
+    uint16_t shortLen = utflen;
+    write(shortLen, stream);
+  } else {
+    write(utflen, stream);
+  }
+
   if (utflen == 0) {
-    if (!widen) {
-      uint16_t shortLen = utflen;
-      write(shortLen, stream);
-    } else {
-      write(utflen, stream);
-    }
     return 1;
   }
 
-  std::vector<uint8_t> utf_to_write;
-  if (!widen) {
-    utf_to_write.resize(utflen);
-  } else {
-    utf_to_write.resize(utflen);
-  }
-
-  uint8_t *underlyingPtr = &utf_to_write[0];
-  for (auto c : str) {
-    writeData(c, underlyingPtr++);
-  }
-  int ret;
-
-  if (!widen) {
-    uint16_t short_length = utflen;
-    write(short_length, stream);
-    ret = stream->writeData(utf_to_write.data(), utflen);
-  } else {
-    write(utflen, stream);
-    ret = stream->writeData(utf_to_write.data(), utflen);
-  }
-  return ret;
+  return stream->writeData(reinterpret_cast<uint8_t *>(const_cast<char*>(str.c_str())), utflen);
 }
 
 } /* namespace io */
diff --git a/libminifi/src/provenance/Provenance.cpp b/libminifi/src/provenance/Provenance.cpp
index a3e16d5..16126d6 100644
--- a/libminifi/src/provenance/Provenance.cpp
+++ b/libminifi/src/provenance/Provenance.cpp
@@ -21,6 +21,7 @@
 #include <memory>
 #include <string>
 #include <vector>
+#include <list>
 #include "core/Repository.h"
 #include "io/DataStream.h"
 #include "io/Serializable.h"
@@ -84,9 +85,7 @@
   return ret;
 }
 
-bool ProvenanceEventRecord::Serialize(const std::shared_ptr<core::SerializableComponent> &repo) {
-  org::apache::nifi::minifi::io::DataStream outStream;
-
+bool ProvenanceEventRecord::Serialize(org::apache::nifi::minifi::io::DataStream& outStream) {
   int ret;
 
   ret = writeUTF(this->uuidStr_, &outStream);
@@ -147,7 +146,7 @@
     return false;
   }
 
-  for (auto itAttribute : _attributes) {
+  for (const auto& itAttribute : _attributes) {
     ret = writeUTF(itAttribute.first, &outStream, true);
     if (ret <= 0) {
       return false;
@@ -185,7 +184,7 @@
     if (ret != 4) {
       return false;
     }
-    for (auto parentUUID : _parentUuids) {
+    for (const auto& parentUUID : _parentUuids) {
       ret = writeUTF(parentUUID, &outStream);
       if (ret <= 0) {
         return false;
@@ -196,7 +195,7 @@
     if (ret != 4) {
       return false;
     }
-    for (auto childUUID : _childrenUuids) {
+    for (const auto& childUUID : _childrenUuids) {
       ret = writeUTF(childUUID, &outStream);
       if (ret <= 0) {
         return false;
@@ -217,6 +216,15 @@
       return false;
     }
   }
+
+  return true;
+}
+
+bool ProvenanceEventRecord::Serialize(const std::shared_ptr<core::SerializableComponent> &repo) {
+  org::apache::nifi::minifi::io::DataStream outStream;
+
+  Serialize(outStream);
+
   // Persist to the DB
   if (!repo->Serialize(uuidStr_, const_cast<uint8_t*>(outStream.getBuffer()), outStream.getSize())) {
     logger_->log_error("NiFi Provenance Store event %s size %llu fail", uuidStr_, outStream.getSize());
@@ -373,13 +381,24 @@
 }
 
 void ProvenanceReporter::commit() {
-  for (auto event : _events) {
-    if (!repo_->isFull()) {
-      event->Serialize(repo_);
-    } else {
-      logger_->log_debug("Provenance Repository is full");
-    }
+  if (repo_->isNoop()) {
+    return;
   }
+
+  if (repo_->isFull()) {
+    logger_->log_debug("Provenance Repository is full");
+    return;
+  }
+
+  std::vector<std::pair<std::string, std::unique_ptr<io::DataStream>>> flowData;
+
+  for (auto& event : _events) {
+    std::unique_ptr<io::DataStream> stramptr(new io::DataStream());
+    event->Serialize(*stramptr.get());
+
+    flowData.emplace_back(event->getUUIDStr(), std::move(stramptr));
+  }
+  repo_->MultiPut(flowData);
 }
 
 void ProvenanceReporter::create(std::shared_ptr<core::FlowFile> flow, std::string detail) {
diff --git a/libminifi/test/unit/ProvenanceTestHelper.h b/libminifi/test/unit/ProvenanceTestHelper.h
index 00547af..2852697 100644
--- a/libminifi/test/unit/ProvenanceTestHelper.h
+++ b/libminifi/test/unit/ProvenanceTestHelper.h
@@ -71,11 +71,24 @@
 
   }
 
+  virtual bool isNoop() {
+    return false;
+  }
+
   bool Put(std::string key, const uint8_t *buf, size_t bufLen) {
     repositoryResults.insert(std::pair<std::string, std::string>(key, std::string((const char*) buf, bufLen)));
     return true;
   }
 
+  bool MultiPut(const std::vector<std::pair<std::string, std::unique_ptr<minifi::io::DataStream>>>& data) {
+    for (const auto& item: data) {
+      if (!Put(item.first, item.second->getBuffer(), item.second->getSize())) {
+        return false;
+      }
+    }
+    return true;
+  }
+
   virtual bool Serialize(const std::string &key, const uint8_t *buffer, const size_t bufferSize) {
     return Put(key, buffer, bufferSize);
   }