blob: 290b2c6e51ea87b59723763426c879ebb75faccc [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "RocksDatabase.h"
#include "core/logging/LoggerConfiguration.h"
namespace org {
namespace apache {
namespace nifi {
namespace minifi {
namespace internal {
OpenRocksDB::OpenRocksDB(RocksDatabase& db, gsl::not_null<std::shared_ptr<rocksdb::DB>> impl) : db_(&db), impl_(std::move(impl)) {}
rocksdb::Status OpenRocksDB::Put(const rocksdb::WriteOptions& options, const rocksdb::Slice& key, const rocksdb::Slice& value) {
rocksdb::Status result = impl_->Put(options, key, value);
if (result == rocksdb::Status::NoSpace()) {
db_->invalidate();
}
return result;
}
rocksdb::Status OpenRocksDB::Get(const rocksdb::ReadOptions& options, const rocksdb::Slice& key, std::string* value) {
rocksdb::Status result = impl_->Get(options, key, value);
if (result == rocksdb::Status::NoSpace()) {
db_->invalidate();
}
return result;
}
std::vector<rocksdb::Status> OpenRocksDB::MultiGet(const rocksdb::ReadOptions& options, const std::vector<rocksdb::Slice>& keys, std::vector<std::string>* values) {
std::vector<rocksdb::Status> results = impl_->MultiGet(options, keys, values);
for (const auto& result : results) {
if (result == rocksdb::Status::NoSpace()) {
db_->invalidate();
break;
}
}
return results;
}
rocksdb::Status OpenRocksDB::Write(const rocksdb::WriteOptions& options, rocksdb::WriteBatch* updates) {
rocksdb::Status result = impl_->Write(options, updates);
if (result == rocksdb::Status::NoSpace()) {
db_->invalidate();
}
return result;
}
rocksdb::Status OpenRocksDB::Delete(const rocksdb::WriteOptions& options, const rocksdb::Slice& key) {
rocksdb::Status result = impl_->Delete(options, key);
if (result == rocksdb::Status::NoSpace()) {
db_->invalidate();
}
return result;
}
rocksdb::Status OpenRocksDB::Merge(const rocksdb::WriteOptions& options, const rocksdb::Slice& key, const rocksdb::Slice& value) {
rocksdb::Status result = impl_->Merge(options, key, value);
if (result == rocksdb::Status::NoSpace()) {
db_->invalidate();
}
return result;
}
bool OpenRocksDB::GetProperty(const rocksdb::Slice& property, std::string* value) {
return impl_->GetProperty(property, value);
}
std::unique_ptr<rocksdb::Iterator> OpenRocksDB::NewIterator(const rocksdb::ReadOptions& options) {
return std::unique_ptr<rocksdb::Iterator>{impl_->NewIterator(options)};
}
rocksdb::Status OpenRocksDB::NewCheckpoint(rocksdb::Checkpoint **checkpoint) {
return rocksdb::Checkpoint::Create(impl_.get(), checkpoint);
}
rocksdb::Status OpenRocksDB::FlushWAL(bool sync) {
rocksdb::Status result = impl_->FlushWAL(sync);
if (result == rocksdb::Status::NoSpace()) {
db_->invalidate();
}
return result;
}
rocksdb::DB* OpenRocksDB::get() {
return impl_.get();
}
std::shared_ptr<core::logging::Logger> RocksDatabase::logger_ = core::logging::LoggerFactory<RocksDatabase>::getLogger();
RocksDatabase::RocksDatabase(const rocksdb::Options& options, const std::string& name, Mode mode) : open_options_(options), db_name_(name), mode_(mode) {}
void RocksDatabase::invalidate() {
std::lock_guard<std::mutex> db_guard{ mtx_ };
// discard our own instance
impl_.reset();
}
utils::optional<OpenRocksDB> RocksDatabase::open() {
std::lock_guard<std::mutex> db_guard{ mtx_ };
if (!impl_) {
// database is not opened yet
rocksdb::DB* db_instance = nullptr;
rocksdb::Status result;
switch (mode_) {
case Mode::ReadWrite:
result = rocksdb::DB::Open(open_options_, db_name_, &db_instance);
if (!result.ok()) {
logger_->log_error("Cannot open writable rocksdb database %s, error: %s", db_name_, result.ToString());
}
break;
case Mode::ReadOnly:
result = rocksdb::DB::OpenForReadOnly(open_options_, db_name_, &db_instance);
if (!result.ok()) {
logger_->log_error("Cannot open read-only rocksdb database %s, error: %s", db_name_, result.ToString());
}
break;
}
if (db_instance != nullptr && result.ok()) {
impl_.reset(db_instance);
} else {
// we failed to open the database
return utils::nullopt;
}
}
return OpenRocksDB(*this, gsl::make_not_null<std::shared_ptr<rocksdb::DB>>(impl_));
}
} /* namespace internal */
} /* namespace minifi */
} /* namespace nifi */
} /* namespace apache */
} /* namespace org */