blob: c48c4b62c651d93ac3f26fa604e6543c4dbbfd95 [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <utility>
#include "OpenRocksDb.h"
#include "ColumnHandle.h"
#include "RocksDbInstance.h"
namespace org::apache::nifi::minifi::internal {
OpenRocksDb::OpenRocksDb(RocksDbInstance& db, gsl::not_null<std::shared_ptr<rocksdb::DB>> impl, gsl::not_null<std::shared_ptr<ColumnHandle>> column)
: db_(&db), impl_(std::move(impl)), column_(std::move(column)) {}
rocksdb::Status OpenRocksDb::Put(const rocksdb::WriteOptions& options, const rocksdb::Slice& key, const rocksdb::Slice& value) {
rocksdb::Status result = impl_->Put(options, column_->handle.get(), key, value);
handleResult(result);
return result;
}
rocksdb::Status OpenRocksDb::Get(const rocksdb::ReadOptions& options, const rocksdb::Slice& key, std::string* value) {
rocksdb::Status result = impl_->Get(options, column_->handle.get(), key, value);
handleResult(result);
return result;
}
std::vector<rocksdb::Status> OpenRocksDb::MultiGet(const rocksdb::ReadOptions& options, const std::vector<rocksdb::Slice>& keys, std::vector<std::string>* values) {
std::vector<rocksdb::Status> results = impl_->MultiGet(
options, std::vector<rocksdb::ColumnFamilyHandle*>(keys.size(), column_->handle.get()), keys, values);
handleResult(results);
return results;
}
rocksdb::Status OpenRocksDb::Write(const rocksdb::WriteOptions& options, internal::WriteBatch* updates) {
rocksdb::Status result = impl_->Write(options, &updates->impl_);
handleResult(result);
return result;
}
rocksdb::Status OpenRocksDb::Delete(const rocksdb::WriteOptions& options, const rocksdb::Slice& key) {
rocksdb::Status result = impl_->Delete(options, column_->handle.get(), key);
handleResult(result);
return result;
}
rocksdb::Status OpenRocksDb::Merge(const rocksdb::WriteOptions& options, const rocksdb::Slice& key, const rocksdb::Slice& value) {
rocksdb::Status result = impl_->Merge(options, column_->handle.get(), key, value);
handleResult(result);
return result;
}
bool OpenRocksDb::GetProperty(const rocksdb::Slice& property, std::string* value) {
return impl_->GetProperty(column_->handle.get(), property, value);
}
std::unique_ptr<rocksdb::Iterator> OpenRocksDb::NewIterator(const rocksdb::ReadOptions& options) {
return std::unique_ptr<rocksdb::Iterator>{impl_->NewIterator(options, column_->handle.get())};
}
rocksdb::Status OpenRocksDb::NewCheckpoint(rocksdb::Checkpoint **checkpoint) {
return rocksdb::Checkpoint::Create(impl_.get(), checkpoint);
}
rocksdb::Status OpenRocksDb::NewCheckpoint(std::unique_ptr<rocksdb::Checkpoint>& checkpoint) {
rocksdb::Checkpoint* checkpoint_ptr = nullptr;
rocksdb::Status result = NewCheckpoint(&checkpoint_ptr);
if (result.ok()) {
checkpoint.reset(checkpoint_ptr);
}
return result;
}
rocksdb::Status OpenRocksDb::FlushWAL(bool sync) {
rocksdb::Status result = impl_->FlushWAL(sync);
handleResult(result);
return result;
}
rocksdb::Status OpenRocksDb::RunCompaction() {
rocksdb::Status result = impl_->CompactRange(rocksdb::CompactRangeOptions{
.bottommost_level_compaction = rocksdb::BottommostLevelCompaction::kForce
}, nullptr, nullptr);
handleResult(result);
return result;
}
void OpenRocksDb::handleResult(const rocksdb::Status& result) {
if (result == rocksdb::Status::NoSpace()) {
db_->invalidate();
}
}
void OpenRocksDb::handleResult(const std::vector<rocksdb::Status>& results) {
for (const auto& result : results) {
if (result == rocksdb::Status::NoSpace()) {
db_->invalidate();
break;
}
}
}
WriteBatch OpenRocksDb::createWriteBatch() const noexcept {
return WriteBatch(column_->handle.get());
}
rocksdb::DB* OpenRocksDb::get() {
return impl_.get();
}
std::optional<uint64_t> OpenRocksDb::getApproximateSizes() const {
const rocksdb::SizeApproximationOptions options{ .include_memtables = true };
std::string del_char_str(1, static_cast<char>(127));
std::string empty_str;
const rocksdb::Range range(empty_str, del_char_str);
uint64_t value = 0;
auto status = impl_->GetApproximateSizes(options, column_->handle.get(), &range, 1, &value);
if (status.ok()) {
return value;
}
return std::nullopt;
}
minifi::core::RepositoryMetricsSource::RocksDbStats OpenRocksDb::getStats() {
minifi::core::RepositoryMetricsSource::RocksDbStats stats;
std::string table_readers;
GetProperty("rocksdb.estimate-table-readers-mem", &table_readers);
try {
stats.table_readers_size = std::stoull(table_readers);
} catch (const std::exception&) {
logger_->log_warn("Could not retrieve valid 'rocksdb.estimate-table-readers-mem' property value from rocksdb content repository!");
}
std::string all_memtables;
GetProperty("rocksdb.cur-size-all-mem-tables", &all_memtables);
try {
stats.all_memory_tables_size = std::stoull(all_memtables);
} catch (const std::exception&) {
logger_->log_warn("Could not retrieve valid 'rocksdb.cur-size-all-mem-tables' property value from rocksdb content repository!");
}
return stats;
}
} // namespace org::apache::nifi::minifi::internal