MINIFICPP-1328 Improve performance of FileStream
Signed-off-by: Marton Szasz <szaszm01@gmail.com>
This closes #869
diff --git a/libminifi/include/io/FileStream.h b/libminifi/include/io/FileStream.h
index 49f7a53..ad7c7aa 100644
--- a/libminifi/include/io/FileStream.h
+++ b/libminifi/include/io/FileStream.h
@@ -124,7 +124,7 @@
*/
template<typename T>
int readBuffer(std::vector<uint8_t>& buf, const T& t);
- std::recursive_mutex file_lock_;
+ std::mutex file_lock_;
std::unique_ptr<std::fstream> file_stream_;
size_t offset_;
std::string path_;
diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp
index 5925865..4266d24 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -261,10 +261,9 @@
flow->setResourceClaim(claim);
stream->closeStream();
- std::stringstream details;
- details << process_context_->getProcessorNode()->getName() << " modify flow record content " << flow->getUUIDStr();
+ std::string details = process_context_->getProcessorNode()->getName() + " modify flow record content " + flow->getUUIDStr();
uint64_t endTime = getTimeMillis();
- provenance_report_->modifyContent(flow, details.str(), endTime - startTime);
+ provenance_report_->modifyContent(flow, details, endTime - startTime);
} catch (std::exception &exception) {
logger_->log_debug("Caught Exception %s", exception.what());
throw;
diff --git a/libminifi/src/io/FileStream.cpp b/libminifi/src/io/FileStream.cpp
index 1d79469..870e926 100644
--- a/libminifi/src/io/FileStream.cpp
+++ b/libminifi/src/io/FileStream.cpp
@@ -34,19 +34,17 @@
path_(path),
offset_(0) {
file_stream_ = std::unique_ptr<std::fstream>(new std::fstream());
- if (append)
+ if (append) {
file_stream_->open(path.c_str(), std::fstream::in | std::fstream::out | std::fstream::app | std::fstream::binary);
- else
- file_stream_->open(path.c_str(), std::fstream::out | std::fstream::binary);
- file_stream_->seekg(0, file_stream_->end);
- file_stream_->seekp(0, file_stream_->end);
- std::streamoff len = file_stream_->tellg();
- if (len > 0) {
- length_ = gsl::narrow<size_t>(len);
+ file_stream_->seekg(0, file_stream_->end);
+ file_stream_->seekp(0, file_stream_->end);
+ std::streamoff len = file_stream_->tellg();
+ length_ = len > 0 ? gsl::narrow<size_t>(len) : 0;
+ seek(offset_);
} else {
+ file_stream_->open(path.c_str(), std::fstream::out | std::fstream::binary);
length_ = 0;
}
- seek(offset_);
}
FileStream::FileStream(const std::string &path, uint32_t offset, bool write_enable)
@@ -71,15 +69,12 @@
}
void FileStream::closeStream() {
- std::lock_guard<std::recursive_mutex> lock(file_lock_);
- if (file_stream_ != nullptr) {
- file_stream_->close();
- file_stream_ = nullptr;
- }
+ std::lock_guard<std::mutex> lock(file_lock_);
+ file_stream_.reset();
}
void FileStream::seek(uint64_t offset) {
- std::lock_guard<std::recursive_mutex> lock(file_lock_);
+ std::lock_guard<std::mutex> lock(file_lock_);
offset_ = gsl::narrow<size_t>(offset);
file_stream_->clear();
file_stream_->seekg(offset_);
@@ -101,13 +96,12 @@
int FileStream::writeData(uint8_t *value, int size) {
if (!IsNullOrEmpty(value)) {
- std::lock_guard<std::recursive_mutex> lock(file_lock_);
+ std::lock_guard<std::mutex> lock(file_lock_);
if (file_stream_->write(reinterpret_cast<const char*>(value), size)) {
offset_ += size;
if (offset_ > length_) {
length_ = offset_;
}
- file_stream_->seekg(offset_);
file_stream_->flush();
return size;
} else {
@@ -149,7 +143,7 @@
int FileStream::readData(uint8_t *buf, int buflen) {
if (!IsNullOrEmpty(buf)) {
- std::lock_guard<std::recursive_mutex> lock(file_lock_);
+ std::lock_guard<std::mutex> lock(file_lock_);
if (!file_stream_) {
return -1;
}