MINIFICPP-1127 - Provenance repo performance should be improved

MINIFICPP-1127 - Added tests for Provenance DB max size and TTL

Signed-off-by: Arpad Boda <aboda@apache.org>

Approved by szaszm and bakaid on GH

This closes #716
diff --git a/extensions/rocksdb-repos/ProvenanceRepository.cpp b/extensions/rocksdb-repos/ProvenanceRepository.cpp
index bbd58fd..2b8de20 100644
--- a/extensions/rocksdb-repos/ProvenanceRepository.cpp
+++ b/extensions/rocksdb-repos/ProvenanceRepository.cpp
@@ -17,72 +17,37 @@
  */
 
 #include "ProvenanceRepository.h"
-#include "rocksdb/write_batch.h"
 #include <string>
-#include <vector>
-#include "rocksdb/options.h"
-#include "provenance/Provenance.h"
 namespace org {
 namespace apache {
 namespace nifi {
 namespace minifi {
 namespace provenance {
 
-void ProvenanceRepository::flush() {
-  rocksdb::WriteBatch batch;
-  std::string key;
-  std::string value;
-  rocksdb::ReadOptions options;
-  uint64_t decrement_total = 0;
-  while (keys_to_delete.size_approx() > 0) {
-    if (keys_to_delete.try_dequeue(key)) {
-      db_->Get(options, key, &value);
-      decrement_total += value.size();
-      batch.Delete(key);
-      logger_->log_debug("Removing %s", key);
-    }
-  }
-  if (db_->Write(rocksdb::WriteOptions(), &batch).ok()) {
-    logger_->log_debug("Decrementing %u from a repo size of %u", decrement_total, repo_size_.load());
-    if (decrement_total > repo_size_.load()) {
-      repo_size_ = 0;
-    } else {
-      repo_size_ -= decrement_total;
-    }
-  }
+void ProvenanceRepository::printStats() {
+  std::string key_count;
+  db_->GetProperty("rocksdb.estimate-num-keys", &key_count);
+
+  std::string table_readers;
+  db_->GetProperty("rocksdb.estimate-table-readers-mem", &table_readers);
+
+  std::string all_memtables;
+  db_->GetProperty("rocksdb.cur-size-all-mem-tables", &all_memtables);
+
+  logger_->log_info("Repository stats: key count: %zu, table readers size: %zu, all memory tables size: %zu",
+                    key_count, table_readers, all_memtables);
 }
 
 void ProvenanceRepository::run() {
+  size_t count = 0;
   while (running_) {
-    std::this_thread::sleep_for(std::chrono::milliseconds(purge_period_));
-    uint64_t curTime = getTimeMillis();
-    // threshold for purge
-    uint64_t purgeThreshold = max_partition_bytes_ * 3 / 4;
-
-    uint64_t size = getRepoSize();
-
-    if (size >= purgeThreshold) {
-      rocksdb::Iterator* it = db_->NewIterator(rocksdb::ReadOptions());
-      for (it->SeekToFirst(); it->Valid(); it->Next()) {
-        ProvenanceEventRecord eventRead;
-        std::string key = it->key().ToString();
-        uint64_t eventTime = eventRead.getEventTime(reinterpret_cast<uint8_t*>(const_cast<char*>(it->value().data())), it->value().size());
-        if (eventTime > 0) {
-          if ((curTime - eventTime) > (uint64_t)max_partition_millis_)
-            Delete(key);
-        } else {
-          logger_->log_debug("NiFi Provenance retrieve event %s fail", key);
-          Delete(key);
-        }
-      }
-      delete it;
+    std::this_thread::sleep_for(std::chrono::seconds(1));
+    count++;
+    // Hack, to be removed in scope of https://issues.apache.org/jira/browse/MINIFICPP-1145
+    count = count % 30;
+    if(count == 0) {
+      printStats();
     }
-    flush();
-    size = getRepoSize();
-    if (size > (uint64_t)max_partition_bytes_)
-      repo_full_ = true;
-    else
-      repo_full_ = false;
   }
 }
 } /* namespace provenance */
diff --git a/extensions/rocksdb-repos/ProvenanceRepository.h b/extensions/rocksdb-repos/ProvenanceRepository.h
index ef446be..39a9e85 100644
--- a/extensions/rocksdb-repos/ProvenanceRepository.h
+++ b/extensions/rocksdb-repos/ProvenanceRepository.h
@@ -24,7 +24,6 @@
 #include "core/Core.h"
 #include "provenance/Provenance.h"
 #include "core/logging/LoggerConfiguration.h"
-#include "concurrentqueue.h"
 namespace org {
 namespace apache {
 namespace nifi {
@@ -38,7 +37,6 @@
 
 class ProvenanceRepository : public core::Repository, public std::enable_shared_from_this<ProvenanceRepository> {
  public:
-
   ProvenanceRepository(std::string name, utils::Identifier uuid)
       : ProvenanceRepository(name){
 
@@ -56,21 +54,13 @@
     db_ = NULL;
   }
 
-  // Destructor
-  virtual ~ProvenanceRepository() {
-    if (db_)
-      delete db_;
-  }
-
-  virtual void flush();
+  void printStats();
 
   virtual bool isNoop() {
     return false;
   }
 
   void start() {
-    if (this->purge_period_ <= 0)
-      return;
     if (running_)
       return;
     running_ = true;
@@ -99,9 +89,28 @@
     options.create_if_missing = true;
     options.use_direct_io_for_flush_and_compaction = true;
     options.use_direct_reads = true;
-    rocksdb::Status status = rocksdb::DB::Open(options, directory_, &db_);
+    // Rocksdb write buffers act as a log of database operation: grow till reaching the limit, serialized after
+    // This shouldn't go above 16MB and the configured total size of the db should cap it as well
+    int64_t max_buffer_size = 16 << 20;
+    options.write_buffer_size = std::min(max_buffer_size, max_partition_bytes_);
+    options.max_write_buffer_number = 4;
+    options.min_write_buffer_number_to_merge = 1;
+
+    options.compaction_style = rocksdb::CompactionStyle::kCompactionStyleFIFO;
+    options.compaction_options_fifo = rocksdb::CompactionOptionsFIFO(max_partition_bytes_, false);
+    if(max_partition_millis_ > 0) {
+      options.compaction_options_fifo.ttl = max_partition_millis_ / 1000;
+    }
+
+    logger_->log_info("Write buffer: %llu", options.write_buffer_size);
+    logger_->log_info("Max partition bytes: %llu", max_partition_bytes_);
+    logger_->log_info("Ttl: %llu", options.compaction_options_fifo.ttl);
+
+    rocksdb::DB* db;
+    rocksdb::Status status = rocksdb::DB::Open(options, directory_, &db);
     if (status.ok()) {
       logger_->log_debug("MiNiFi Provenance Repository database open %s success", directory_);
+      db_.reset(db);
     } else {
       logger_->log_error("MiNiFi Provenance Repository database open %s failed: %s", directory_, status.ToString());
       return false;
@@ -111,20 +120,12 @@
   }
   // Put
   virtual bool Put(std::string key, const uint8_t *buf, size_t bufLen) {
-    if (repo_full_) {
-      return false;
-    }
-
     // persist to the DB
     rocksdb::Slice value((const char *) buf, bufLen);
     return db_->Put(rocksdb::WriteOptions(), key, value).ok();
   }
 
   virtual bool MultiPut(const std::vector<std::pair<std::string, std::unique_ptr<minifi::io::DataStream>>>& data) {
-    if (repo_full_) {
-      return false;
-    }
-
     rocksdb::WriteBatch batch;
     for (const auto &item: data) {
       rocksdb::Slice value((const char *) item.second->getBuffer(), item.second->getSize());
@@ -137,7 +138,7 @@
 
   // Delete
   virtual bool Delete(std::string key) {
-    keys_to_delete.enqueue(key);
+    // The repo is cleaned up by itself, there is no need to delete items.
     return true;
   }
   // Get
@@ -145,17 +146,12 @@
     return db_->Get(rocksdb::ReadOptions(), key, &value).ok();
   }
 
-  // Remove event
-  void removeEvent(ProvenanceEventRecord *event) {
-    Delete(event->getEventId());
-  }
-
   virtual bool Serialize(const std::string &key, const uint8_t *buffer, const size_t bufferSize) {
     return Put(key, buffer, bufferSize);
   }
 
   virtual bool get(std::vector<std::shared_ptr<core::CoreComponent>> &store, size_t max_size) {
-    rocksdb::Iterator* it = db_->NewIterator(rocksdb::ReadOptions());
+    std::unique_ptr<rocksdb::Iterator> it(db_->NewIterator(rocksdb::ReadOptions()));
     for (it->SeekToFirst(); it->Valid(); it->Next()) {
       std::shared_ptr<ProvenanceEventRecord> eventRead = std::make_shared<ProvenanceEventRecord>();
       std::string key = it->key().ToString();
@@ -165,12 +161,11 @@
         store.push_back(std::dynamic_pointer_cast<core::CoreComponent>(eventRead));
       }
     }
-    delete it;
     return true;
   }
 
   virtual bool DeSerialize(std::vector<std::shared_ptr<core::SerializableComponent>> &records, size_t &max_size, std::function<std::shared_ptr<core::SerializableComponent>()> lambda) {
-    rocksdb::Iterator* it = db_->NewIterator(rocksdb::ReadOptions());
+    std::unique_ptr<rocksdb::Iterator> it(db_->NewIterator(rocksdb::ReadOptions()));
     size_t requested_batch = max_size;
     max_size = 0;
     for (it->SeekToFirst(); it->Valid(); it->Next()) {
@@ -183,16 +178,13 @@
         max_size++;
         records.push_back(eventRead);
       }
-
     }
-    delete it;
-
     return max_size > 0;
   }
 
   //! get record
   void getProvenanceRecord(std::vector<std::shared_ptr<ProvenanceEventRecord>> &records, int maxSize) {
-    rocksdb::Iterator* it = db_->NewIterator(rocksdb::ReadOptions());
+    std::unique_ptr<rocksdb::Iterator> it(db_->NewIterator(rocksdb::ReadOptions()));
     for (it->SeekToFirst(); it->Valid(); it->Next()) {
       std::shared_ptr<ProvenanceEventRecord> eventRead = std::make_shared<ProvenanceEventRecord>();
       std::string key = it->key().ToString();
@@ -202,11 +194,10 @@
         records.push_back(eventRead);
       }
     }
-    delete it;
   }
 
   virtual bool DeSerialize(std::vector<std::shared_ptr<core::SerializableComponent>> &store, size_t &max_size) {
-    rocksdb::Iterator* it = db_->NewIterator(rocksdb::ReadOptions());
+    std::unique_ptr<rocksdb::Iterator> it(db_->NewIterator(rocksdb::ReadOptions()));
     max_size = 0;
     for (it->SeekToFirst(); it->Valid(); it->Next()) {
       std::shared_ptr<ProvenanceEventRecord> eventRead = std::make_shared<ProvenanceEventRecord>();
@@ -218,35 +209,30 @@
       if (store.size() >= max_size)
         break;
     }
-    delete it;
     return max_size > 0;
   }
 
-  //! purge record
-  void purgeProvenanceRecord(std::vector<std::shared_ptr<ProvenanceEventRecord>> &records) {
-    for (auto record : records) {
-      Delete(record->getEventId());
-    }
-    flush();
-  }
   // destroy
   void destroy() {
-    if (db_) {
-      delete db_;
-      db_ = NULL;
-    }
+    db_.reset();
   }
   // Run function for the thread
   void run();
 
+  uint64_t getKeyCount() const {
+    std::string key_count;
+    db_->GetProperty("rocksdb.estimate-num-keys", &key_count);
+
+    return std::stoull(key_count);
+  }
+
   // Prevent default copy constructor and assignment operation
   // Only support pass by reference or pointer
   ProvenanceRepository(const ProvenanceRepository &parent) = delete;
   ProvenanceRepository &operator=(const ProvenanceRepository &parent) = delete;
 
  private:
-  moodycamel::ConcurrentQueue<std::string> keys_to_delete;
-  rocksdb::DB* db_;
+  std::unique_ptr<rocksdb::DB> db_;
   std::shared_ptr<logging::Logger> logger_;
 };
 
diff --git a/libminifi/test/rocksdb-tests/DBProvenanceRepositoryTests.cpp b/libminifi/test/rocksdb-tests/DBProvenanceRepositoryTests.cpp
new file mode 100644
index 0000000..962eb06
--- /dev/null
+++ b/libminifi/test/rocksdb-tests/DBProvenanceRepositoryTests.cpp
@@ -0,0 +1,122 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ProvenanceRepository.h"
+#include "../TestBase.h"
+#include <array>
+#include <chrono>
+#include <vector>
+#include <random>
+
+#define TEST_PROVENANCE_STORAGE_SIZE (1024*100)  // 100 KB
+#define TEST_MAX_PROVENANCE_STORAGE_SIZE (100*1024*1024)  // 100 MB
+
+#define TEST_PROVENANCE_ENTRY_LIFE_TIME (1000)  // 1 sec
+
+void generateData(std::vector<char>& data) {
+  std::random_device rd;
+  std::mt19937 eng(rd());
+
+  std::uniform_int_distribution<> distr(std::numeric_limits<char>::min(), std::numeric_limits<char>::max());
+  auto rand = std::bind(distr, eng);
+  std::generate_n(data.begin(), data.size(), rand);
+}
+
+void provisionRepo(minifi::provenance::ProvenanceRepository& repo, size_t number_of_records, size_t record_size) {
+  for (int i = 0; i < number_of_records; ++i) {
+    std::vector<char> v(record_size);
+    generateData(v);
+    REQUIRE(repo.Put(std::to_string(i), reinterpret_cast<const uint8_t*>(v.data()), v.size()));
+  }
+}
+
+void verifyMaxKeyCount(const minifi::provenance::ProvenanceRepository& repo, uint64_t keyCount) {
+  uint64_t k = keyCount;
+
+  for (int i = 0; i < 50; ++i) {
+    std::this_thread::sleep_for(std::chrono::milliseconds(100));
+    k = std::min(k, repo.getKeyCount());
+    if (k < keyCount) {
+      break;
+    }
+  }
+
+  REQUIRE(k < keyCount);
+}
+
+TEST_CASE("Test size limit", "[sizeLimitTest]") {
+  TestController testController;
+
+  char dirtemplate[] = "/tmp/db.XXXXXX";
+  auto temp_dir = testController.createTempDirectory(dirtemplate);
+  REQUIRE(!temp_dir.empty());
+
+  // 60 sec, 100 KB - going to exceed the size limit
+  minifi::provenance::ProvenanceRepository provdb("TestProvRepo", temp_dir,
+      MAX_PROVENANCE_ENTRY_LIFE_TIME, TEST_PROVENANCE_STORAGE_SIZE, 1000);
+
+  auto configuration = std::make_shared<org::apache::nifi::minifi::Configure>();
+  configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, temp_dir);
+
+  REQUIRE(provdb.initialize(configuration));
+
+  uint64_t keyCount = 500;
+
+  provisionRepo(provdb, keyCount, 10240);
+
+  verifyMaxKeyCount(provdb, 200);
+}
+
+TEST_CASE("Test time limit", "[timeLimitTest]") {
+  TestController testController;
+
+  char dirtemplate[] = "/tmp/db.XXXXXX";
+  auto temp_dir = testController.createTempDirectory(dirtemplate);
+  REQUIRE(!temp_dir.empty());
+
+  // 1 sec, 100 MB - going to exceed TTL
+  minifi::provenance::ProvenanceRepository provdb("TestProvRepo", temp_dir,
+                                                  TEST_PROVENANCE_ENTRY_LIFE_TIME, TEST_MAX_PROVENANCE_STORAGE_SIZE, 1000);
+
+  auto configuration = std::make_shared<org::apache::nifi::minifi::Configure>();
+  configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, temp_dir);
+
+  REQUIRE(provdb.initialize(configuration));
+
+  uint64_t keyCount = 500;
+
+  provisionRepo(provdb, keyCount / 2, 102400);
+
+  REQUIRE(provdb.getKeyCount() == 250);
+
+  /**
+   * Magic: TTL-based DB cleanup only triggers when writeBuffers are serialized to storage
+   * To achieve this 250 entries are put to DB with a total size that ensures at least one buffer is serialized
+   * Wait more than a sec to make sure the serialized records expire
+   * Put another set of entries to trigger cleanup logic to drop the already serialized records
+   * This tests relies on the default settings of Provenance repo: a size of a writeBuffer is 16 MB
+   * One provisioning call here writes 25 MB to make sure serialization is triggered
+   * When the 2nd 50 MB is written the records of the 1st serialization are dropped -> around 160 of them
+   * That's why the final check verifies keyCount to be below 400
+   */
+  std::this_thread::sleep_for(std::chrono::milliseconds(1500));
+
+  provisionRepo(provdb, keyCount /2, 102400);
+
+  verifyMaxKeyCount(provdb, 400);
+}