Merge pull request #664 from arpadboda/MINIFICPP-1062
Signed-off-by: Daniel Bakai <bakaid@apache.org>
diff --git a/extensions/rocksdb-repos/FlowFileRepository.cpp b/extensions/rocksdb-repos/FlowFileRepository.cpp
index f4920bc..56df86c 100644
--- a/extensions/rocksdb-repos/FlowFileRepository.cpp
+++ b/extensions/rocksdb-repos/FlowFileRepository.cpp
@@ -17,10 +17,12 @@
*/
#include "FlowFileRepository.h"
#include "rocksdb/write_batch.h"
+#include "rocksdb/slice.h"
#include <memory>
#include <string>
#include <utility>
#include <vector>
+#include <list>
#include "FlowFileRecord.h"
namespace org {
@@ -32,25 +34,41 @@
void FlowFileRepository::flush() {
rocksdb::WriteBatch batch;
- std::string key;
- std::string value;
+ uint64_t decrement_total = 0;
rocksdb::ReadOptions options;
std::vector<std::shared_ptr<FlowFileRecord>> purgeList;
- uint64_t decrement_total = 0;
+ std::vector<rocksdb::Slice> keys;
+ std::list<std::string> keystrings;
+ std::vector<std::string> values;
+
while (keys_to_delete.size_approx() > 0) {
+ std::string key;
if (keys_to_delete.try_dequeue(key)) {
- db_->Get(options, key, &value);
- decrement_total += value.size();
- std::shared_ptr<FlowFileRecord> eventRead = std::make_shared<FlowFileRecord>(shared_from_this(), content_repo_);
- if (eventRead->DeSerialize(reinterpret_cast<const uint8_t *>(value.data()), value.size())) {
- purgeList.push_back(eventRead);
- }
- logger_->log_debug("Issuing batch delete, including %s, Content path %s", eventRead->getUUIDStr(), eventRead->getContentFullPath());
- batch.Delete(key);
+ keystrings.push_back(std::move(key)); // rocksdb::Slice doesn't copy the string, only grabs ptrs. Hacky, but have to ensure the required lifetime of the strings.
+ keys.push_back(keystrings.back());
}
}
+
+ auto multistatus = db_->MultiGet(options, keys, &values);
+
+ for(size_t i=0; i<keys.size() && i<values.size() && i<multistatus.size(); ++i) {
+ if(!multistatus[i].ok()) {
+ logger_->log_error("Failed to read key from rocksdb: %s! DB is most probably in an inconsistent state!", keys[i].data());
+ continue;
+ }
+
+ decrement_total += values[i].size();
+ std::shared_ptr<FlowFileRecord> eventRead = std::make_shared<FlowFileRecord>(shared_from_this(), content_repo_);
+ if (eventRead->DeSerialize(reinterpret_cast<const uint8_t *>(values[i].data()), values[i].size())) {
+ purgeList.push_back(eventRead);
+ }
+ logger_->log_debug("Issuing batch delete, including %s, Content path %s", eventRead->getUUIDStr(), eventRead->getContentFullPath());
+ batch.Delete(keys[i]);
+ }
+
+
if (db_->Write(rocksdb::WriteOptions(), &batch).ok()) {
logger_->log_trace("Decrementing %u from a repo size of %u", decrement_total, repo_size_.load());
if (decrement_total > repo_size_.load()) {
diff --git a/extensions/rocksdb-repos/FlowFileRepository.h b/extensions/rocksdb-repos/FlowFileRepository.h
index 6d0972a..3943c85 100644
--- a/extensions/rocksdb-repos/FlowFileRepository.h
+++ b/extensions/rocksdb-repos/FlowFileRepository.h
@@ -75,6 +75,10 @@
delete db_;
}
+  // This repository writes its records to RocksDB, so it is never a no-op.
+  virtual bool isNoop() { return false; }
+
virtual void flush();
// initialize
@@ -104,9 +108,8 @@
logger_->log_debug("NiFi FlowFile Repository database open %s success", directory_);
} else {
logger_->log_error("NiFi FlowFile Repository database open %s fail", directory_);
- return false;
}
- return true;
+ return status.ok();
}
virtual void run();
@@ -114,14 +117,22 @@
+  // Writes a single key/value pair to RocksDB; returns true if the write status is OK.
+  // NOTE(review): repo_size_ is increased by bufLen before the write, even when it fails.
virtual bool Put(std::string key, const uint8_t *buf, size_t bufLen) {
// persistent to the DB
rocksdb::Slice value((const char *) buf, bufLen);
- rocksdb::Status status;
repo_size_ += bufLen;
- status = db_->Put(rocksdb::WriteOptions(), key, value);
- if (status.ok())
- return true;
- else
- return false;
+ return db_->Put(rocksdb::WriteOptions(), key, value).ok();
}
+
+  /**
+   * Stores all key/value pairs with a single RocksDB write batch.
+   * @param data keys paired with the streams holding the serialized values
+   * @return true if every entry was added to the batch and the batch was written
+   */
+  virtual bool MultiPut(const std::vector<std::pair<std::string, std::unique_ptr<minifi::io::DataStream>>>& data) {
+    rocksdb::WriteBatch batch;
+    uint64_t batch_size = 0;
+    for (const auto &item : data) {
+      const auto &stream = item.second;
+      rocksdb::Slice value(reinterpret_cast<const char *>(stream->getBuffer()), stream->getSize());
+      if (!batch.Put(item.first, value).ok()) {
+        return false;
+      }
+      batch_size += stream->getSize();
+    }
+    if (!db_->Write(rocksdb::WriteOptions(), &batch).ok()) {
+      return false;
+    }
+    // Put() adds the size of each value to repo_size_ and flush() subtracts the size of
+    // every value it deletes, so batched writes must be accounted for too.
+    repo_size_ += batch_size;
+    return true;
+  }
+
+
/**
*
* Deletes the key
@@ -138,12 +149,7 @@
+  // Reads the value stored under key into value. Returns false if the database is not
+  // open or RocksDB does not return an OK status.
virtual bool Get(const std::string &key, std::string &value) {
if (db_ == nullptr)
return false;
- rocksdb::Status status;
- status = db_->Get(rocksdb::ReadOptions(), key, &value);
- if (status.ok())
- return true;
- else
- return false;
+ return db_->Get(rocksdb::ReadOptions(), key, &value).ok();
}
virtual void loadComponent(const std::shared_ptr<core::ContentRepository> &content_repo);
diff --git a/extensions/rocksdb-repos/ProvenanceRepository.h b/extensions/rocksdb-repos/ProvenanceRepository.h
index 62438d7..ef446be 100644
--- a/extensions/rocksdb-repos/ProvenanceRepository.h
+++ b/extensions/rocksdb-repos/ProvenanceRepository.h
@@ -64,6 +64,10 @@
virtual void flush();
+  // Provenance events are written to RocksDB, so this repository is not a no-op.
+  virtual bool isNoop() { return false; }
+
void start() {
if (this->purge_period_ <= 0)
return;
@@ -80,26 +84,26 @@
if (config->get(Configure::nifi_provenance_repository_directory_default, value)) {
directory_ = value;
}
- logger_->log_debug("NiFi Provenance Repository Directory %s", directory_);
+ logger_->log_debug("MiNiFi Provenance Repository Directory %s", directory_);
if (config->get(Configure::nifi_provenance_repository_max_storage_size, value)) {
core::Property::StringToInt(value, max_partition_bytes_);
}
- logger_->log_debug("NiFi Provenance Max Partition Bytes %d", max_partition_bytes_);
+ logger_->log_debug("MiNiFi Provenance Max Partition Bytes %d", max_partition_bytes_);
if (config->get(Configure::nifi_provenance_repository_max_storage_time, value)) {
core::TimeUnit unit;
if (core::Property::StringToTime(value, max_partition_millis_, unit) && core::Property::ConvertTimeUnitToMS(max_partition_millis_, unit, max_partition_millis_)) {
}
}
- logger_->log_debug("NiFi Provenance Max Storage Time: [%d] ms", max_partition_millis_);
+ logger_->log_debug("MiNiFi Provenance Max Storage Time: [%d] ms", max_partition_millis_);
rocksdb::Options options;
options.create_if_missing = true;
options.use_direct_io_for_flush_and_compaction = true;
options.use_direct_reads = true;
rocksdb::Status status = rocksdb::DB::Open(options, directory_, &db_);
if (status.ok()) {
- logger_->log_debug("NiFi Provenance Repository database open %s success", directory_);
+ logger_->log_debug("MiNiFi Provenance Repository database open %s success", directory_);
} else {
- logger_->log_error("NiFi Provenance Repository database open %s fail", directory_);
+ logger_->log_error("MiNiFi Provenance Repository database open %s failed: %s", directory_, status.ToString());
return false;
}
@@ -113,13 +117,24 @@
// persist to the DB
rocksdb::Slice value((const char *) buf, bufLen);
- rocksdb::Status status;
- status = db_->Put(rocksdb::WriteOptions(), key, value);
- if (status.ok())
- return true;
- else
- return false;
+ return db_->Put(rocksdb::WriteOptions(), key, value).ok();
}
+
+  /**
+   * Writes every serialized provenance record in one RocksDB write batch.
+   * Returns false without writing anything once the repository is marked full.
+   */
+  virtual bool MultiPut(const std::vector<std::pair<std::string, std::unique_ptr<minifi::io::DataStream>>>& data) {
+    if (repo_full_)
+      return false;
+
+    rocksdb::WriteBatch batch;
+    for (const auto &entry : data) {
+      const rocksdb::Slice payload(reinterpret_cast<const char *>(entry.second->getBuffer()), entry.second->getSize());
+      const rocksdb::Status staged = batch.Put(entry.first, payload);
+      if (!staged.ok())
+        return false;
+    }
+    const rocksdb::Status written = db_->Write(rocksdb::WriteOptions(), &batch);
+    return written.ok();
+  }
+
// Delete
virtual bool Delete(std::string key) {
keys_to_delete.enqueue(key);
@@ -127,12 +142,7 @@
}
// Get
+  // Reads the value stored under key into value; true if RocksDB returns an OK status.
+  // NOTE(review): unlike FlowFileRepository::Get, there is no db_ == nullptr check here.
virtual bool Get(const std::string &key, std::string &value) {
- rocksdb::Status status;
- status = db_->Get(rocksdb::ReadOptions(), key, &value);
- if (status.ok())
- return true;
- else
- return false;
+ return db_->Get(rocksdb::ReadOptions(), key, &value).ok();
}
// Remove event
@@ -177,12 +187,9 @@
}
delete it;
- if (max_size > 0) {
- return true;
- } else {
- return false;
- }
+ return max_size > 0;
}
+
//! get record
void getProvenanceRecord(std::vector<std::shared_ptr<ProvenanceEventRecord>> &records, int maxSize) {
rocksdb::Iterator* it = db_->NewIterator(rocksdb::ReadOptions());
@@ -212,12 +219,9 @@
break;
}
delete it;
- if (max_size > 0) {
- return true;
- } else {
- return false;
- }
+ return max_size > 0;
}
+
//! purge record
void purgeProvenanceRecord(std::vector<std::shared_ptr<ProvenanceEventRecord>> &records) {
for (auto record : records) {
diff --git a/libminifi/include/Connection.h b/libminifi/include/Connection.h
index c2d30df..ed452e4 100644
--- a/libminifi/include/Connection.h
+++ b/libminifi/include/Connection.h
@@ -158,8 +158,12 @@
put(ff);
}
}
+
// Put the flow file into queue
void put(std::shared_ptr<core::FlowFile> flow);
+
+ // Put multiple flowfiles into the queue
+ void multiPut(std::vector<std::shared_ptr<core::FlowFile>>& flows);
// Poll the flow file from queue, the expired flow file record also being returned
std::shared_ptr<core::FlowFile> poll(std::set<std::shared_ptr<core::FlowFile>> &expiredFlowRecords);
// Drain the flow records
diff --git a/libminifi/include/FlowFileRecord.h b/libminifi/include/FlowFileRecord.h
index f233627..578f556 100644
--- a/libminifi/include/FlowFileRecord.h
+++ b/libminifi/include/FlowFileRecord.h
@@ -131,6 +131,8 @@
// getAttribute key is enum
bool getKeyedAttribute(FlowAttribute key, std::string &value);
+ bool Serialize(io::DataStream &outStream);
+
//! Serialize and Persistent to the repository
bool Serialize();
//! DeSerialize
diff --git a/libminifi/include/core/Repository.h b/libminifi/include/core/Repository.h
index ca40ef5..f71bf1a 100644
--- a/libminifi/include/core/Repository.h
+++ b/libminifi/include/core/Repository.h
@@ -79,6 +79,10 @@
stop();
}
+  // The base Put/MultiPut only return true without storing anything, so by default a
+  // repository reports itself as a no-op; persisting subclasses override this.
+  virtual bool isNoop() { return true; }
+
virtual void flush();
// initialize
@@ -89,6 +93,11 @@
virtual bool Put(std::string key, const uint8_t *buf, size_t bufLen) {
return true;
}
+
+  // Base implementation: reports success without storing the batch.
+  virtual bool MultiPut(const std::vector<std::pair<std::string, std::unique_ptr<io::DataStream>>>& /*data*/) {
+    return true;
+  }
+
// Delete
virtual bool Delete(std::string key) {
return true;
diff --git a/libminifi/include/io/Serializable.h b/libminifi/include/io/Serializable.h
index aa987b5..c569c75 100644
--- a/libminifi/include/io/Serializable.h
+++ b/libminifi/include/io/Serializable.h
@@ -22,11 +22,41 @@
#include <string>
#include "EndianCheck.h"
#include "DataStream.h"
+#ifdef WIN32
+#include "Winsock2.h"
+#else
+#include <arpa/inet.h>
+#endif
+
+namespace {
+// Host-to-network (big-endian) byte order conversion for 2-, 4- and 8-byte integrals.
+// Serializable::write applies it when its is_little_endian flag is set.
+// NOTE(review): an unnamed namespace in a header gives every including translation unit
+// its own copy; it is kept because Serializable::write calls byteSwap unqualified.
+template<typename Integral, typename std::enable_if<
+    std::is_integral<Integral>::value && (sizeof(Integral) == 2), Integral>::type* = nullptr>
+Integral byteSwap(Integral i) {
+  return htons(i);
+}
+
+template<typename Integral, typename std::enable_if<
+    std::is_integral<Integral>::value && (sizeof(Integral) == 4), Integral>::type* = nullptr>
+Integral byteSwap(Integral i) {
+  return htonl(i);
+}
+
+template<typename Integral, typename std::enable_if<
+    std::is_integral<Integral>::value && (sizeof(Integral) == 8), Integral>::type* = nullptr>
+Integral byteSwap(Integral i) {
+#ifdef htonll
+  return htonll(i);
+#else
+  // No htonll available: build the 64-bit conversion from htonl. No helper macro is
+  // defined, so nothing leaks into the files that include this header.
+  if (htonl(1u) == 1u) {
+    return i;  // big-endian host: the value is already in network byte order
+  }
+  const uint64_t value = static_cast<uint64_t>(i);
+  const uint64_t swapped_low = htonl(static_cast<uint32_t>(value));
+  const uint64_t swapped_high = htonl(static_cast<uint32_t>(value >> 32));
+  return static_cast<Integral>((swapped_low << 32) | swapped_high);
+#endif
+}
+}  // namespace
+
namespace org {
namespace apache {
namespace nifi {
namespace minifi {
namespace io {
+
/**
* Serializable instances provide base functionality to
* write certain objects/primitives to a data stream.
@@ -37,24 +67,6 @@
public:
/**
- * Inline function to write T to stream
- **/
- template<typename T>
- inline int writeData(const T &t, DataStream *stream);
-
- /**
- * Inline function to write T to to_vec
- **/
- template<typename T>
- inline int writeData(const T &t, uint8_t *to_vec);
-
- /**
- * Inline function to write T to to_vec
- **/
- template<typename T>
- inline int writeData(const T &t, std::vector<uint8_t> &to_vec);
-
- /**
* write byte to stream
* @return resulting write size
**/
@@ -67,40 +79,13 @@
int write(char value, DataStream *stream);
/**
- * write 4 bytes to stream
- * @param base_value non encoded value
- * @param stream output stream
- * @param is_little_endian endianness determination
- * @return resulting write size
- **/
- int write(uint32_t base_value, DataStream *stream, bool is_little_endian = EndiannessCheck::IS_LITTLE);
-
- /**
- * write 2 bytes to stream
- * @param base_value non encoded value
- * @param stream output stream
- * @param is_little_endian endianness determination
- * @return resulting write size
- **/
- int write(uint16_t base_value, DataStream *stream, bool is_little_endian = EndiannessCheck::IS_LITTLE);
-
- /**
* write valueto stream
* @param value non encoded value
* @param len length of value
* @param strema output stream
* @return resulting write size
**/
- int write(uint8_t *value, int len, DataStream *stream);
-
- /**
- * write 8 bytes to stream
- * @param base_value non encoded value
- * @param stream output stream
- * @param is_little_endian endianness determination
- * @return resulting write size
- **/
- int write(uint64_t base_value, DataStream *stream, bool is_little_endian = EndiannessCheck::IS_LITTLE);
+ int write(const uint8_t * const value, int len, DataStream *stream);
/**
* write bool to stream
@@ -117,6 +102,24 @@
int writeUTF(std::string str, DataStream *stream, bool widen = false);
/**
+ * writes 2-8 bytes to stream
+ * @param base_value non encoded value
+ * @param stream output stream
+ * @param is_little_endian endianness determination
+ * @return resulting write size
+ **/
+  template<typename Integral, typename std::enable_if<
+      (sizeof(Integral) > 1) &&
+      std::is_integral<Integral>::value &&
+      !std::is_signed<Integral>::value
+      , Integral>::type* = nullptr>
+  int write(Integral const & base_value, DataStream *stream, bool is_little_endian = EndiannessCheck::IS_LITTLE) {
+    // Convert to network byte order only when the caller reports a little-endian host.
+    Integral encoded = base_value;
+    if (is_little_endian) {
+      encoded = byteSwap(base_value);
+    }
+    return stream->writeData(reinterpret_cast<uint8_t *>(&encoded), sizeof(encoded));
+  }
+
+ /**
* reads a byte from the stream
* @param value reference in which will set the result
* @param stream stream from which we will read
@@ -125,14 +128,6 @@
int read(uint8_t &value, DataStream *stream);
/**
- * reads two bytes from the stream
- * @param value reference in which will set the result
- * @param stream stream from which we will read
- * @return resulting read size
- **/
- int read(uint16_t &base_value, DataStream *stream, bool is_little_endian = EndiannessCheck::IS_LITTLE);
-
- /**
* reads a byte from the stream
* @param value reference in which will set the result
* @param stream stream from which we will read
@@ -150,22 +145,6 @@
int read(uint8_t *value, int len, DataStream *stream);
/**
- * reads four bytes from the stream
- * @param value reference in which will set the result
- * @param stream stream from which we will read
- * @return resulting read size
- **/
- int read(uint32_t &value, DataStream *stream, bool is_little_endian = EndiannessCheck::IS_LITTLE);
-
- /**
- * reads eight byte from the stream
- * @param value reference in which will set the result
- * @param stream stream from which we will read
- * @return resulting read size
- **/
- int read(uint64_t &value, DataStream *stream, bool is_little_endian = EndiannessCheck::IS_LITTLE);
-
- /**
* read UTF from stream
* @param str reference string
* @param stream stream from which we will read
@@ -173,8 +152,22 @@
**/
int readUTF(std::string &str, DataStream *stream, bool widen = false);
- protected:
+ /**
+ * reads 2-8 bytes from the stream
+ * @param value reference in which will set the result
+ * @param stream stream from which we will read
+ * @return resulting read size
+ **/
+  template<typename Integral, typename std::enable_if<
+      (sizeof(Integral) > 1) &&
+      std::is_integral<Integral>::value &&
+      !std::is_signed<Integral>::value
+      , Integral>::type* = nullptr>
+  int read(Integral &value, DataStream *stream, bool is_little_endian = EndiannessCheck::IS_LITTLE) {
+    // The endianness flag is forwarded as-is; DataStream::read is presumably responsible
+    // for undoing the byte swap done on write.
+    const int result = stream->read(value, is_little_endian);
+    return result;
+  }
+ protected:
};
} /* namespace io */
diff --git a/libminifi/include/provenance/Provenance.h b/libminifi/include/provenance/Provenance.h
index 2804009..ea7bebd 100644
--- a/libminifi/include/provenance/Provenance.h
+++ b/libminifi/include/provenance/Provenance.h
@@ -347,6 +347,11 @@
_contentFullPath = flow->getResourceClaim()->getContentFullPath();
}
}
+ using SerializableComponent::Serialize;
+
+ // Serialize the event to a stream
+ bool Serialize(org::apache::nifi::minifi::io::DataStream& outStream);
+
// Serialize and Persistent to the repository
bool Serialize(const std::shared_ptr<core::SerializableComponent> &repo);
// DeSerialize
@@ -508,6 +513,10 @@
// allocate
std::shared_ptr<ProvenanceEventRecord> allocate(ProvenanceEventRecord::ProvenanceEventType eventType, std::shared_ptr<core::FlowFile> flow) {
+ if(repo_->isNoop()) {
+ return nullptr;
+ }
+
auto event = std::make_shared<ProvenanceEventRecord>(eventType, _componentId, _componentType);
if (event)
event->fromFlowFile(flow);
diff --git a/libminifi/src/Connection.cpp b/libminifi/src/Connection.cpp
index be03180..ef609de 100644
--- a/libminifi/src/Connection.cpp
+++ b/libminifi/src/Connection.cpp
@@ -28,6 +28,7 @@
#include <chrono>
#include <thread>
#include <iostream>
+#include <list>
#include "core/FlowFile.h"
#include "Connection.h"
#include "core/Processor.h"
@@ -136,7 +137,7 @@
void Connection::put(std::shared_ptr<core::FlowFile> flow) {
if (drop_empty_ && flow->getSize() == 0) {
logger_->log_info("Dropping empty flow file: %s", flow->getUUIDStr());
- return;;
+ return;
}
{
std::lock_guard<std::mutex> lock(mutex_);
@@ -163,6 +164,53 @@
}
}
+// Enqueues a batch of flow files and persists the ones not yet stored with a single
+// flowfile repository MultiPut call. Empty flow files are dropped when drop_empty_ is set.
+void Connection::multiPut(std::vector<std::shared_ptr<core::FlowFile>>& flows) {
+  // Serialized records of the flow files that still have to be persisted, keyed by UUID.
+  std::vector<std::pair<std::string, std::unique_ptr<io::DataStream>>> flowData;
+  // Flow files whose records are in flowData (same order); marked as stored once persisted.
+  std::vector<std::shared_ptr<core::FlowFile>> pendingStore;
+  flowData.reserve(flows.size());
+  pendingStore.reserve(flows.size());
+  bool enqueued = false;
+
+  {
+    std::lock_guard<std::mutex> lock(mutex_);
+
+    for (auto &ff : flows) {
+      if (drop_empty_ && ff->getSize() == 0) {
+        logger_->log_info("Dropping empty flow file: %s", ff->getUUIDStr());
+        continue;
+      }
+
+      queue_.push(ff);
+      queued_data_size_ += ff->getSize();
+      enqueued = true;
+
+      logger_->log_debug("Enqueue flow file UUID %s to connection %s", ff->getUUIDStr(), name_);
+
+      if (!ff->isStored()) {
+        // Save to the flowfile repo
+        FlowFileRecord event(flow_repository_, content_repo_, ff, this->uuidStr_);
+
+        std::unique_ptr<io::DataStream> stream(new io::DataStream());
+        if (!event.Serialize(*stream)) {
+          // Never persist a truncated record; the flow file stays queued but unstored.
+          logger_->log_error("Failed to serialize flow file %s for connection %s", ff->getUUIDStr(), name_);
+          continue;
+        }
+
+        flowData.emplace_back(event.getUUIDStr(), std::move(stream));
+        pendingStore.push_back(ff);
+      }
+    }
+  }
+
+  // Persist all new records in one repository call instead of one write per flow file.
+  if (flowData.empty() || flow_repository_->MultiPut(flowData)) {
+    for (auto &ff : pendingStore) {
+      ff->setStoredToRepository(true);
+    }
+  } else {
+    logger_->log_error("Failed to persist flow files of connection %s to the flowfile repository", name_);
+  }
+
+  // The flow files are in queue_ whatever the persistence outcome, so the receiving
+  // processor is always told work is available: once per batch, not once per flow file.
+  if (enqueued && dest_connectable_) {
+    logger_->log_debug("Notifying %s that flowfiles were inserted", dest_connectable_->getName());
+    dest_connectable_->notifyWork();
+  }
+}
+
std::shared_ptr<core::FlowFile> Connection::poll(std::set<std::shared_ptr<core::FlowFile>> &expiredFlowRecords) {
std::lock_guard<std::mutex> lock(mutex_);
diff --git a/libminifi/src/FlowFileRecord.cpp b/libminifi/src/FlowFileRecord.cpp
index 8868c9a..6dbbd75 100644
--- a/libminifi/src/FlowFileRecord.cpp
+++ b/libminifi/src/FlowFileRecord.cpp
@@ -215,9 +215,7 @@
return ret;
}
-bool FlowFileRecord::Serialize() {
- io::DataStream outStream;
-
+bool FlowFileRecord::Serialize(io::DataStream &outStream) {
int ret;
ret = write(this->event_time_, &outStream);
@@ -251,7 +249,7 @@
return false;
}
- for (auto itAttribute : attributes_) {
+ for (auto& itAttribute : attributes_) {
ret = writeUTF(itAttribute.first, &outStream, true);
if (ret <= 0) {
return false;
@@ -277,6 +275,20 @@
return false;
}
+ return true;
+}
+
+bool FlowFileRecord::Serialize() {
+ if (flow_repository_->isNoop()) {
+ return true;
+ }
+
+ io::DataStream outStream;
+
+ if (!Serialize(outStream)) {
+ return false;
+ }
+
if (flow_repository_->Put(uuidStr_, const_cast<uint8_t*>(outStream.getBuffer()), outStream.getSize())) {
logger_->log_debug("NiFi FlowFile Store event %s size %llu success", uuidStr_, outStream.getSize());
return true;
diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp
index 5ecfcd0..6cf388c 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -818,6 +818,8 @@
}
}
+ std::map<std::shared_ptr<Connection>, std::vector<std::shared_ptr<FlowFile>>> connectionQueues;
+
std::shared_ptr<Connection> connection = nullptr;
// Complete process the added and update flow files for the session, send the flow file to its queue
for (const auto &it : _updatedFlowFiles) {
@@ -828,8 +830,9 @@
}
connection = std::static_pointer_cast<Connection>(record->getConnection());
- if ((connection) != nullptr)
- connection->put(record);
+ if ((connection) != nullptr) {
+ connectionQueues[connection].push_back(record);
+ }
}
for (const auto &it : _addedFlowFiles) {
std::shared_ptr<core::FlowFile> record = it.second;
@@ -838,8 +841,9 @@
continue;
}
connection = std::static_pointer_cast<Connection>(record->getConnection());
- if ((connection) != nullptr)
- connection->put(record);
+ if ((connection) != nullptr) {
+ connectionQueues[connection].push_back(record);
+ }
}
// Process the clone flow files
for (const auto &it : _clonedFlowFiles) {
@@ -849,8 +853,13 @@
continue;
}
connection = std::static_pointer_cast<Connection>(record->getConnection());
- if ((connection) != nullptr)
- connection->put(record);
+ if ((connection) != nullptr) {
+ connectionQueues[connection].push_back(record);
+ }
+ }
+
+ for (auto& cq : connectionQueues) {
+ cq.first->multiPut(cq.second);
}
// All done
@@ -872,6 +881,8 @@
}
void ProcessSession::rollback() {
+ std::map<std::shared_ptr<Connection>, std::vector<std::shared_ptr<FlowFile>>> connectionQueues;
+
try {
std::shared_ptr<Connection> connection = nullptr;
// Requeue the snapshot of the flowfile back
@@ -882,9 +893,14 @@
std::shared_ptr<FlowFileRecord> flowf = std::static_pointer_cast<FlowFileRecord>(record);
flowf->setSnapShot(false);
logger_->log_debug("ProcessSession rollback for %s, record %s, to connection %s", process_context_->getProcessorNode()->getName(), record->getUUIDStr(), connection->getName());
- connection->put(record);
+ connectionQueues[connection].push_back(record);
}
}
+
+ for (auto& cq : connectionQueues) {
+ cq.first->multiPut(cq.second);
+ }
+
_originalFlowFiles.clear();
_clonedFlowFiles.clear();
diff --git a/libminifi/src/io/Serializable.cpp b/libminifi/src/io/Serializable.cpp
index 081f3db..a12b410 100644
--- a/libminifi/src/io/Serializable.cpp
+++ b/libminifi/src/io/Serializable.cpp
@@ -22,41 +22,15 @@
#include <string>
#include <algorithm>
#include "io/DataStream.h"
-#ifdef WIN32
-#include "Winsock2.h"
-#else
-#include <arpa/inet.h>
-#endif
+
namespace org {
namespace apache {
namespace nifi {
namespace minifi {
namespace io {
-#define htonll_r(x) ((((uint64_t)htonl(x)) << 32) + htonl((x) >> 32))
#define IS_ASCII(c) __builtin_expect(!!((c >= 1) && (c <= 127)), 1)
-template<typename T>
-int Serializable::writeData(const T &t, DataStream *stream) {
- uint8_t bytes[sizeof t];
- std::copy(static_cast<const char*>(static_cast<const void*>(&t)), static_cast<const char*>(static_cast<const void*>(&t)) + sizeof t, bytes);
- return stream->writeData(bytes, sizeof t);
-}
-
-template<typename T>
-int Serializable::writeData(const T &t, uint8_t *to_vec) {
- std::copy(static_cast<const char*>(static_cast<const void*>(&t)), static_cast<const char*>(static_cast<const void*>(&t)) + sizeof t, to_vec);
- return sizeof t;
-}
-
-template<typename T>
-int Serializable::writeData(const T &t, std::vector<uint8_t> &to_vec) {
- uint8_t bytes[sizeof t];
- std::copy(static_cast<const char*>(static_cast<const void*>(&t)), static_cast<const char*>(static_cast<const void*>(&t)) + sizeof t, bytes);
- to_vec.insert(to_vec.end(), &bytes[0], &bytes[sizeof t]);
- return sizeof t;
-}
-
int Serializable::write(uint8_t value, DataStream *stream) {
return stream->writeData(&value, 1);
}
@@ -64,8 +38,8 @@
return stream->writeData(reinterpret_cast<uint8_t *>(&value), 1);
}
-int Serializable::write(uint8_t *value, int len, DataStream *stream) {
- return stream->writeData(value, len);
+// Writes len raw bytes starting at value to stream; returns DataStream::writeData's result.
+// NOTE(review): the const_cast implies writeData takes a non-const pointer; this relies on
+// writeData only reading from the buffer - confirm.
+int Serializable::write(const uint8_t * const value, int len, DataStream *stream) {
+ return stream->writeData(const_cast<uint8_t *>(value), len);
}
int Serializable::write(bool value, DataStream *stream) {
@@ -95,34 +69,6 @@
return stream->readData(value, len);
}
-int Serializable::read(uint16_t &value, DataStream *stream, bool is_little_endian) {
- return stream->read(value, is_little_endian);
-}
-
-int Serializable::read(uint32_t &value, DataStream *stream, bool is_little_endian) {
- return stream->read(value, is_little_endian);
-}
-int Serializable::read(uint64_t &value, DataStream *stream, bool is_little_endian) {
- return stream->read(value, is_little_endian);
-}
-
-int Serializable::write(uint32_t base_value, DataStream *stream, bool is_little_endian) {
- const uint32_t value = is_little_endian ? htonl(base_value) : base_value;
-
- return writeData(value, stream);
-}
-
-int Serializable::write(uint64_t base_value, DataStream *stream, bool is_little_endian) {
- const uint64_t value = is_little_endian == 1 ? htonll_r(base_value) : base_value;
- return writeData(value, stream);
-}
-
-int Serializable::write(uint16_t base_value, DataStream *stream, bool is_little_endian) {
- const uint16_t value = is_little_endian == 1 ? htons(base_value) : base_value;
-
- return writeData(value, stream);
-}
-
int Serializable::readUTF(std::string &str, DataStream *stream, bool widen) {
uint32_t utflen = 0;
int ret = 1;
@@ -130,16 +76,13 @@
uint16_t shortLength = 0;
ret = read(shortLength, stream);
utflen = shortLength;
- if (ret <= 0)
- return ret;
} else {
- uint32_t len;
- ret = read(len, stream);
- if (ret <= 0)
- return ret;
- utflen = len;
+ ret = read(utflen, stream);
}
+ if (ret <= 0)
+ return ret;
+
if (utflen == 0) {
str = "";
return 1;
@@ -161,38 +104,18 @@
if (utflen > 65535)
return -1;
+ if (!widen) {
+ uint16_t shortLen = utflen;
+ write(shortLen, stream);
+ } else {
+ write(utflen, stream);
+ }
+
if (utflen == 0) {
- if (!widen) {
- uint16_t shortLen = utflen;
- write(shortLen, stream);
- } else {
- write(utflen, stream);
- }
return 1;
}
- std::vector<uint8_t> utf_to_write;
- if (!widen) {
- utf_to_write.resize(utflen);
- } else {
- utf_to_write.resize(utflen);
- }
-
- uint8_t *underlyingPtr = &utf_to_write[0];
- for (auto c : str) {
- writeData(c, underlyingPtr++);
- }
- int ret;
-
- if (!widen) {
- uint16_t short_length = utflen;
- write(short_length, stream);
- ret = stream->writeData(utf_to_write.data(), utflen);
- } else {
- write(utflen, stream);
- ret = stream->writeData(utf_to_write.data(), utflen);
- }
- return ret;
+ return stream->writeData(reinterpret_cast<uint8_t *>(const_cast<char*>(str.c_str())), utflen);
}
} /* namespace io */
diff --git a/libminifi/src/provenance/Provenance.cpp b/libminifi/src/provenance/Provenance.cpp
index a3e16d5..16126d6 100644
--- a/libminifi/src/provenance/Provenance.cpp
+++ b/libminifi/src/provenance/Provenance.cpp
@@ -21,6 +21,7 @@
#include <memory>
#include <string>
#include <vector>
+#include <list>
#include "core/Repository.h"
#include "io/DataStream.h"
#include "io/Serializable.h"
@@ -84,9 +85,7 @@
return ret;
}
-bool ProvenanceEventRecord::Serialize(const std::shared_ptr<core::SerializableComponent> &repo) {
- org::apache::nifi::minifi::io::DataStream outStream;
-
+bool ProvenanceEventRecord::Serialize(org::apache::nifi::minifi::io::DataStream& outStream) {
int ret;
ret = writeUTF(this->uuidStr_, &outStream);
@@ -147,7 +146,7 @@
return false;
}
- for (auto itAttribute : _attributes) {
+ for (const auto& itAttribute : _attributes) {
ret = writeUTF(itAttribute.first, &outStream, true);
if (ret <= 0) {
return false;
@@ -185,7 +184,7 @@
if (ret != 4) {
return false;
}
- for (auto parentUUID : _parentUuids) {
+ for (const auto& parentUUID : _parentUuids) {
ret = writeUTF(parentUUID, &outStream);
if (ret <= 0) {
return false;
@@ -196,7 +195,7 @@
if (ret != 4) {
return false;
}
- for (auto childUUID : _childrenUuids) {
+ for (const auto& childUUID : _childrenUuids) {
ret = writeUTF(childUUID, &outStream);
if (ret <= 0) {
return false;
@@ -217,6 +216,15 @@
return false;
}
}
+
+ return true;
+}
+
+bool ProvenanceEventRecord::Serialize(const std::shared_ptr<core::SerializableComponent> &repo) {
+ org::apache::nifi::minifi::io::DataStream outStream;
+
+ Serialize(outStream);
+
// Persist to the DB
if (!repo->Serialize(uuidStr_, const_cast<uint8_t*>(outStream.getBuffer()), outStream.getSize())) {
logger_->log_error("NiFi Provenance Store event %s size %llu fail", uuidStr_, outStream.getSize());
@@ -373,13 +381,24 @@
}
void ProvenanceReporter::commit() {
- for (auto event : _events) {
- if (!repo_->isFull()) {
- event->Serialize(repo_);
- } else {
- logger_->log_debug("Provenance Repository is full");
- }
+ if (repo_->isNoop()) {
+ return;
}
+
+ if (repo_->isFull()) {
+ logger_->log_debug("Provenance Repository is full");
+ return;
+ }
+
+ std::vector<std::pair<std::string, std::unique_ptr<io::DataStream>>> flowData;
+
+ for (auto& event : _events) {
+ std::unique_ptr<io::DataStream> stramptr(new io::DataStream());
+ event->Serialize(*stramptr.get());
+
+ flowData.emplace_back(event->getUUIDStr(), std::move(stramptr));
+ }
+ repo_->MultiPut(flowData);
}
void ProvenanceReporter::create(std::shared_ptr<core::FlowFile> flow, std::string detail) {
diff --git a/libminifi/test/unit/ProvenanceTestHelper.h b/libminifi/test/unit/ProvenanceTestHelper.h
index 00547af..2852697 100644
--- a/libminifi/test/unit/ProvenanceTestHelper.h
+++ b/libminifi/test/unit/ProvenanceTestHelper.h
@@ -71,11 +71,24 @@
}
+  // The test repository records every Put in repositoryResults, so it is not a no-op.
+  virtual bool isNoop() { return false; }
+
bool Put(std::string key, const uint8_t *buf, size_t bufLen) {
repositoryResults.insert(std::pair<std::string, std::string>(key, std::string((const char*) buf, bufLen)));
return true;
}
+  // Stores each entry through Put(), in order, stopping at the first entry Put() rejects.
+  bool MultiPut(const std::vector<std::pair<std::string, std::unique_ptr<minifi::io::DataStream>>>& data) {
+    for (size_t idx = 0; idx < data.size(); ++idx) {
+      const auto &entry = data[idx];
+      const bool stored = Put(entry.first, entry.second->getBuffer(), entry.second->getSize());
+      if (!stored)
+        return false;
+    }
+    return true;
+  }
+
virtual bool Serialize(const std::string &key, const uint8_t *buffer, const size_t bufferSize) {
return Put(key, buffer, bufferSize);
}