blob: 4a8535815774484c9e23d64125c5cf7db0604a96 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "controllers/keyvalue/AbstractAutoPersistingKeyValueStoreService.h"
#include <cinttypes>
using namespace std::literals::chrono_literals;
namespace org::apache::nifi::minifi::controllers {
AbstractAutoPersistingKeyValueStoreService::AbstractAutoPersistingKeyValueStoreService(std::string name, const utils::Identifier& uuid /*= utils::Identifier()*/)
: PersistableKeyValueStoreService(std::move(name), uuid)
, always_persist_(false)
, auto_persistence_interval_(0U)
, running_(false) {
}
AbstractAutoPersistingKeyValueStoreService::~AbstractAutoPersistingKeyValueStoreService() {
stopPersistingThread();
}
void AbstractAutoPersistingKeyValueStoreService::stopPersistingThread() {
std::unique_lock<std::mutex> lock(persisting_mutex_);
if (persisting_thread_.joinable()) {
running_ = false;
persisting_cv_.notify_one();
lock.unlock();
persisting_thread_.join();
}
}
void AbstractAutoPersistingKeyValueStoreService::onEnable() {
std::unique_lock<std::mutex> lock(persisting_mutex_);
if (configuration_ == nullptr) {
logger_->log_debug("Cannot enable AbstractAutoPersistingKeyValueStoreService");
return;
}
std::string value;
if (!getProperty(AlwaysPersistPropertyName, value)) {
logger_->log_error("Always Persist attribute is missing or invalid");
} else {
always_persist_ = utils::StringUtils::toBool(value).value_or(false);
}
core::TimePeriodValue auto_persistence_interval;
if (!getProperty(AutoPersistenceIntervalPropertyName, auto_persistence_interval)) {
logger_->log_error("Auto Persistence Interval attribute is missing or invalid");
} else {
auto_persistence_interval_ = auto_persistence_interval.getMilliseconds();
}
if (!always_persist_ && auto_persistence_interval_ != 0s) {
if (!persisting_thread_.joinable()) {
logger_->log_trace("Starting auto persistence thread");
running_ = true;
persisting_thread_ = std::thread(&AbstractAutoPersistingKeyValueStoreService::persistingThreadFunc, this);
}
}
logger_->log_trace("Enabled AbstractAutoPersistingKeyValueStoreService");
}
void AbstractAutoPersistingKeyValueStoreService::notifyStop() {
stopPersistingThread();
}
void AbstractAutoPersistingKeyValueStoreService::persistingThreadFunc() {
std::unique_lock<std::mutex> lock(persisting_mutex_);
while (true) {
logger_->log_trace("Persisting thread is going to sleep for %" PRId64 " ms", int64_t{auto_persistence_interval_.count()});
persisting_cv_.wait_for(lock, auto_persistence_interval_, [this] {
return !running_;
});
if (!running_) {
logger_->log_trace("Stopping persistence thread");
return;
}
persist();
}
}
} // namespace org::apache::nifi::minifi::controllers