blob: 8b430dce8a6ec45bfd5aaf8f155225c9c968c5a6 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <memory>
#include <string>
#include <string_view>
#include <utility>
#include <thread>
#include <vector>
#include "core/ContentRepository.h"
#include "core/BufferedContentSession.h"
#include "core/logging/LoggerFactory.h"
#include "core/Property.h"
#include "database/RocksDatabase.h"
#include "properties/Configure.h"
#include "utils/StoppableThread.h"
namespace org::apache::nifi::minifi::core::repository {
class DatabaseContentRepository : public core::ContentRepositoryImpl {
class Session : public BufferedContentSession {
public:
explicit Session(std::shared_ptr<ContentRepository> repository, bool use_synchronous_writes);
void commit() override;
private:
bool use_synchronous_writes_;
};
static constexpr std::chrono::milliseconds DEFAULT_COMPACTION_PERIOD = std::chrono::minutes{2};
public:
static constexpr const char* ENCRYPTION_KEY_NAME = "nifi.database.content.repository.encryption.key";
explicit DatabaseContentRepository(std::string_view name = className<DatabaseContentRepository>(), const utils::Identifier& uuid = {})
: core::ContentRepositoryImpl(name, uuid),
is_valid_(false),
db_(nullptr),
logger_(logging::LoggerFactory<DatabaseContentRepository>::getLogger()) {
}
~DatabaseContentRepository() override {
stop();
}
EXTENSIONAPI static constexpr auto Properties = std::array<core::PropertyReference, 0>{};
EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false;
EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false;
std::shared_ptr<ContentSession> createSession() override;
bool initialize(const std::shared_ptr<minifi::Configure> &configuration) override;
std::shared_ptr<io::BaseStream> write(const minifi::ResourceClaim &claim, bool append = false) override;
std::shared_ptr<io::BaseStream> read(const minifi::ResourceClaim &claim) override;
bool close(const minifi::ResourceClaim &claim) override {
return remove(claim);
}
bool exists(const minifi::ResourceClaim &streamId) override;
void clearOrphans() override;
void start() override;
void stop() override;
uint64_t getRepositorySize() const override;
uint64_t getRepositoryEntryCount() const override;
std::optional<RepositoryMetricsSource::RocksDbStats> getRocksDbStats() const override;
private:
void runGc();
protected:
bool removeKeySync(const std::string& content_path);
bool removeKey(const std::string& content_path) override;
std::shared_ptr<io::BaseStream> write(const minifi::ResourceClaim &claim, bool append, minifi::internal::WriteBatch* batch);
void runCompaction();
void setCompactionPeriod(const std::shared_ptr<minifi::Configure> &configuration);
bool is_valid_;
std::unique_ptr<minifi::internal::RocksDatabase> db_;
std::shared_ptr<logging::Logger> logger_;
std::chrono::milliseconds compaction_period_{DEFAULT_COMPACTION_PERIOD};
std::unique_ptr<utils::StoppableThread> compaction_thread_;
std::chrono::milliseconds purge_period_{std::chrono::seconds{1}};
std::mutex keys_mtx_;
std::vector<std::string> keys_to_delete_;
std::unique_ptr<utils::StoppableThread> gc_thread_;
bool use_synchronous_writes_ = true;
bool verify_checksums_in_rocksdb_reads_ = false;
};
} // namespace org::apache::nifi::minifi::core::repository