blob: bdcc3e57c9ac49f27b9ee33b20b531497a568230 [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "DatabaseContentRepository.h"
#include <memory>
#include <string>
#include "RocksDbStream.h"
#include "rocksdb/merge_operator.h"
namespace org {
namespace apache {
namespace nifi {
namespace minifi {
namespace core {
namespace repository {
/**
 * Opens (creating it when missing) the RocksDB database backing this content repository.
 *
 * The database directory is taken from Configure::nifi_dbcontent_repository_directory_default;
 * when that property is not set, "<home>/dbcontentrepository" is used.
 *
 * @param configuration configuration used to resolve the database directory; a null
 *                      configuration is rejected and leaves the repository invalid.
 * @return true when the database was opened successfully, false otherwise.
 */
bool DatabaseContentRepository::initialize(const std::shared_ptr<minifi::Configure> &configuration) {
  if (nullptr == configuration) {
    // The directory cannot be resolved without a configuration; fail cleanly rather than dereference null.
    logger_->log_error("NiFi Content DB Repository cannot be initialized without a configuration");
    is_valid_ = false;
    return is_valid_;
  }
  std::string value;
  if (configuration->get(Configure::nifi_dbcontent_repository_directory_default, value)) {
    directory_ = value;
  } else {
    directory_ = configuration->getHome() + "/dbcontentrepository";
  }
  rocksdb::Options options;
  options.create_if_missing = true;
  options.use_direct_io_for_flush_and_compaction = true;
  options.use_direct_reads = true;
  // StringAppender is declared elsewhere; presumably it is what allows content to be appended to an existing key.
  options.merge_operator = std::make_shared<StringAppender>();
  options.error_if_exists = false;
  options.max_successive_merges = 0;
  rocksdb::Status status = rocksdb::DB::Open(options, directory_.c_str(), &db_);
  if (status.ok()) {
    logger_->log_debug("NiFi Content DB Repository database open %s success", directory_);
    is_valid_ = true;
  } else {
    // Include the RocksDB status text so the reason for the failure is visible in the log.
    logger_->log_error("NiFi Content DB Repository database open %s fail: %s", directory_, status.ToString());
    is_valid_ = false;
  }
  return is_valid_;
}
void DatabaseContentRepository::stop() {
if (db_) {
db_->FlushWAL(true);
delete db_;
db_ = nullptr;
}
}
/**
 * Creates a write-enabled stream for the content identified by the claim.
 *
 * @param claim  resource claim whose content path is used as the stream's key.
 * @param append not consulted here; the stream is always opened write-enabled.
 * @return a stream for the claim, or nullptr when the claim is null or the repository is not usable.
 */
std::shared_ptr<io::BaseStream> DatabaseContentRepository::write(const std::shared_ptr<minifi::ResourceClaim> &claim, bool append) {
  // Historically an invalid stream reported -1; the API also accepts a null stream,
  // so that is what is returned when there is nothing valid to hand out.
  const bool usable = claim != nullptr && is_valid_ && db_ != nullptr;
  if (!usable) {
    return nullptr;
  }
  // Appending is supported in every mode, so the stream is simply opened for writing.
  return std::make_shared<io::RocksDbStream>(claim->getContentFullPath(), db_, true);
}
/**
 * Creates a read-only stream for the content identified by the claim.
 *
 * @param claim resource claim whose content path is used as the stream's key.
 * @return a stream for the claim, or nullptr when the claim is null or the repository is not usable.
 */
std::shared_ptr<io::BaseStream> DatabaseContentRepository::read(const std::shared_ptr<minifi::ResourceClaim> &claim) {
  // Historically an invalid stream reported -1; the API also accepts a null stream,
  // so that is what is returned when there is nothing valid to hand out.
  const bool usable = claim != nullptr && is_valid_ && db_ != nullptr;
  if (!usable) {
    return nullptr;
  }
  return std::make_shared<io::RocksDbStream>(claim->getContentFullPath(), db_, false);
}
/**
 * Reports whether content is stored for the given claim.
 *
 * @param streamId resource claim whose content path is looked up in the database.
 * @return true when the lookup succeeds; false when it fails, when the claim is null,
 *         or when the repository is not usable (never initialized, failed to open, or stopped).
 */
bool DatabaseContentRepository::exists(const std::shared_ptr<minifi::ResourceClaim> &streamId) {
  // Same guard as read/write/remove: never dereference a null claim or a closed database.
  if (nullptr == streamId || !is_valid_ || !db_)
    return false;
  std::string value;
  rocksdb::Status status;
  status = db_->Get(rocksdb::ReadOptions(), streamId->getContentFullPath(), &value);
  if (status.ok()) {
    logger_->log_debug("%s exists", streamId->getContentFullPath());
    return true;
  } else {
    logger_->log_debug("%s does not exist", streamId->getContentFullPath());
    return false;
  }
}
/**
 * Deletes the content stored for the given claim.
 *
 * @param claim resource claim whose content path is the key to delete.
 * @return true when the delete succeeded; false when it failed, when the claim is null,
 *         or when the repository is not usable.
 */
bool DatabaseContentRepository::remove(const std::shared_ptr<minifi::ResourceClaim> &claim) {
  const bool usable = claim != nullptr && is_valid_ && db_ != nullptr;
  if (!usable) {
    return false;
  }
  const rocksdb::Status outcome = db_->Delete(rocksdb::WriteOptions(), claim->getContentFullPath());
  if (!outcome.ok()) {
    logger_->log_debug("Attempted, but could not delete %s", claim->getContentFullPath());
    return false;
  }
  logger_->log_debug("Deleted %s", claim->getContentFullPath());
  return true;
}
} /* namespace repository */
} /* namespace core */
} /* namespace minifi */
} /* namespace nifi */
} /* namespace apache */
} /* namespace org */