blob: b3205d531e27ca290c58621646d304df05f15714 [file] [log] [blame]
/**
* @file MiNiFiMain.cpp
* MiNiFiMain implementation
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifdef WIN32
#ifndef WIN32_LEAN_AND_MEAN
#define WIN32_LEAN_AND_MEAN
#endif
#include <WinSock2.h>
#include <WS2tcpip.h>
#include <Windows.h>
#include "MiNiFiWindowsService.h"
#pragma comment(lib, "Ws2_32.lib")
#pragma comment(lib, "legacy_stdio_definitions.lib")
#ifdef ENABLE_JNI
#pragma comment(lib, "jvm.lib")
#endif
#include <direct.h>
#endif
#include <fcntl.h>
#include <stdio.h>
#include <cstdlib>
#include <semaphore.h>
#include <signal.h>
#include <vector>
#include <queue>
#include <map>
#include <iostream>
#include <utils/file/FileUtils.h>
#include "ResourceClaim.h"
#include "core/Core.h"
#include "core/FlowConfiguration.h"
#include "core/ConfigurationFactory.h"
#include "core/RepositoryFactory.h"
#include "utils/file/PathUtils.h"
#include "utils/file/FileUtils.h"
#include "utils/Environment.h"
#include "FlowController.h"
#include "AgentDocs.h"
#include "MainHelper.h"
// Variables that allow us to avoid a timed wait.
sem_t *running;
//! Flow Controller
/**
* Removed the stop command from the signal handler so that we could trigger
* unload after we exit the semaphore controlled critical section in main.
*
* Semaphores are a portable choice when using signal handlers. Threads,
* mutexes, and condition variables are not guaranteed to work within
* a signal handler. Consequently we will use the semaphore to avoid thread
* safety issues.
*/
#ifdef WIN32
BOOL WINAPI consoleSignalHandler(DWORD signal) {
if (signal == CTRL_C_EVENT || signal == CTRL_BREAK_EVENT)
{
sem_post(running);
if (sem_wait(running) == -1)
perror("sem_wait");
}
return TRUE;
}
void SignalExitProcess() {
sem_post(running);
}
#endif
void sigHandler(int signal) {
if (signal == SIGINT || signal == SIGTERM) {
// avoid stopping the controller here.
sem_post(running);
}
}
void dumpDocs(const std::shared_ptr<minifi::Configure> &configuration, const std::string &dir, std::ostream &out) {
auto pythoncreator = core::ClassLoader::getDefaultClassLoader().instantiate("PythonCreator", "PythonCreator");
if (nullptr != pythoncreator) {
pythoncreator->configure(configuration);
}
minifi::docs::AgentDocs docsCreator;
docsCreator.generate(dir, out);
}
int main(int argc, char **argv) {
#ifdef WIN32
RunAsServiceIfNeeded();
bool isStartedByService = false;
HANDLE terminationEventHandler = GetTerminationEventHandle(&isStartedByService);
if (terminationEventHandler == nullptr) {
return -1;
}
utils::Environment::setRunningAsService(isStartedByService);
#endif
if (utils::Environment::isRunningAsService()) {
setSyslogLogger();
}
std::shared_ptr<logging::Logger> logger = logging::LoggerConfiguration::getConfiguration().getLogger("main");
#ifdef WIN32
if (isStartedByService) {
if (!CreateServiceTerminationThread(logger, terminationEventHandler)) {
return -1;
}
} else {
CloseHandle(terminationEventHandler);
}
#endif
uint16_t stop_wait_time = STOP_WAIT_TIME_MS;
// initialize static functions that were defined apriori
core::FlowConfiguration::initialize_static_functions();
std::string graceful_shutdown_seconds = "";
std::string prov_repo_class = "provenancerepository";
std::string flow_repo_class = "flowfilerepository";
std::string nifi_configuration_class_name = "yamlconfiguration";
std::string content_repo_class = "filesystemrepository";
running = sem_open("/MiNiFiMain", O_CREAT, 0644, 0);
if (running == SEM_FAILED || running == 0) {
logger->log_error("could not initialize semaphore");
perror("initialization failure");
}
#ifdef WIN32
if (!SetConsoleCtrlHandler(consoleSignalHandler, TRUE)) {
logger->log_error("Cannot install signal handler");
return -1;
}
if (signal(SIGINT, sigHandler) == SIG_ERR || signal(SIGTERM, sigHandler) == SIG_ERR ) {
logger->log_error("Cannot install signal handler");
return -1;
}
#ifdef SIGBREAK
if (signal(SIGBREAK, sigHandler) == SIG_ERR) {
logger->log_error("Cannot install signal handler");
return -1;
}
#endif
#else
if (signal(SIGINT, sigHandler) == SIG_ERR || signal(SIGTERM, sigHandler) == SIG_ERR || signal(SIGPIPE, SIG_IGN) == SIG_ERR) {
logger->log_error("Cannot install signal handler");
return -1;
}
#endif
// Determine MINIFI_HOME
const std::string minifiHome = determineMinifiHome(logger);
if (minifiHome.empty()) {
// determineMinifiHome already logged everything we need
return -1;
}
// chdir to MINIFI_HOME
if (!utils::Environment::setCurrentWorkingDirectory(minifiHome.c_str())) {
logger->log_error("Failed to change working directory to MINIFI_HOME (%s)", minifiHome);
return -1;
}
std::shared_ptr<logging::LoggerProperties> log_properties = std::make_shared<logging::LoggerProperties>();
log_properties->setHome(minifiHome);
log_properties->loadConfigureFile(DEFAULT_LOG_PROPERTIES_FILE);
logging::LoggerConfiguration::getConfiguration().initialize(log_properties);
std::shared_ptr<minifi::Properties> uid_properties = std::make_shared<minifi::Properties>("UID properties");
uid_properties->setHome(minifiHome);
uid_properties->loadConfigureFile(DEFAULT_UID_PROPERTIES_FILE);
utils::IdGenerator::getIdGenerator()->initialize(uid_properties);
// Make a record of minifi home in the configured log file.
logger->log_info("MINIFI_HOME=%s", minifiHome);
std::shared_ptr<minifi::Configure> configure = std::make_shared<minifi::Configure>();
configure->setHome(minifiHome);
configure->loadConfigureFile(DEFAULT_NIFI_PROPERTIES_FILE);
if (argc >= 3 && std::string("docs") == argv[1]) {
if (utils::file::FileUtils::create_dir(argv[2]) != 0) {
std::cerr << "Working directory doesn't exist and cannot be created: " << argv[2] << std::endl;
exit(1);
}
std::cerr << "Dumping docs to " << argv[2] << std::endl;
if (argc == 4) {
std::string filepath, filename;
utils::file::PathUtils::getFileNameAndPath(argv[3], filepath, filename);
if (filepath == argv[2]) {
std::cerr << "Target file should be out of the working directory: " << filepath << std::endl;
exit(1);
}
std::ofstream outref(argv[3]);
dumpDocs(configure, argv[2], outref);
} else{
dumpDocs(configure, argv[2], std::cout);
}
exit(0);
}
if (configure->get(minifi::Configure::nifi_graceful_shutdown_seconds, graceful_shutdown_seconds)) {
try {
stop_wait_time = std::stoi(graceful_shutdown_seconds);
}
catch (const std::out_of_range &e) {
logger->log_error("%s is out of range. %s", minifi::Configure::nifi_graceful_shutdown_seconds, e.what());
}
catch (const std::invalid_argument &e) {
logger->log_error("%s contains an invalid argument set. %s", minifi::Configure::nifi_graceful_shutdown_seconds, e.what());
}
}
else {
logger->log_debug("%s not set, defaulting to %d", minifi::Configure::nifi_graceful_shutdown_seconds,
STOP_WAIT_TIME_MS);
}
configure->get(minifi::Configure::nifi_provenance_repository_class_name, prov_repo_class);
// Create repos for flow record and provenance
std::shared_ptr<core::Repository> prov_repo = core::createRepository(prov_repo_class, true, "provenance");
if (!prov_repo->initialize(configure)) {
logger->log_error("Provenance repository failed to initialize, exiting..");
exit(1);
}
configure->get(minifi::Configure::nifi_flow_repository_class_name, flow_repo_class);
std::shared_ptr<core::Repository> flow_repo = core::createRepository(flow_repo_class, true, "flowfile");
if (!flow_repo->initialize(configure)) {
logger->log_error("Flow file repository failed to initialize, exiting..");
exit(1);
}
configure->get(minifi::Configure::nifi_content_repository_class_name, content_repo_class);
std::shared_ptr<core::ContentRepository> content_repo = core::createContentRepository(content_repo_class, true, "content");
if (!content_repo->initialize(configure)) {
logger->log_error("Content repository failed to initialize, exiting..");
exit(1);
}
std::string content_repo_path;
if (configure->get(minifi::Configure::nifi_dbcontent_repository_directory_default, content_repo_path)) {
logging::LOG_INFO(logger) << "setting default dir to " << content_repo_path;
minifi::setDefaultDirectory(content_repo_path);
}
configure->get(minifi::Configure::nifi_configuration_class_name, nifi_configuration_class_name);
std::shared_ptr<minifi::io::StreamFactory> stream_factory = minifi::io::StreamFactory::getInstance(configure);
std::unique_ptr<core::FlowConfiguration> flow_configuration = core::createFlowConfiguration(prov_repo, flow_repo, content_repo, configure, stream_factory, nifi_configuration_class_name);
std::shared_ptr<minifi::FlowController> controller = std::unique_ptr<minifi::FlowController>(
new minifi::FlowController(prov_repo, flow_repo, configure, std::move(flow_configuration), content_repo));
logger->log_info("Loading FlowController");
// Load flow from specified configuration file
try {
controller->load();
}
catch (std::exception &e) {
logger->log_error("Failed to load configuration due to exception: %s", e.what());
return -1;
}
catch (...) {
logger->log_error("Failed to load configuration due to unknown exception");
return -1;
}
// Start Processing the flow
controller->start();
logger->log_info("MiNiFi started");
/**
* Sem wait provides us the ability to have a controlled
* yield without the need for a more complex construct and
* a spin lock
*/
int ret_val;
while ((ret_val = sem_wait(running)) == -1 && errno == EINTR);
if(ret_val == -1) perror("sem_wait");
while ((ret_val = sem_close(running)) == -1 && errno == EINTR);
if(ret_val == -1) perror("sem_close");
while ((ret_val = sem_unlink("/MiNiFiMain")) == -1 && errno == EINTR);
if(ret_val == -1) perror("sem_unlink");
/**
* Trigger unload -- wait stop_wait_time
*/
controller->waitUnload(stop_wait_time);
controller->stopC2();
flow_repo = nullptr;
prov_repo = nullptr;
logger->log_info("MiNiFi exit");
#ifdef WIN32
sem_post(running);
#endif
return 0;
}