MINIFICPP-1127 - Provenance repo performance should be improved
MINIFICPP-1127 - Added tests for Provenance DB max size and TTL
Signed-off-by: Arpad Boda <aboda@apache.org>
Approved by szaszm and bakaid on GH
This closes #716
diff --git a/extensions/rocksdb-repos/ProvenanceRepository.cpp b/extensions/rocksdb-repos/ProvenanceRepository.cpp
index bbd58fd..2b8de20 100644
--- a/extensions/rocksdb-repos/ProvenanceRepository.cpp
+++ b/extensions/rocksdb-repos/ProvenanceRepository.cpp
@@ -17,72 +17,37 @@
*/
#include "ProvenanceRepository.h"
-#include "rocksdb/write_batch.h"
#include <string>
-#include <vector>
-#include "rocksdb/options.h"
-#include "provenance/Provenance.h"
namespace org {
namespace apache {
namespace nifi {
namespace minifi {
namespace provenance {
-void ProvenanceRepository::flush() {
- rocksdb::WriteBatch batch;
- std::string key;
- std::string value;
- rocksdb::ReadOptions options;
- uint64_t decrement_total = 0;
- while (keys_to_delete.size_approx() > 0) {
- if (keys_to_delete.try_dequeue(key)) {
- db_->Get(options, key, &value);
- decrement_total += value.size();
- batch.Delete(key);
- logger_->log_debug("Removing %s", key);
- }
- }
- if (db_->Write(rocksdb::WriteOptions(), &batch).ok()) {
- logger_->log_debug("Decrementing %u from a repo size of %u", decrement_total, repo_size_.load());
- if (decrement_total > repo_size_.load()) {
- repo_size_ = 0;
- } else {
- repo_size_ -= decrement_total;
- }
- }
+void ProvenanceRepository::printStats() {
+ std::string key_count;
+ db_->GetProperty("rocksdb.estimate-num-keys", &key_count);
+
+ std::string table_readers;
+ db_->GetProperty("rocksdb.estimate-table-readers-mem", &table_readers);
+
+ std::string all_memtables;
+ db_->GetProperty("rocksdb.cur-size-all-mem-tables", &all_memtables);
+
+ logger_->log_info("Repository stats: key count: %zu, table readers size: %zu, all memory tables size: %zu",
+ key_count, table_readers, all_memtables);
}
void ProvenanceRepository::run() {
+ size_t count = 0;
while (running_) {
- std::this_thread::sleep_for(std::chrono::milliseconds(purge_period_));
- uint64_t curTime = getTimeMillis();
- // threshold for purge
- uint64_t purgeThreshold = max_partition_bytes_ * 3 / 4;
-
- uint64_t size = getRepoSize();
-
- if (size >= purgeThreshold) {
- rocksdb::Iterator* it = db_->NewIterator(rocksdb::ReadOptions());
- for (it->SeekToFirst(); it->Valid(); it->Next()) {
- ProvenanceEventRecord eventRead;
- std::string key = it->key().ToString();
- uint64_t eventTime = eventRead.getEventTime(reinterpret_cast<uint8_t*>(const_cast<char*>(it->value().data())), it->value().size());
- if (eventTime > 0) {
- if ((curTime - eventTime) > (uint64_t)max_partition_millis_)
- Delete(key);
- } else {
- logger_->log_debug("NiFi Provenance retrieve event %s fail", key);
- Delete(key);
- }
- }
- delete it;
+ std::this_thread::sleep_for(std::chrono::seconds(1));
+ count++;
+ // Hack, to be removed in scope of https://issues.apache.org/jira/browse/MINIFICPP-1145
+ count = count % 30;
+ if(count == 0) {
+ printStats();
}
- flush();
- size = getRepoSize();
- if (size > (uint64_t)max_partition_bytes_)
- repo_full_ = true;
- else
- repo_full_ = false;
}
}
} /* namespace provenance */
diff --git a/extensions/rocksdb-repos/ProvenanceRepository.h b/extensions/rocksdb-repos/ProvenanceRepository.h
index ef446be..39a9e85 100644
--- a/extensions/rocksdb-repos/ProvenanceRepository.h
+++ b/extensions/rocksdb-repos/ProvenanceRepository.h
@@ -24,7 +24,6 @@
#include "core/Core.h"
#include "provenance/Provenance.h"
#include "core/logging/LoggerConfiguration.h"
-#include "concurrentqueue.h"
namespace org {
namespace apache {
namespace nifi {
@@ -38,7 +37,6 @@
class ProvenanceRepository : public core::Repository, public std::enable_shared_from_this<ProvenanceRepository> {
public:
-
ProvenanceRepository(std::string name, utils::Identifier uuid)
: ProvenanceRepository(name){
@@ -56,21 +54,13 @@
db_ = NULL;
}
- // Destructor
- virtual ~ProvenanceRepository() {
- if (db_)
- delete db_;
- }
-
- virtual void flush();
+ void printStats();
virtual bool isNoop() {
return false;
}
void start() {
- if (this->purge_period_ <= 0)
- return;
if (running_)
return;
running_ = true;
@@ -99,9 +89,28 @@
options.create_if_missing = true;
options.use_direct_io_for_flush_and_compaction = true;
options.use_direct_reads = true;
- rocksdb::Status status = rocksdb::DB::Open(options, directory_, &db_);
+ // Rocksdb write buffers act as a log of database operation: grow till reaching the limit, serialized after
+ // This shouldn't go above 16MB and the configured total size of the db should cap it as well
+ int64_t max_buffer_size = 16 << 20;
+ options.write_buffer_size = std::min(max_buffer_size, max_partition_bytes_);
+ options.max_write_buffer_number = 4;
+ options.min_write_buffer_number_to_merge = 1;
+
+ options.compaction_style = rocksdb::CompactionStyle::kCompactionStyleFIFO;
+ options.compaction_options_fifo = rocksdb::CompactionOptionsFIFO(max_partition_bytes_, false);
+ if(max_partition_millis_ > 0) {
+ options.compaction_options_fifo.ttl = max_partition_millis_ / 1000;
+ }
+
+ logger_->log_info("Write buffer: %llu", options.write_buffer_size);
+ logger_->log_info("Max partition bytes: %llu", max_partition_bytes_);
+ logger_->log_info("Ttl: %llu", options.compaction_options_fifo.ttl);
+
+ rocksdb::DB* db;
+ rocksdb::Status status = rocksdb::DB::Open(options, directory_, &db);
if (status.ok()) {
logger_->log_debug("MiNiFi Provenance Repository database open %s success", directory_);
+ db_.reset(db);
} else {
logger_->log_error("MiNiFi Provenance Repository database open %s failed: %s", directory_, status.ToString());
return false;
@@ -111,20 +120,12 @@
}
// Put
virtual bool Put(std::string key, const uint8_t *buf, size_t bufLen) {
- if (repo_full_) {
- return false;
- }
-
// persist to the DB
rocksdb::Slice value((const char *) buf, bufLen);
return db_->Put(rocksdb::WriteOptions(), key, value).ok();
}
virtual bool MultiPut(const std::vector<std::pair<std::string, std::unique_ptr<minifi::io::DataStream>>>& data) {
- if (repo_full_) {
- return false;
- }
-
rocksdb::WriteBatch batch;
for (const auto &item: data) {
rocksdb::Slice value((const char *) item.second->getBuffer(), item.second->getSize());
@@ -137,7 +138,7 @@
// Delete
virtual bool Delete(std::string key) {
- keys_to_delete.enqueue(key);
+ // The repo is cleaned up by itself, there is no need to delete items.
return true;
}
// Get
@@ -145,17 +146,12 @@
return db_->Get(rocksdb::ReadOptions(), key, &value).ok();
}
- // Remove event
- void removeEvent(ProvenanceEventRecord *event) {
- Delete(event->getEventId());
- }
-
virtual bool Serialize(const std::string &key, const uint8_t *buffer, const size_t bufferSize) {
return Put(key, buffer, bufferSize);
}
virtual bool get(std::vector<std::shared_ptr<core::CoreComponent>> &store, size_t max_size) {
- rocksdb::Iterator* it = db_->NewIterator(rocksdb::ReadOptions());
+ std::unique_ptr<rocksdb::Iterator> it(db_->NewIterator(rocksdb::ReadOptions()));
for (it->SeekToFirst(); it->Valid(); it->Next()) {
std::shared_ptr<ProvenanceEventRecord> eventRead = std::make_shared<ProvenanceEventRecord>();
std::string key = it->key().ToString();
@@ -165,12 +161,11 @@
store.push_back(std::dynamic_pointer_cast<core::CoreComponent>(eventRead));
}
}
- delete it;
return true;
}
virtual bool DeSerialize(std::vector<std::shared_ptr<core::SerializableComponent>> &records, size_t &max_size, std::function<std::shared_ptr<core::SerializableComponent>()> lambda) {
- rocksdb::Iterator* it = db_->NewIterator(rocksdb::ReadOptions());
+ std::unique_ptr<rocksdb::Iterator> it(db_->NewIterator(rocksdb::ReadOptions()));
size_t requested_batch = max_size;
max_size = 0;
for (it->SeekToFirst(); it->Valid(); it->Next()) {
@@ -183,16 +178,13 @@
max_size++;
records.push_back(eventRead);
}
-
}
- delete it;
-
return max_size > 0;
}
//! get record
void getProvenanceRecord(std::vector<std::shared_ptr<ProvenanceEventRecord>> &records, int maxSize) {
- rocksdb::Iterator* it = db_->NewIterator(rocksdb::ReadOptions());
+ std::unique_ptr<rocksdb::Iterator> it(db_->NewIterator(rocksdb::ReadOptions()));
for (it->SeekToFirst(); it->Valid(); it->Next()) {
std::shared_ptr<ProvenanceEventRecord> eventRead = std::make_shared<ProvenanceEventRecord>();
std::string key = it->key().ToString();
@@ -202,11 +194,10 @@
records.push_back(eventRead);
}
}
- delete it;
}
virtual bool DeSerialize(std::vector<std::shared_ptr<core::SerializableComponent>> &store, size_t &max_size) {
- rocksdb::Iterator* it = db_->NewIterator(rocksdb::ReadOptions());
+ std::unique_ptr<rocksdb::Iterator> it(db_->NewIterator(rocksdb::ReadOptions()));
max_size = 0;
for (it->SeekToFirst(); it->Valid(); it->Next()) {
std::shared_ptr<ProvenanceEventRecord> eventRead = std::make_shared<ProvenanceEventRecord>();
@@ -218,35 +209,30 @@
if (store.size() >= max_size)
break;
}
- delete it;
return max_size > 0;
}
- //! purge record
- void purgeProvenanceRecord(std::vector<std::shared_ptr<ProvenanceEventRecord>> &records) {
- for (auto record : records) {
- Delete(record->getEventId());
- }
- flush();
- }
// destroy
void destroy() {
- if (db_) {
- delete db_;
- db_ = NULL;
- }
+ db_.reset();
}
// Run function for the thread
void run();
+ uint64_t getKeyCount() const {
+ std::string key_count;
+ db_->GetProperty("rocksdb.estimate-num-keys", &key_count);
+
+ return std::stoull(key_count);
+ }
+
// Prevent default copy constructor and assignment operation
// Only support pass by reference or pointer
ProvenanceRepository(const ProvenanceRepository &parent) = delete;
ProvenanceRepository &operator=(const ProvenanceRepository &parent) = delete;
private:
- moodycamel::ConcurrentQueue<std::string> keys_to_delete;
- rocksdb::DB* db_;
+ std::unique_ptr<rocksdb::DB> db_;
std::shared_ptr<logging::Logger> logger_;
};
diff --git a/libminifi/test/rocksdb-tests/DBProvenanceRepositoryTests.cpp b/libminifi/test/rocksdb-tests/DBProvenanceRepositoryTests.cpp
new file mode 100644
index 0000000..962eb06
--- /dev/null
+++ b/libminifi/test/rocksdb-tests/DBProvenanceRepositoryTests.cpp
@@ -0,0 +1,122 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ProvenanceRepository.h"
+#include "../TestBase.h"
+#include <array>
+#include <chrono>
+#include <vector>
+#include <random>
+
+#define TEST_PROVENANCE_STORAGE_SIZE (1024*100) // 100 KB
+#define TEST_MAX_PROVENANCE_STORAGE_SIZE (100*1024*1024) // 100 MB
+
+#define TEST_PROVENANCE_ENTRY_LIFE_TIME (1000) // 1 sec
+
+void generateData(std::vector<char>& data) {
+ std::random_device rd;
+ std::mt19937 eng(rd());
+
+ std::uniform_int_distribution<> distr(std::numeric_limits<char>::min(), std::numeric_limits<char>::max());
+ auto rand = std::bind(distr, eng);
+ std::generate_n(data.begin(), data.size(), rand);
+}
+
+void provisionRepo(minifi::provenance::ProvenanceRepository& repo, size_t number_of_records, size_t record_size) {
+ for (int i = 0; i < number_of_records; ++i) {
+ std::vector<char> v(record_size);
+ generateData(v);
+ REQUIRE(repo.Put(std::to_string(i), reinterpret_cast<const uint8_t*>(v.data()), v.size()));
+ }
+}
+
+void verifyMaxKeyCount(const minifi::provenance::ProvenanceRepository& repo, uint64_t keyCount) {
+ uint64_t k = keyCount;
+
+ for (int i = 0; i < 50; ++i) {
+ std::this_thread::sleep_for(std::chrono::milliseconds(100));
+ k = std::min(k, repo.getKeyCount());
+ if (k < keyCount) {
+ break;
+ }
+ }
+
+ REQUIRE(k < keyCount);
+}
+
+TEST_CASE("Test size limit", "[sizeLimitTest]") {
+ TestController testController;
+
+ char dirtemplate[] = "/tmp/db.XXXXXX";
+ auto temp_dir = testController.createTempDirectory(dirtemplate);
+ REQUIRE(!temp_dir.empty());
+
+ // 60 sec, 100 KB - going to exceed the size limit
+ minifi::provenance::ProvenanceRepository provdb("TestProvRepo", temp_dir,
+ MAX_PROVENANCE_ENTRY_LIFE_TIME, TEST_PROVENANCE_STORAGE_SIZE, 1000);
+
+ auto configuration = std::make_shared<org::apache::nifi::minifi::Configure>();
+ configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, temp_dir);
+
+ REQUIRE(provdb.initialize(configuration));
+
+ uint64_t keyCount = 500;
+
+ provisionRepo(provdb, keyCount, 10240);
+
+ verifyMaxKeyCount(provdb, 200);
+}
+
+TEST_CASE("Test time limit", "[timeLimitTest]") {
+ TestController testController;
+
+ char dirtemplate[] = "/tmp/db.XXXXXX";
+ auto temp_dir = testController.createTempDirectory(dirtemplate);
+ REQUIRE(!temp_dir.empty());
+
+ // 1 sec, 100 MB - going to exceed TTL
+ minifi::provenance::ProvenanceRepository provdb("TestProvRepo", temp_dir,
+ TEST_PROVENANCE_ENTRY_LIFE_TIME, TEST_MAX_PROVENANCE_STORAGE_SIZE, 1000);
+
+ auto configuration = std::make_shared<org::apache::nifi::minifi::Configure>();
+ configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, temp_dir);
+
+ REQUIRE(provdb.initialize(configuration));
+
+ uint64_t keyCount = 500;
+
+ provisionRepo(provdb, keyCount / 2, 102400);
+
+ REQUIRE(provdb.getKeyCount() == 250);
+
+ /**
+ * Magic: TTL-based DB cleanup only triggers when writeBuffers are serialized to storage
+ * To achieve this 250 entries are put to DB with a total size that ensures at least one buffer is serialized
+ * Wait more than a sec to make sure the serialized records expire
+ * Put another set of entries to trigger cleanup logic to drop the already serialized records
+ * This tests relies on the default settings of Provenance repo: a size of a writeBuffer is 16 MB
+ * One provisioning call here writes 25 MB to make sure serialization is triggered
+ * When the 2nd 50 MB is written the records of the 1st serialization are dropped -> around 160 of them
+ * That's why the final check verifies keyCount to be below 400
+ */
+ std::this_thread::sleep_for(std::chrono::milliseconds(1500));
+
+ provisionRepo(provdb, keyCount /2, 102400);
+
+ verifyMaxKeyCount(provdb, 400);
+}