// blob: f4ea89e8b76062f3a2bb5ac8a04ad1872b152c42 [file] [log] [blame]
/**
* @file MiNiFiMain.cpp
* MiNiFiMain implementation
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <fcntl.h>
#include <limits.h>
#include <semaphore.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <cerrno>
#include <cstdint>
#include <iostream>
#include <map>
#include <memory>
#include <queue>
#include <stdexcept>
#include <string>
#include <vector>
#include <yaml-cpp/yaml.h>
#include "core/Core.h"
#include "core/logging/BaseLogger.h"
#include "core/logging/LogAppenders.h"
#include "spdlog/spdlog.h"
#include "core/FlowConfiguration.h"
#include "core/ConfigurationFactory.h"
#include "core/RepositoryFactory.h"
#include "core/logging/Logger.h"
#include "properties/Configure.h"
#include "FlowController.h"
//! Main thread sleep interval 1 second
#define SLEEP_INTERVAL 1
//! Main thread stop wait time in milliseconds (30 seconds).
//! The replacement list is parenthesized so the macro keeps its value when
//! it is expanded inside a larger expression (e.g. `x / STOP_WAIT_TIME_MS`).
#define STOP_WAIT_TIME_MS (30*1000)
//! Default YAML location
#define DEFAULT_NIFI_CONFIG_YML "./conf/config.yml"
//! Default nifi properties file path
#define DEFAULT_NIFI_PROPERTIES_FILE "./conf/minifi.properties"
//! Name of the environment variable that holds the MiNiFi home directory
#define MINIFI_HOME_ENV_KEY "MINIFI_HOME"
/* Section names used when parsing the configuration YAML */
#define CONFIG_YAML_PROCESSORS_KEY "Processors"
#define CONFIG_YAML_FLOW_CONTROLLER_KEY "Flow Controller"
#define CONFIG_YAML_CONNECTIONS_KEY "Connections"
#define CONFIG_YAML_REMOTE_PROCESSING_GROUPS_KEY "Remote Processing Groups"
// Semaphore that main() blocks on until the signal handler posts it, which
// lets the main thread wait for shutdown without a timed wait.
sem_t *running = nullptr;
//! Flow Controller instance created in main()
static std::unique_ptr<minifi::FlowController> controller;
/**
 * Signal handler for SIGINT and SIGTERM.
 *
 * The handler deliberately does not stop the controller. It only posts the
 * `running` semaphore; main() is blocked on that semaphore and triggers the
 * unload after it has been released, outside of the signal handler.
 *
 * Semaphores are a portable choice when using signal handlers. Threads,
 * mutexes, and condition variables are not guaranteed to work within a
 * signal handler, so the semaphore is used to avoid thread safety issues.
 *
 * @param signal number of the signal that was delivered; any signal other
 *               than SIGINT or SIGTERM is ignored.
 */
void sigHandler(int signal) {
  switch (signal) {
    case SIGINT:
    case SIGTERM:
      // wake up main(); the controller is stopped there, not here.
      sem_post(running);
      break;
    default:
      break;
  }
}
int main(int argc, char **argv) {
std::shared_ptr<logging::Logger> logger = logging::Logger::getLogger();
logger->setLogLevel(logging::info);
uint16_t stop_wait_time = STOP_WAIT_TIME_MS;
std::string graceful_shutdown_seconds = "";
std::string prov_repo_class = "provenancerepository";
std::string flow_repo_class = "flowfilerepository";
std::string nifi_configuration_class_name = "yamlconfiguration";
running = sem_open("MiNiFiMain", O_CREAT, 0644, 0);
if (running == SEM_FAILED || running == 0) {
logger->log_error("could not initialize semaphore");
perror("initialization failure");
}
// assumes POSIX compliant environment
std::string minifiHome;
if (const char* env_p = std::getenv(MINIFI_HOME_ENV_KEY)) {
minifiHome = env_p;
} else {
logger->log_info(
"MINIFI_HOME was not found, determining based on executable path.");
char *path = NULL;
char full_path[PATH_MAX];
path = realpath(argv[0], full_path);
std::string minifiHomePath(path);
minifiHomePath = minifiHomePath.substr(0,
minifiHomePath.find_last_of("/\\")); //Remove /minifi from path
minifiHome = minifiHomePath.substr(0, minifiHomePath.find_last_of("/\\")); //Remove /bin from path
}
if (signal(SIGINT, sigHandler) == SIG_ERR
|| signal(SIGTERM, sigHandler) == SIG_ERR
|| signal(SIGPIPE, SIG_IGN) == SIG_ERR) {
logger->log_error("Can not install signal handler");
return -1;
}
minifi::Configure *configure = minifi::Configure::getConfigure();
configure->setHome(minifiHome);
configure->loadConfigureFile(DEFAULT_NIFI_PROPERTIES_FILE);
if (configure->get(minifi::Configure::nifi_graceful_shutdown_seconds,
graceful_shutdown_seconds)) {
try {
stop_wait_time = std::stoi(graceful_shutdown_seconds);
} catch (const std::out_of_range &e) {
logger->log_error("%s is out of range. %s",
minifi::Configure::nifi_graceful_shutdown_seconds,
e.what());
} catch (const std::invalid_argument &e) {
logger->log_error("%s contains an invalid argument set. %s",
minifi::Configure::nifi_graceful_shutdown_seconds,
e.what());
}
} else {
logger->log_debug("%s not set, defaulting to %d",
minifi::Configure::nifi_graceful_shutdown_seconds,
STOP_WAIT_TIME_MS);
}
std::string log_level;
if (configure->get(minifi::Configure::nifi_log_level,
log_level)) {
logger->setLogLevel(log_level);
}
// set the log configuration.
std::unique_ptr<logging::BaseLogger> configured_logger =
logging::LogInstance::getConfiguredLogger(configure);
logger->updateLogger(std::move(configured_logger));
configure->get(minifi::Configure::nifi_provenance_repository_class_name,
prov_repo_class);
// Create repos for flow record and provenance
std::shared_ptr<core::Repository> prov_repo = core::createRepository(
prov_repo_class, true);
prov_repo->initialize();
configure->get(minifi::Configure::nifi_flow_repository_class_name,
flow_repo_class);
std::shared_ptr<core::Repository> flow_repo = core::createRepository(
flow_repo_class, true);
flow_repo->initialize();
configure->get(minifi::Configure::nifi_configuration_class_name,
nifi_configuration_class_name);
std::unique_ptr<core::FlowConfiguration> flow_configuration = std::move(
core::createFlowConfiguration(prov_repo, flow_repo,
nifi_configuration_class_name));
controller = std::unique_ptr<minifi::FlowController>(
new minifi::FlowController(prov_repo, flow_repo,
std::move(flow_configuration)));
// Load flow from specified configuration file
controller->load();
// Start Processing the flow
controller->start();
logger->log_info("MiNiFi started");
/**
* Sem wait provides us the ability to have a controlled
* yield without the need for a more complex construct and
* a spin lock
*/
if (sem_wait(running) != -1)
perror("sem_wait");
sem_unlink("MiNiFiMain");
/**
* Trigger unload -- wait stop_wait_time
*/
controller->waitUnload(stop_wait_time);
logger->log_info("MiNiFi exit");
return 0;
}