blob: 1a163eae6f587689299f81dc7264aad11fa57bb1 [file] [log] [blame]
/**
* @file FlowController.cpp
* FlowController class implementation
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "FlowController.h"
#include <sys/time.h>
#include <time.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#include <vector>
#include <queue>
#include <map>
#include <set>
#include <chrono>
#include <future>
#include <thread>
#include <utility>
#include <memory>
#include <string>
#include "core/ProcessContext.h"
#include "core/ProcessGroup.h"
#include "utils/StringUtils.h"
#include "core/Core.h"
#include "core/repository/FlowFileRepository.h"
namespace org {
namespace apache {
namespace nifi {
namespace minifi {
#define DEFAULT_CONFIG_NAME "conf/flow.yml"
FlowController::FlowController(
std::shared_ptr<core::Repository> provenance_repo,
std::shared_ptr<core::Repository> flow_file_repo,
std::unique_ptr<core::FlowConfiguration> flow_configuration,
const std::string name, bool headless_mode)
: CoreComponent(core::getClassName<FlowController>()),
root_(nullptr),
max_timer_driven_threads_(0),
max_event_driven_threads_(0),
running_(false),
initialized_(false),
provenance_repo_(provenance_repo),
flow_file_repo_(flow_file_repo),
protocol_(0),
_timerScheduler(provenance_repo_),
_eventScheduler(provenance_repo_),
flow_configuration_(std::move(flow_configuration)) {
if (provenance_repo == nullptr)
throw std::runtime_error("Provenance Repo should not be null");
if (flow_file_repo == nullptr)
throw std::runtime_error("Flow Repo should not be null");
uuid_generate(uuid_);
setUUID(uuid_);
// Setup the default values
if (flow_configuration_ != nullptr) {
configuration_filename_ = flow_configuration_->getConfigurationPath();
}
max_event_driven_threads_ = DEFAULT_MAX_EVENT_DRIVEN_THREAD;
max_timer_driven_threads_ = DEFAULT_MAX_TIMER_DRIVEN_THREAD;
running_ = false;
initialized_ = false;
root_ = NULL;
protocol_ = new FlowControlProtocol(this);
// NiFi config properties
configure_ = Configure::getConfigure();
if (!headless_mode) {
std::string rawConfigFileString;
configure_->get(Configure::nifi_flow_configuration_file,
rawConfigFileString);
if (!rawConfigFileString.empty()) {
configuration_filename_ = rawConfigFileString;
}
std::string adjustedFilename;
if (!configuration_filename_.empty()) {
// perform a naive determination if this is a relative path
if (configuration_filename_.c_str()[0] != '/') {
adjustedFilename = adjustedFilename + configure_->getHome() + "/"
+ configuration_filename_;
} else {
adjustedFilename = configuration_filename_;
}
}
initializePaths(adjustedFilename);
}
}
void FlowController::initializePaths(const std::string &adjustedFilename) {
char *path = NULL;
char full_path[PATH_MAX];
path = realpath(adjustedFilename.c_str(), full_path);
if (path == NULL) {
throw std::runtime_error(
"Path is not specified. Either manually set MINIFI_HOME or ensure ../conf exists");
}
std::string pathString(path);
configuration_filename_ = pathString;
logger_->log_info("FlowController NiFi Configuration file %s",
pathString.c_str());
// Create the content repo directory if needed
struct stat contentDirStat;
if (stat(ResourceClaim::default_directory_path, &contentDirStat)
!= -1&& S_ISDIR(contentDirStat.st_mode)) {
path = realpath(ResourceClaim::default_directory_path, full_path);
logger_->log_info("FlowController content directory %s", full_path);
} else {
if (mkdir(ResourceClaim::default_directory_path, 0777) == -1) {
logger_->log_error("FlowController content directory creation failed");
exit(1);
}
}
std::string clientAuthStr;
if (!path) {
logger_->log_error(
"Could not locate path from provided configuration file name (%s). Exiting.",
full_path);
exit(1);
}
}
FlowController::~FlowController() {
stop(true);
unload();
if (NULL != protocol_)
delete protocol_;
flow_file_repo_ = nullptr;
provenance_repo_ = nullptr;
}
void FlowController::stop(bool force) {
std::lock_guard<std::recursive_mutex> flow_lock(mutex_);
if (running_) {
// immediately indicate that we are not running
running_ = false;
logger_->log_info("Stop Flow Controller");
this->_timerScheduler.stop();
this->_eventScheduler.stop();
this->flow_file_repo_->stop();
this->provenance_repo_->stop();
// Wait for sometime for thread stop
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
if (this->root_)
this->root_->stopProcessing(&this->_timerScheduler,
&this->_eventScheduler);
}
}
/**
* This function will attempt to unload yaml and stop running Processors.
*
* If the latter attempt fails or does not complete within the prescribed
* period, running_ will be set to false and we will return.
*
* @param timeToWaitMs Maximum time to wait before manually
* marking running as false.
*/
void FlowController::waitUnload(const uint64_t timeToWaitMs) {
if (running_) {
// use the current time and increment with the provided argument.
std::chrono::system_clock::time_point wait_time =
std::chrono::system_clock::now()
+ std::chrono::milliseconds(timeToWaitMs);
// create an asynchronous future.
std::future<void> unload_task = std::async(std::launch::async,
[this]() {unload();});
if (std::future_status::ready == unload_task.wait_until(wait_time)) {
running_ = false;
}
}
}
void FlowController::unload() {
std::lock_guard<std::recursive_mutex> flow_lock(mutex_);
if (running_) {
stop(true);
}
if (initialized_) {
logger_->log_info("Unload Flow Controller");
root_ = nullptr;
initialized_ = false;
name_ = "";
}
return;
}
void FlowController::load() {
std::lock_guard<std::recursive_mutex> flow_lock(mutex_);
if (running_) {
stop(true);
}
if (!initialized_) {
logger_->log_info("Load Flow Controller from file %s",
configuration_filename_.c_str());
this->root_ = flow_configuration_->getRoot(configuration_filename_);
// Load Flow File from Repo
loadFlowRepo();
initialized_ = true;
}
}
void FlowController::reload(std::string yamlFile) {
std::lock_guard<std::recursive_mutex> flow_lock(mutex_);
logger_->log_info("Starting to reload Flow Controller with yaml %s",
yamlFile.c_str());
stop(true);
unload();
std::string oldYamlFile = this->configuration_filename_;
this->configuration_filename_ = yamlFile;
load();
start();
if (this->root_ != nullptr) {
this->configuration_filename_ = oldYamlFile;
logger_->log_info("Rollback Flow Controller to YAML %s",
oldYamlFile.c_str());
stop(true);
unload();
load();
start();
}
}
void FlowController::loadFlowRepo() {
if (this->flow_file_repo_) {
std::map<std::string, std::shared_ptr<Connection>> connectionMap;
if (this->root_ != nullptr) {
this->root_->getConnections(connectionMap);
}
auto rep = std::static_pointer_cast<core::repository::FlowFileRepository>(
flow_file_repo_);
rep->setConnectionMap(connectionMap);
flow_file_repo_->loadComponent();
}
}
bool FlowController::start() {
std::lock_guard<std::recursive_mutex> flow_lock(mutex_);
if (!initialized_) {
logger_->log_error(
"Can not start Flow Controller because it has not been initialized");
return false;
} else {
if (!running_) {
logger_->log_info("Starting Flow Controller");
this->_timerScheduler.start();
this->_eventScheduler.start();
if (this->root_ != nullptr) {
this->root_->startProcessing(&this->_timerScheduler,
&this->_eventScheduler);
}
running_ = true;
this->protocol_->start();
this->provenance_repo_->start();
this->flow_file_repo_->start();
logger_->log_info("Started Flow Controller");
}
return true;
}
}
} /* namespace minifi */
} /* namespace nifi */
} /* namespace apache */
} /* namespace org */