blob: ef446be8269fe530bf6ed0124e1d305974afb372 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef LIBMINIFI_INCLUDE_PROVENANCE_PROVENANCEREPOSITORY_H_
#define LIBMINIFI_INCLUDE_PROVENANCE_PROVENANCEREPOSITORY_H_
#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/slice.h"
#include "core/Repository.h"
#include "core/Core.h"
#include "provenance/Provenance.h"
#include "core/logging/LoggerConfiguration.h"
#include "concurrentqueue.h"
namespace org {
namespace apache {
namespace nifi {
namespace minifi {
namespace provenance {
// Default values for the ProvenanceRepository constructor arguments below.
// Default on-disk location of the repository (relative path), used as the `directory` argument.
#define PROVENANCE_DIRECTORY "./provenance_repository"
// Default for `maxPartitionBytes`: 10 * 1024 * 1024 bytes.
#define MAX_PROVENANCE_STORAGE_SIZE (10*1024*1024) // 10M
// Default for `maxPartitionMillis`, expressed in milliseconds.
#define MAX_PROVENANCE_ENTRY_LIFE_TIME (60000) // 1 minute
// Default for `purgePeriod`, expressed in milliseconds.
#define PROVENANCE_PURGE_PERIOD (2500) // 2500 msec
/**
 * Provenance repository backed by a RocksDB database.
 *
 * Records are stored as opaque byte buffers keyed by a string. Deletions are
 * not applied immediately: Delete() only queues the key in keys_to_delete;
 * flush() and run() are defined out of line (presumably they drain that
 * queue - confirm against the implementation file).
 *
 * The database handle is null until initialize() succeeds and again after
 * destroy(); accessors report failure instead of dereferencing a null handle.
 */
class ProvenanceRepository : public core::Repository, public std::enable_shared_from_this<ProvenanceRepository> {
 public:
  /**
   * Creates a repository with the given name and default settings.
   * @param name repository name, forwarded to the main constructor
   * @param uuid accepted for signature compatibility; not used here
   */
  ProvenanceRepository(std::string name, utils::Identifier uuid)
      : ProvenanceRepository(name) {
  }

  /**
   * Creates a new provenance repository. The database is not opened here;
   * call initialize() before using any accessor.
   * @param repo_name repository name; when empty the class name is used instead
   * @param directory directory holding the RocksDB database
   * @param maxPartitionMillis forwarded to core::Repository
   * @param maxPartitionBytes forwarded to core::Repository
   * @param purgePeriod forwarded to core::Repository; start() is a no-op when it is <= 0
   */
  ProvenanceRepository(const std::string repo_name = "", std::string directory = PROVENANCE_DIRECTORY, int64_t maxPartitionMillis = MAX_PROVENANCE_ENTRY_LIFE_TIME, int64_t maxPartitionBytes =
                           MAX_PROVENANCE_STORAGE_SIZE,
                       uint64_t purgePeriod = PROVENANCE_PURGE_PERIOD)
      : core::SerializableComponent(repo_name),
        Repository(repo_name.length() > 0 ? repo_name : core::getClassName<ProvenanceRepository>(), directory, maxPartitionMillis, maxPartitionBytes, purgePeriod),
        db_(nullptr),
        logger_(logging::LoggerFactory<ProvenanceRepository>::getLogger()) {
  }

  /** Releases the database handle, if one is still open. */
  virtual ~ProvenanceRepository() {
    // Deleting a null pointer is a no-op, so no guard is required.
    delete db_;
    db_ = nullptr;
  }

  /** Defined out of line. */
  virtual void flush();

  /** @return always false: this repository actually persists data. */
  virtual bool isNoop() {
    return false;
  }

  /**
   * Starts the monitor thread executing run(). Does nothing when the purge
   * period is not positive or the repository is already running.
   */
  void start() {
    if (this->purge_period_ <= 0)
      return;
    if (running_)
      return;
    running_ = true;
    thread_ = std::thread(&ProvenanceRepository::run, shared_from_this());
    logger_->log_debug("%s Repository Monitor Thread Start", name_);
  }

  /**
   * Applies configuration overrides (directory, max storage size, max storage
   * time) and opens the RocksDB database, creating it if missing.
   * @param config configuration to read the overrides from
   * @return true when the database was opened, false otherwise
   */
  virtual bool initialize(const std::shared_ptr<org::apache::nifi::minifi::Configure> &config) {
    std::string value;
    if (config->get(Configure::nifi_provenance_repository_directory_default, value)) {
      directory_ = value;
    }
    logger_->log_debug("MiNiFi Provenance Repository Directory %s", directory_);
    if (config->get(Configure::nifi_provenance_repository_max_storage_size, value)) {
      core::Property::StringToInt(value, max_partition_bytes_);
    }
    logger_->log_debug("MiNiFi Provenance Max Partition Bytes %d", max_partition_bytes_);
    if (config->get(Configure::nifi_provenance_repository_max_storage_time, value)) {
      core::TimeUnit unit;
      if (core::Property::StringToTime(value, max_partition_millis_, unit) && core::Property::ConvertTimeUnitToMS(max_partition_millis_, unit, max_partition_millis_)) {
        // Nothing further to do: both calls write into max_partition_millis_.
        // A parse or conversion failure is deliberately not treated as fatal.
      }
    }
    logger_->log_debug("MiNiFi Provenance Max Storage Time: [%d] ms", max_partition_millis_);

    rocksdb::Options options;
    options.create_if_missing = true;
    options.use_direct_io_for_flush_and_compaction = true;
    options.use_direct_reads = true;
    const rocksdb::Status status = rocksdb::DB::Open(options, directory_, &db_);
    if (!status.ok()) {
      logger_->log_error("MiNiFi Provenance Repository database open %s failed: %s", directory_, status.ToString());
      return false;
    }
    logger_->log_debug("MiNiFi Provenance Repository database open %s success", directory_);
    return true;
  }

  /**
   * Persists one record.
   * @param key record key
   * @param buf record bytes
   * @param bufLen number of bytes in buf
   * @return true when the write succeeded; false when the repository is
   *         flagged full, the database is not open, or the write failed
   */
  virtual bool Put(std::string key, const uint8_t *buf, size_t bufLen) {
    if (repo_full_ || db_ == nullptr) {
      return false;
    }
    // persist to the DB
    const rocksdb::Slice value(reinterpret_cast<const char *>(buf), bufLen);
    return db_->Put(rocksdb::WriteOptions(), key, value).ok();
  }

  /**
   * Persists several records through a single write batch.
   * @param data (key, stream) pairs; each stream's buffer is stored as the value
   * @return true when the whole batch was written; false when the repository
   *         is flagged full, the database is not open, or any step failed
   */
  virtual bool MultiPut(const std::vector<std::pair<std::string, std::unique_ptr<minifi::io::DataStream>>>& data) {
    if (repo_full_ || db_ == nullptr) {
      return false;
    }
    rocksdb::WriteBatch batch;
    for (const auto &item : data) {
      const rocksdb::Slice value(reinterpret_cast<const char *>(item.second->getBuffer()), item.second->getSize());
      if (!batch.Put(item.first, value).ok()) {
        return false;
      }
    }
    return db_->Write(rocksdb::WriteOptions(), &batch).ok();
  }

  /**
   * Queues a key for deletion; the database is not touched here.
   * @param key key to remove
   * @return always true
   */
  virtual bool Delete(std::string key) {
    keys_to_delete.enqueue(key);
    return true;
  }

  /**
   * Reads the value stored under key.
   * @param key record key
   * @param value receives the stored bytes
   * @return true when the read succeeded; false when it failed or the
   *         database is not open
   */
  virtual bool Get(const std::string &key, std::string &value) {
    if (db_ == nullptr) {
      return false;
    }
    return db_->Get(rocksdb::ReadOptions(), key, &value).ok();
  }

  /** Queues the record identified by the event's id for deletion. */
  void removeEvent(ProvenanceEventRecord *event) {
    Delete(event->getEventId());
  }

  /** Stores an already-serialized record; equivalent to Put(). */
  virtual bool Serialize(const std::string &key, const uint8_t *buffer, const size_t bufferSize) {
    return Put(key, buffer, bufferSize);
  }

  /**
   * Appends deserialized records to store, walking the database from its
   * first key, until store holds max_size elements or the data is exhausted.
   * Records that fail to deserialize are skipped.
   * @return true when the database was read, false when it is not open
   */
  virtual bool get(std::vector<std::shared_ptr<core::CoreComponent>> &store, size_t max_size) {
    if (db_ == nullptr) {
      return false;
    }
    // The iterator is owned by the caller of NewIterator(); unique_ptr frees it on every exit path.
    std::unique_ptr<rocksdb::Iterator> it(db_->NewIterator(rocksdb::ReadOptions()));
    for (it->SeekToFirst(); it->Valid() && store.size() < max_size; it->Next()) {
      std::shared_ptr<ProvenanceEventRecord> eventRead = std::make_shared<ProvenanceEventRecord>();
      if (readValue(*it, *eventRead)) {
        store.push_back(std::dynamic_pointer_cast<core::CoreComponent>(eventRead));
      }
    }
    return true;
  }

  /**
   * Deserializes up to max_size records into freshly created components.
   * @param records receives the successfully deserialized components
   * @param max_size in: requested batch size; out: number of records appended
   * @param lambda factory invoked once per candidate record
   * @return true when at least one record was appended
   */
  virtual bool DeSerialize(std::vector<std::shared_ptr<core::SerializableComponent>> &records, size_t &max_size, std::function<std::shared_ptr<core::SerializableComponent>()> lambda) {
    const size_t requested_batch = max_size;
    max_size = 0;
    if (db_ == nullptr) {
      return false;
    }
    std::unique_ptr<rocksdb::Iterator> it(db_->NewIterator(rocksdb::ReadOptions()));
    for (it->SeekToFirst(); it->Valid() && max_size < requested_batch; it->Next()) {
      std::shared_ptr<core::SerializableComponent> eventRead = lambda();
      if (readValue(*it, *eventRead)) {
        max_size++;
        records.push_back(eventRead);
      }
    }
    return max_size > 0;
  }

  /**
   * Appends deserialized provenance records to records until it holds maxSize
   * elements or the data is exhausted.
   * @param records receives the records
   * @param maxSize size limit; it is converted to an unsigned value, so a
   *        negative limit effectively means "no limit"
   */
  void getProvenanceRecord(std::vector<std::shared_ptr<ProvenanceEventRecord>> &records, int maxSize) {
    if (db_ == nullptr) {
      return;
    }
    const uint64_t limit = static_cast<uint64_t>(maxSize);
    std::unique_ptr<rocksdb::Iterator> it(db_->NewIterator(rocksdb::ReadOptions()));
    for (it->SeekToFirst(); it->Valid() && records.size() < limit; it->Next()) {
      std::shared_ptr<ProvenanceEventRecord> eventRead = std::make_shared<ProvenanceEventRecord>();
      if (readValue(*it, *eventRead)) {
        records.push_back(eventRead);
      }
    }
  }

  /**
   * Deserializes stored records into the components already present in store,
   * filling the slots in order. A slot only counts as filled when its
   * DeSerialize() call succeeds; otherwise the next stored record is tried
   * against the same slot.
   * @param store pre-allocated components to fill
   * @param max_size out: number of slots that were filled
   * @return true when at least one slot was filled
   */
  virtual bool DeSerialize(std::vector<std::shared_ptr<core::SerializableComponent>> &store, size_t &max_size) {
    max_size = 0;
    if (db_ == nullptr) {
      return false;
    }
    std::unique_ptr<rocksdb::Iterator> it(db_->NewIterator(rocksdb::ReadOptions()));
    // Stop once every slot is filled. Checking the bound before indexing also
    // makes an empty store a clean "nothing read" instead of an out-of-range access.
    for (it->SeekToFirst(); it->Valid() && max_size < store.size(); it->Next()) {
      if (readValue(*it, *store[max_size])) {
        max_size++;
      }
    }
    return max_size > 0;
  }

  /** Queues every given record for deletion, then calls flush(). */
  void purgeProvenanceRecord(std::vector<std::shared_ptr<ProvenanceEventRecord>> &records) {
    for (const auto &record : records) {
      Delete(record->getEventId());
    }
    flush();
  }

  /** Closes the database handle; safe to call more than once. */
  void destroy() {
    delete db_;
    db_ = nullptr;
  }

  // Run function for the thread (defined out of line)
  void run();

  // Prevent default copy constructor and assignment operation
  // Only support pass by reference or pointer
  ProvenanceRepository(const ProvenanceRepository &parent) = delete;
  ProvenanceRepository &operator=(const ProvenanceRepository &parent) = delete;

 private:
  /**
   * Feeds the value the iterator currently points at to record.DeSerialize().
   * The buffer is handed over as a mutable uint8_t pointer and an int length,
   * matching the argument types the call sites have always passed.
   */
  template<typename Record>
  static bool readValue(const rocksdb::Iterator &it, Record &record) {
    const rocksdb::Slice value = it.value();
    return record.DeSerialize(reinterpret_cast<uint8_t *>(const_cast<char *>(value.data())), static_cast<int>(value.size()));
  }

  // Keys queued by Delete(), awaiting removal from the database.
  moodycamel::ConcurrentQueue<std::string> keys_to_delete;
  // Owned RocksDB handle; null until initialize() succeeds and after destroy().
  rocksdb::DB* db_;
  std::shared_ptr<logging::Logger> logger_;
};
} /* namespace provenance */
} /* namespace minifi */
} /* namespace nifi */
} /* namespace apache */
} /* namespace org */
#endif /* LIBMINIFI_INCLUDE_PROVENANCE_PROVENANCEREPOSITORY_H_ */