blob: 9b5571fc0b0d6d0d8d996ee0f5a6d1315c4eeb05 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "ProvenanceRepository.h"
#include <algorithm>
#include <chrono>
#include <string>
#include <utility>
#include "core/Resource.h"
namespace org::apache::nifi::minifi::provenance {
/**
 * Reads the provenance repository settings from the configuration and opens the RocksDB database.
 *
 * Each setting is optional; when a key is absent the member's current value is kept:
 *  - nifi_provenance_repository_directory_default -> directory_ (an empty value is ignored)
 *  - nifi_provenance_repository_max_storage_size  -> max_partition_bytes_
 *  - nifi_provenance_repository_max_storage_time  -> max_partition_millis_ (an unparsable value is
 *    ignored with a warning)
 *  - nifi_provenance_repository_rocksdb_read_verify_checksums -> verify_checksums_in_rocksdb_reads_
 *    (false when absent or not convertible by utils::string::toBool)
 *
 * @param config the agent configuration to read the settings from
 * @return true if the database was opened successfully, false otherwise
 * @throws if the max storage size is not a parsable data size, or if it does not fit into int64_t
 */
bool ProvenanceRepository::initialize(const std::shared_ptr<org::apache::nifi::minifi::Configure> &config) {
  std::string value;
  if (config->get(Configure::nifi_provenance_repository_directory_default, value) && !value.empty()) {
    directory_ = value;
  }
  logger_->log_debug("MiNiFi Provenance Repository Directory {}", directory_);
  if (config->get(Configure::nifi_provenance_repository_max_storage_size, value)) {
    max_partition_bytes_ = gsl::narrow<int64_t>(parsing::parseDataSize(value) | utils::orThrow("expected parsable data size"));
  }
  logger_->log_debug("MiNiFi Provenance Max Partition Bytes {}", max_partition_bytes_);
  if (config->get(Configure::nifi_provenance_repository_max_storage_time, value)) {
    if (auto max_partition = utils::timeutils::StringToDuration<std::chrono::milliseconds>(value)) {
      max_partition_millis_ = *max_partition;
    } else {
      // Previously a typo here was dropped silently; surface it so the operator knows the default is in effect.
      logger_->log_warn("Invalid MiNiFi Provenance Max Storage Time '{}', keeping [{}]", value, max_partition_millis_);
    }
  }
  logger_->log_debug("MiNiFi Provenance Max Storage Time: [{}]", max_partition_millis_);
  verify_checksums_in_rocksdb_reads_ = (config->get(Configure::nifi_provenance_repository_rocksdb_read_verify_checksums) | utils::andThen(&utils::string::toBool)).value_or(false);
  logger_->log_debug("{} checksum verification in ProvenanceRepository", verify_checksums_in_rocksdb_reads_ ? "Using" : "Not using");
  auto db_options = [] (minifi::internal::Writable<rocksdb::DBOptions>& db_opts) {
    minifi::internal::setCommonRocksDbOptions(db_opts);
  };
  // RocksDB write buffers act as a log of database operations: they grow until reaching the limit, then get serialized.
  // A single buffer shouldn't exceed 16 MiB, and the configured total size of the db caps it as well.
  auto cf_options = [this] (rocksdb::ColumnFamilyOptions& cf_opts) {
    constexpr int64_t max_buffer_size = 16 << 20;  // 16 MiB
    cf_opts.write_buffer_size = gsl::narrow<size_t>(std::min(max_buffer_size, max_partition_bytes_));
    cf_opts.max_write_buffer_number = 4;
    cf_opts.min_write_buffer_number_to_merge = 1;
    // FIFO compaction drops the oldest data once the total size exceeds max_partition_bytes_.
    cf_opts.compaction_style = rocksdb::CompactionStyle::kCompactionStyleFIFO;
    cf_opts.compaction_options_fifo = rocksdb::CompactionOptionsFIFO(max_partition_bytes_, false);
    if (max_partition_millis_ > std::chrono::milliseconds(0)) {
      // RocksDB's ttl is in whole seconds and 0 means "disabled". Truncating (e.g. 500ms -> 0s) would turn
      // expiry off entirely, so round up to the next whole second instead.
      cf_opts.ttl = std::chrono::ceil<std::chrono::seconds>(max_partition_millis_).count();
    }
  };
  db_ = minifi::internal::RocksDatabase::create(db_options, cf_options, directory_,
      minifi::internal::getRocksDbOptionsToOverride(config, Configure::nifi_provenance_repository_rocksdb_options));
  if (db_->open()) {
    logger_->log_debug("MiNiFi Provenance Repository database open {} success", directory_);
  } else {
    logger_->log_error("MiNiFi Provenance Repository database open {} failed", directory_);
    return false;
  }
  return true;
}
/**
 * Reads provenance events from the start of the database and appends them to `records`.
 *
 * Entries that fail to deserialize are skipped and do not count towards the batch.
 *
 * @param records receives the deserialized events; existing contents are kept
 * @param max_size in: the maximum number of events to read; out: the number of events actually appended
 * @return true if at least one event was appended, false otherwise (including when the database cannot be opened)
 */
bool ProvenanceRepository::getElements(std::vector<std::shared_ptr<core::SerializableComponent>> &records, size_t &max_size) {
  auto opendb = db_->open();
  if (!opendb) {
    return false;
  }
  rocksdb::ReadOptions options;
  options.verify_checksums = verify_checksums_in_rocksdb_reads_;
  std::unique_ptr<rocksdb::Iterator> it(opendb->NewIterator(options));
  const size_t requested_batch = max_size;
  max_size = 0;
  for (it->SeekToFirst(); it->Valid() && max_size < requested_batch; it->Next()) {
    auto eventRead = ProvenanceEventRecord::create();
    io::BufferStream stream(gsl::make_span(it->value()).as_span<const std::byte>());
    if (eventRead->deserialize(stream)) {
      ++max_size;
      records.push_back(std::move(eventRead));
    }
  }
  // Valid() also becomes false when the iterator hits a read error, which would otherwise be
  // indistinguishable from reaching the end of the data.
  if (const auto status = it->status(); !status.ok()) {
    logger_->log_error("Error while reading from MiNiFi Provenance Repository database: {}", status.ToString());
  }
  return max_size > 0;
}
/**
 * Drops the repository's database handle by resetting db_ to null.
 * Any later use of db_ would require initialize() to be called again.
 */
void ProvenanceRepository::destroy() {
  db_ = nullptr;
}
// Registers ProvenanceRepository as an InternalResource with the names "ProvenanceRepository" and
// "provenancerepository" (presumably so configuration can refer to it by either spelling — confirm in core/Resource.h).
REGISTER_RESOURCE_AS(ProvenanceRepository, InternalResource, ("ProvenanceRepository", "provenancerepository"));
} // namespace org::apache::nifi::minifi::provenance