MINIFI-63 MINIFI-87 - Introduce minifi.sh script which serves as a wrapper around the minifi binary and provides installation as a service.
This closes #8 and closes #9.
diff --git a/.gitignore b/.gitignore
index c7dcff2..9d78a7f 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,2 +1,3 @@
# Filter out generated files from the included libuuid
thirdparty/uuid/tst_uuid*
+assemblies
diff --git a/Makefile b/Makefile
index 40b11ff..2ce489b 100644
--- a/Makefile
+++ b/Makefile
@@ -80,6 +80,7 @@
minifi: $(BUILD_DIR)/$(TARGET_LIB) thirdparty/yaml-cpp-yaml-cpp-0.5.3/lib/libyaml-cpp.a
$(CC) $(CFLAGS) $(INCLUDES) -o $(BUILD_DIR)/$(TARGET_EXE) main/MiNiFiMain.cpp $(LDDIRECTORY) $(LDFLAGS)
cp $(BUILD_DIR)/$(TARGET_EXE) $(TARGET_DIR)/$(TARGET_EXE)
+ cp $(BUILD_DIR)/$(TARGET_EXE) bin/$(TARGET_EXE)
.PHONY: tests
tests: $(BUILD_DIR)/$(TARGET_LIB) thirdparty/yaml-cpp-yaml-cpp-0.5.3/lib/libyaml-cpp.a
@@ -100,6 +101,7 @@
inc \
src \
main \
+ bin \
conf \
thirdparty \
Makefile \
@@ -107,12 +109,15 @@
tar -czf $(ASSEMBLIES_DIR)/$(PROJECT)-$(VERSION)-source.tar.gz -C $(ASSEMBLIES_DIR) $(PROJECT)-$(VERSION)-source
$(ASSEMBLIES_DIR)/$(PROJECT)-$(VERSION)-bin.tar.gz : $(ASSEMBLIES_DIR) $(TARGET_EXE)
- tar -czf $(ASSEMBLIES_DIR)/$(PROJECT)-$(VERSION)-bin.tar.gz \
- LICENSE \
+ mkdir -p $(ASSEMBLIES_DIR)/$(PROJECT)-$(VERSION)-bin
+ cp -R LICENSE \
NOTICE \
README.md \
conf \
- -C target minifi
+ bin \
+ $(ASSEMBLIES_DIR)/$(PROJECT)-$(VERSION)-bin
+ cp target/minifi $(ASSEMBLIES_DIR)/$(PROJECT)-$(VERSION)-bin/bin/
+ tar -czf $(ASSEMBLIES_DIR)/$(PROJECT)-$(VERSION)-bin.tar.gz -C $(ASSEMBLIES_DIR) $(PROJECT)-$(VERSION)-bin
.PHONY: clean
clean:
diff --git a/bin/minifi.sh b/bin/minifi.sh
new file mode 100755
index 0000000..dca94f3
--- /dev/null
+++ b/bin/minifi.sh
@@ -0,0 +1,338 @@
+#!/bin/sh
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Script structure inspired from Apache Karaf and other Apache projects with similar startup approaches
+
+SCRIPT_DIR=$(dirname "$0")
+SCRIPT_NAME=$(basename "$0")
+PROGNAME=$(basename "$0")
+SCRIPTPATH="$( cd "$(dirname "$0")" ; pwd -P )"
+export MINIFI_HOME="$(dirname ${SCRIPTPATH})"
+bin_dir=${MINIFI_HOME}/bin
+minifi_executable=${bin_dir}/minifi
+pid_file=${bin_dir}/.minifi.pid
+
+warn() {
+ echo "${PROGNAME}: $*"
+}
+
+die() {
+ warn "$*"
+ exit 1
+}
+
+detectOS() {
+ # OS specific support (must be 'true' or 'false').
+ cygwin=false;
+ aix=false;
+ os400=false;
+ darwin=false;
+ case "$(uname)" in
+ CYGWIN*)
+ cygwin=true
+ ;;
+ AIX*)
+ aix=true
+ ;;
+ OS400*)
+ os400=true
+ ;;
+ Darwin)
+ darwin=true
+ ;;
+ esac
+ # For AIX, set an environment variable
+ if ${aix}; then
+ export LDR_CNTRL=MAXDATA=0xB0000000@DSA
+ echo ${LDR_CNTRL}
+ fi
+}
+
+init() {
+ # Determine if there is special OS handling we must perform
+ detectOS
+}
+
+# determines the pid
+get_pid() {
+ # Default to a -1 for pid
+ pid=-1
+ # Check to see if we have a pid file
+ if [ -f ${pid_file} ]; then
+ pid=$(cat ${pid_file})
+ fi
+ echo ${pid}
+}
+
+# Performs a check to see if the provided pid is one that currently exists
+active_pid() {
+ pid=${1}
+ kill -s 0 ${pid} > /dev/null 2>&1
+ echo $?
+}
+
+install() {
+ detectOS
+
+ if [ "${darwin}" = "true" ] || [ "${cygwin}" = "true" ]; then
+ echo 'Installing Apache MiNiFi as a service is not supported on OS X or Cygwin.'
+ exit 1
+ fi
+
+ SVC_NAME=minifi
+ if [ "x$2" != "x" ] ; then
+ SVC_NAME=$2
+ fi
+
+ initd_dir='/etc/init.d'
+ SVC_FILE="${initd_dir}/${SVC_NAME}"
+
+ if [ ! -w "${initd_dir}" ]; then
+ echo "Current user does not have write permissions to ${initd_dir}. Cannot install MiNiFi as a service."
+ exit 1
+ fi
+
+# Create the init script, overwriting anything currently present
+cat <<SERVICEDESCRIPTOR > ${SVC_FILE}
+#!/bin/sh
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# chkconfig: 2345 20 80
+# description: Apache NiFi MiNiFi is a subproject of Apache nifi to collect data where it originates.
+#
+# Make use of the configured MINIFI_HOME directory and pass service requests to the minifi executable
+export MINIFI_HOME=${MINIFI_HOME}
+bin_dir=\${MINIFI_HOME}/bin
+minifi_executable=\${bin_dir}/minifi
+pid_file=${bin_dir}/.minifi.pid
+
+# determines the pid
+get_pid() {
+ # Default to a -1 for pid
+ pid=-1
+ # Check to see if we have a pid file
+ if [ -f \${pid_file} ]; then
+ pid=\$(cat ${pid_file})
+ fi
+ echo \${pid}
+}
+
+# Performs a check to see if the provided pid is one that currently exists
+active_pid() {
+ pid=\${1}
+ kill -s 0 \${pid} > /dev/null 2>&1
+ echo \$?
+}
+
+saved_pid=\$(get_pid)
+
+case "\$1" in
+ start)
+ if [ "\${saved_pid}" -gt 0 ]; then
+ if [ \$(active_pid \${saved_pid}) -ne 0 ]; then
+ echo "PID \${saved_pid} is stale, removing pid file at \${pid_file}";
+ if ! rm -f \${pid_file}; then
+ echo "Could not remove \${pid_file}. File will need to be manually removed."
+ exit 1;
+ fi
+ else
+ echo "MINIFI is currently running (PID: \${saved_pid}) with pid file \${pid_file}."
+ exit 0;
+ fi
+ fi
+ \${minifi_executable} &
+ pid=\$!
+ echo \${pid} > \${pid_file}
+ echo Starting MiNiFi with PID \${pid} and pid file \${pid_file}
+ ;;
+ stop)
+ if [ \$(active_pid \${saved_pid}) -ne 0 ]; then
+ echo "MiNiFi is not currently running."
+ else
+ echo "Stopping MiNiFi (PID: \${saved_pid})."
+ # Send a SIGINT to MiNiFi so that the handler begins shutdown.
+ kill -2 \${saved_pid} > /dev/null 2>&1
+ if [ \$? -ne 0 ]; then
+ echo "Could not successfully send termination signal to MiNiFi (PID: \${saved_pid})"
+ exit 1;
+ else
+ # Clean up our pid file
+ rm -f \${pid_file}
+ fi
+ fi
+ ;;
+ run)
+ if [ "\${saved_pid}" -gt 0 ]; then
+ if ! active_pid \${saved_pid}; then
+ echo "PID \${saved_pid} is stale, removing pid file at \${pid_file}";
+ if ! rm -f \${pid_file}; then
+ echo "Could not remove \${pid_file}. File will need to be manually removed."
+ exit 1;
+ fi
+ else
+ echo "MINIFI is currently running (PID: \${saved_pid}) with pid file \${pid_file}."
+ exit 0;
+ fi
+ fi
+ echo running
+ \${minifi_executable}
+ ;;
+ status)
+ # interpret status as per LSB specifications
+ # see: http://refspecs.linuxbase.org/LSB_3.1.0/LSB-Core-generic/LSB-Core-generic/iniscrptact.html
+
+ if [ "\${saved_pid}" -gt 0 ]; then
+ if [ \$(active_pid \${saved_pid}) -ne 0 ]; then
+ # program is dead and pid file exists
+ echo "Program is not currently running but stale pid file (\${pid_file}) exists.";
+ exit 1
+ else
+ # pid is correct, program is running
+ echo "MINIFI is currently running (PID: \${saved_pid}) with pid file \${pid_file}."
+ exit 0;
+ fi
+ else
+ # program is not running
+ echo "MiNiFi is not currently running."
+ exit 3;
+ fi
+ ;;
+ restart)
+ echo Restarting MiNiFi service
+ \${bin_dir}/minifi.sh stop
+ \${bin_dir}/minifi.sh start
+ ;;
+ *)
+ echo "Usage: service minifi {start|stop|restart|status}"
+ ;;
+esac
+
+SERVICEDESCRIPTOR
+
+ if [ ! -f "${SVC_FILE}" ]; then
+ echo "Could not create service file ${SVC_FILE}"
+ exit 1
+ fi
+
+ # Provide the user execute access on the file
+ chmod u+x ${SVC_FILE}
+
+ rm -f "/etc/rc2.d/S65${SVC_NAME}"
+ ln -s "/etc/init.d/${SVC_NAME}" "/etc/rc2.d/S65${SVC_NAME}" || { echo "Could not create link /etc/rc2.d/S65${SVC_NAME}"; exit 1; }
+ rm -f "/etc/rc2.d/K65${SVC_NAME}"
+ ln -s "/etc/init.d/${SVC_NAME}" "/etc/rc2.d/K65${SVC_NAME}" || { echo "Could not create link /etc/rc2.d/K65${SVC_NAME}"; exit 1; }
+ echo "Service ${SVC_NAME} installed"
+}
+
+saved_pid=$(get_pid)
+
+case "$1" in
+ start)
+ if [ "${saved_pid}" -gt 0 ]; then
+ if [ $(active_pid ${saved_pid}) -ne 0 ]; then
+ echo "PID ${saved_pid} is stale, removing pid file at ${pid_file}";
+ if ! rm -f ${pid_file}; then
+ echo "Could not remove ${pid_file}. File will need to be manually removed."
+ exit 1;
+ fi
+ else
+ echo "MINIFI is currently running (PID: ${saved_pid}) with pid file ${pid_file}."
+ exit 0;
+ fi
+ fi
+ ${minifi_executable} &
+ pid=$!
+ echo ${pid} > ${pid_file}
+ echo Starting MiNiFi with PID ${pid} and pid file ${pid_file}
+ ;;
+ stop)
+ if [ $(active_pid ${saved_pid}) -ne 0 ]; then
+ echo "MiNiFi is not currently running."
+ else
+ echo "Stopping MiNiFi (PID: ${saved_pid})."
+ # Send a SIGINT to MiNiFi so that the handler begins shutdown.
+ kill -2 ${saved_pid} > /dev/null 2>&1
+ if [ $? -ne 0 ]; then
+ echo "Could not successfully send termination signal to MiNiFi (PID: ${saved_pid})"
+ exit 1;
+ else
+ # Clean up our pid file
+ rm -f ${pid_file}
+ fi
+ fi
+ ;;
+ run)
+ if [ "${saved_pid}" -gt 0 ]; then
+ if [ $(active_pid ${saved_pid}) -ne 0 ]; then
+ echo "PID ${saved_pid} is stale, removing pid file at ${pid_file}";
+ if ! rm -f ${pid_file}; then
+ echo "Could not remove ${pid_file}. File will need to be manually removed."
+ exit 1;
+ fi
+ else
+ echo "MINIFI is currently running (PID: ${saved_pid}) with pid file ${pid_file}."
+ exit 0;
+ fi
+ fi
+ ${minifi_executable}
+ ;;
+ status)
+ # interpret status as per LSB specifications
+ # see: http://refspecs.linuxbase.org/LSB_3.1.0/LSB-Core-generic/LSB-Core-generic/iniscrptact.html
+
+ if [ "${saved_pid}" -gt 0 ]; then
+ if [ $(active_pid ${saved_pid}) -ne 0 ]; then
+ # program is dead and pid file exists
+ echo "Program is not currently running but stale pid file (${pid_file}) exists.";
+ exit 1
+ else
+ # pid is correct, program is running
+ echo "MINIFI is currently running (PID: ${saved_pid}) with pid file ${pid_file}."
+ exit 0;
+ fi
+ else
+ # program is not running
+ echo "MiNiFi is not currently running."
+ exit 3;
+ fi
+ ;;
+ restart)
+ echo Restarting MiNiFi service
+ ${bin_dir}/minifi.sh stop
+ ${bin_dir}/minifi.sh start
+ ;;
+ install)
+ install "$@"
+ ;;
+ *)
+ echo "Usage: minifi.sh {start|stop|run|restart|status|install}"
+ ;;
+esac
diff --git a/inc/Configure.h b/inc/Configure.h
index 502916f..d325fa0 100644
--- a/inc/Configure.h
+++ b/inc/Configure.h
@@ -76,14 +76,28 @@
void parseConfigureFileLine(char *buf);
//! Load Configure File
void loadConfigureFile(const char *fileName);
- //! Parse Command Line
- void pareCommandLine(int argc, char **argv);
+ //! Set the determined MINIFI_HOME
+ void setHome(std::string minifiHome)
+ {
+ _minifiHome = minifiHome;
+ }
+
+ //! Get the determined MINIFI_HOME
+ std::string getHome()
+ {
+ return _minifiHome;
+ }
+ //! Parse Command Line
+ void parseCommandLine(int argc, char **argv);
private:
//! Mutex for protection
std::mutex _mtx;
//! Logger
Logger *_logger;
+ //! Home location for this executable
+ std::string _minifiHome;
+
Configure()
{
_logger = Logger::getLogger();
diff --git a/inc/Connection.h b/inc/Connection.h
index 919cdc9..dc6b94b 100644
--- a/inc/Connection.h
+++ b/inc/Connection.h
@@ -102,8 +102,6 @@
}
//! Set Connection relationship
void setRelationship(Relationship relationship) {
- _logger->log_debug("Set connection %s relationship %s",
- _name.c_str(), relationship.getName().c_str());
_relationship = relationship;
}
// ! Get Connection relationship
diff --git a/inc/Logger.h b/inc/Logger.h
index 42cf3ea..3edad9d 100644
--- a/inc/Logger.h
+++ b/inc/Logger.h
@@ -38,8 +38,8 @@
#define DEFAULT_LOG_FILE_SIZE (5*1024*1024)
//! 3 log files rotation
#define DEFAULT_LOG_FILE_NUMBER 3
-#define LOG_NAME "nifi"
-#define LOG_FILE_NAME "nifi"
+#define LOG_NAME "minifi log"
+#define LOG_FILE_NAME "minifi-app.log"
typedef enum
{
@@ -141,11 +141,7 @@
* Create a logger
* */
Logger(const std::string logger_name = LOG_NAME, const std::string filename = LOG_FILE_NAME, size_t max_file_size = DEFAULT_LOG_FILE_SIZE, size_t max_files = DEFAULT_LOG_FILE_NUMBER, bool force_flush = true) {
- /*
- if (!filename.empty())
- _spdlog = rotating_logger_mt(logger_name, filename, max_file_size, max_files, force_flush);
- else */
- _spdlog = stdout_logger_mt("console");
+ _spdlog = rotating_logger_mt(logger_name, filename, max_file_size, max_files, force_flush);
_spdlog->set_level((spdlog::level::level_enum) debug);
}
//! spdlog
diff --git a/main/MiNiFiMain.cpp b/main/MiNiFiMain.cpp
index 4506af1..bf394b7 100644
--- a/main/MiNiFiMain.cpp
+++ b/main/MiNiFiMain.cpp
@@ -24,6 +24,8 @@
#include <map>
#include <unistd.h>
#include <yaml-cpp/yaml.h>
+#include <iostream>
+#include "spdlog/spdlog.h"
#include "Logger.h"
#include "Configure.h"
@@ -37,6 +39,8 @@
#define DEFAULT_NIFI_CONFIG_YML "./conf/config.yml"
//! Default nifi properties file path
#define DEFAULT_NIFI_PROPERTIES_FILE "./conf/minifi.properties"
+//! Define home environment variable
+#define MINIFI_HOME_ENV_KEY "MINIFI_HOME"
/* Define Parser Values for Configuration YAML sections */
#define CONFIG_YAML_FLOW_CONTROLLER_KEY "Flow Controller"
@@ -59,64 +63,49 @@
}
}
-int loadYaml() {
- YAML::Node flow = YAML::LoadFile("./conf/flow.yml");
-
- YAML::Node flowControllerNode = flow[CONFIG_YAML_FLOW_CONTROLLER_KEY];
- YAML::Node processorsNode = flow[CONFIG_YAML_PROCESSORS_KEY];
- YAML::Node connectionsNode = flow[CONFIG_YAML_CONNECTIONS_KEY];
- YAML::Node remoteProcessingGroupNode = flow[CONFIG_YAML_REMOTE_PROCESSING_GROUPS_KEY];
-
- if (processorsNode) {
- int numProcessors = processorsNode.size();
- if (numProcessors < 1) {
- throw new std::invalid_argument("There must be at least one processor configured.");
- }
-
- std::vector<ProcessorConfig> processorConfigs;
-
- if (processorsNode.IsSequence()) {
- for (YAML::const_iterator iter = processorsNode.begin(); iter != processorsNode.end(); ++iter) {
- ProcessorConfig procCfg;
- YAML::Node procNode = iter->as<YAML::Node>();
-
- procCfg.name = procNode["name"].as<std::string>();
- procCfg.javaClass = procNode["class"].as<std::string>();
-
- processorConfigs.push_back(procCfg);
- }
- }
-
- Logger::getLogger()->log_info("Added %d processor configs.", processorConfigs.size());
- } else {
- throw new std::invalid_argument(
- "Cannot instantiate a MiNiFi instance without a defined Processors configuration node.");
+int main(int argc, char **argv)
+{
+ try
+ {
+ std::vector<spdlog::sink_ptr> sinks;
+ sinks.push_back(std::make_shared<spdlog::sinks::stdout_sink_st>());
+ sinks.push_back(std::make_shared<spdlog::sinks::daily_file_sink_st>("logfile", "log", 23, 59));
+ auto combined_logger = std::make_shared<spdlog::logger>("name", begin(sinks), end(sinks));
+ spdlog::register_logger(combined_logger);
}
-
- return 0;
-}
-
-int main(int argc, char **argv) {
+ catch (const spdlog::spdlog_ex& ex)
+ {
+ std::cout << "Log failed: " << ex.what() << std::endl;
+ }
Logger *logger = Logger::getLogger();
logger->setLogLevel(info);
- logger->log_info("MiNiFi started");
- try {
- logger->log_info("Performing parsing of specified config.yml");
- loadYaml();
- } catch (...) {
- std::cout << "Could not load YAML due to improper configuration.";
- return 1;
- }
+ // assumes POSIX compliant environment
+ std::string minifiHome;
+ if (const char* env_p = std::getenv(MINIFI_HOME_ENV_KEY))
+ {
+ minifiHome = env_p;
+ }
+ else
+ {
+ logger->log_info("MINIFI_HOME was not found, determining based on executable path.");
+ char *path = NULL;
+ char full_path[PATH_MAX];
+ path = realpath(argv[0], full_path);
+ std::string minifiHome(path);
+ minifiHome = minifiHome.substr(0, minifiHome.find_last_of("/\\"));
+ }
- if (signal(SIGINT, sigHandler) == SIG_ERR || signal(SIGTERM, sigHandler) == SIG_ERR || signal(SIGPIPE, SIG_IGN) == SIG_ERR) {
+ if (signal(SIGINT, sigHandler) == SIG_ERR || signal(SIGTERM, sigHandler) == SIG_ERR || signal(SIGPIPE, SIG_IGN) == SIG_ERR)
+ {
logger->log_error("Can not install signal handler");
return -1;
}
- Configure *configure = Configure::getConfigure();
- configure->loadConfigureFile(DEFAULT_NIFI_PROPERTIES_FILE);
+ Configure *configure = Configure::getConfigure();
+ configure->setHome(minifiHome);
+ configure->loadConfigureFile(DEFAULT_NIFI_PROPERTIES_FILE);
controller = new FlowController();
@@ -126,6 +115,8 @@
controller->start();
running = true;
+ logger->log_info("MiNiFi started");
+
// main loop
while (running)
{
diff --git a/src/Configure.cpp b/src/Configure.cpp
index 862bb71..d7fd95b 100644
--- a/src/Configure.cpp
+++ b/src/Configure.cpp
@@ -107,23 +107,42 @@
//! Load Configure File
void Configure::loadConfigureFile(const char *fileName)
{
- std::ifstream file(fileName, std::ifstream::in);
- if (!file.good())
- {
- _logger->log_error("load configure file failed %s", fileName);
- return;
- }
- this->clear();
- const unsigned int bufSize = 512;
- char buf[bufSize];
- for (file.getline(buf,bufSize); file.good(); file.getline(buf,bufSize))
- {
- parseConfigureFileLine(buf);
- }
+
+ std::string adjustedFilename;
+ if (fileName)
+ {
+ // perform a naive determination if this is a relative path
+ if (fileName[0] != '/')
+ {
+ adjustedFilename = adjustedFilename + _configure->getHome() + "/" + fileName;
+ }
+ else
+ {
+ adjustedFilename += fileName;
+ }
+ }
+ char *path = NULL;
+ char full_path[PATH_MAX];
+ path = realpath(adjustedFilename.c_str(), full_path);
+ _logger->log_info("Using configuration file located at %s", path);
+
+ std::ifstream file(path, std::ifstream::in);
+ if (!file.good())
+ {
+ _logger->log_error("load configure file failed %s", path);
+ return;
+ }
+ this->clear();
+ const unsigned int bufSize = 512;
+ char buf[bufSize];
+ for (file.getline(buf, bufSize); file.good(); file.getline(buf, bufSize))
+ {
+ parseConfigureFileLine(buf);
+ }
}
//! Parse Command Line
-void Configure::pareCommandLine(int argc, char **argv)
+void Configure::parseCommandLine(int argc, char **argv)
{
int i;
bool keyFound = false;
diff --git a/src/FlowControlProtocol.cpp b/src/FlowControlProtocol.cpp
index 6f1517c..011ebcf 100644
--- a/src/FlowControlProtocol.cpp
+++ b/src/FlowControlProtocol.cpp
@@ -231,7 +231,6 @@
return;
_running = false;
_logger->log_info("FlowControl Protocol Stop");
- delete _thread;
}
void FlowControlProtocol::run(FlowControlProtocol *protocol)
diff --git a/src/FlowController.cpp b/src/FlowController.cpp
index c01c385..b176a12 100644
--- a/src/FlowController.cpp
+++ b/src/FlowController.cpp
@@ -37,7 +37,7 @@
uuid_generate(_uuid);
// Setup the default values
- _configurationFileName = DEFAULT_FLOW_XML_FILE_NAME;
+ _configurationFileName = DEFAULT_FLOW_YAML_FILE_NAME;
_maxEventDrivenThreads = DEFAULT_MAX_EVENT_DRIVEN_THREAD;
_maxTimerDrivenThreads = DEFAULT_MAX_TIMER_DRIVEN_THREAD;
_running = false;
@@ -48,10 +48,46 @@
// NiFi config properties
_configure = Configure::getConfigure();
- _configure->get(Configure::nifi_flow_configuration_file, _configurationFileName);
- _logger->log_info("FlowController NiFi XML file %s", _configurationFileName.c_str());
- // Create repos for flow record and provenance
+ std::string rawConfigFileString;
+ _configure->get(Configure::nifi_flow_configuration_file, rawConfigFileString);
+
+ if (!rawConfigFileString.empty())
+ {
+ _configurationFileName = rawConfigFileString;
+ }
+
+ char *path = NULL;
+ char full_path[PATH_MAX];
+
+ std::string adjustedFilename;
+ if (!_configurationFileName.empty())
+ {
+ // perform a naive determination if this is a relative path
+ if (_configurationFileName.c_str()[0] != '/')
+ {
+ adjustedFilename = adjustedFilename + _configure->getHome() + "/" + _configurationFileName;
+ }
+ else
+ {
+ adjustedFilename = _configurationFileName;
+ }
+ }
+
+ path = realpath(adjustedFilename.c_str(), full_path);
+ if (!path)
+ {
+ _logger->log_error("Could not locate path from provided configuration file name.");
+ }
+
+ char *flowPath = NULL;
+ char flow_full_path[PATH_MAX];
+
+ std::string pathString(path);
+ _configurationFileName = pathString;
+ _logger->log_info("FlowController NiFi Configuration file %s", pathString.c_str());
+
+ // Create repos for flow record and provenance
_logger->log_info("FlowController %s created", _name.c_str());
}
@@ -419,15 +455,11 @@
std::vector<std::string> rawAutoTerminatedRelationshipValues;
if (autoTerminatedSequence.IsSequence() && !autoTerminatedSequence.IsNull()
&& autoTerminatedSequence.size() > 0) {
- _logger->log_debug("Found non-empty auto terminated sequence... interpreting.");
for (YAML::const_iterator relIter = autoTerminatedSequence.begin();
relIter != autoTerminatedSequence.end(); ++relIter) {
std::string autoTerminatedRel = relIter->as<std::string>();
- _logger->log_debug("Auto terminating relationship %s", autoTerminatedRel.c_str());
rawAutoTerminatedRelationshipValues.push_back(autoTerminatedRel);
}
- } else {
- _logger->log_debug("no relationships are auto terminated here...");
}
procCfg.autoTerminatedRelationships = rawAutoTerminatedRelationshipValues;
@@ -604,7 +636,7 @@
if (connectionsNode->IsSequence()) {
for (YAML::const_iterator iter = connectionsNode->begin(); iter != connectionsNode->end(); ++iter) {
-// generate the random UIID
+ // generate the random UIID
uuid_generate(uuid);
YAML::Node connectionNode = iter->as<YAML::Node>();
@@ -623,10 +655,9 @@
if (connection)
connection->setRelationship(relationship);
std::string connectionSrcProcName = connectionNode["source name"].as<std::string>();
+
Processor *srcProcessor = this->_root->findProcessor(connectionSrcProcName);
- _logger->log_debug("I see processor with name %s looking for source with name %s",
- this->_root->findProcessor(connectionSrcProcName)->getName().c_str(),
- connectionSrcProcName.c_str());
+
if (!srcProcessor) {
_logger->log_error("Could not locate a source with name %s to create a connection",
connectionSrcProcName.c_str());
@@ -634,24 +665,15 @@
"Could not locate a source with name %s to create a connection " + connectionSrcProcName);
}
- _logger->log_debug("This processor has UUID of %s", srcProcessor->getUUIDStr().c_str());
- _logger->log_trace("Trying to find dest processor by name %s", destName.c_str());
Processor *destProcessor = this->_root->findProcessor(destName);
// If we could not find name, try by UUID
if (!destProcessor) {
- _logger->log_trace("Now looking up by uuid");
uuid_t destUuid;
uuid_parse(destName.c_str(), destUuid);
destProcessor = this->_root->findProcessor(destUuid);
}
if (destProcessor) {
std::string destUuid = destProcessor->getUUIDStr();
- if (!destUuid.empty()) {
- _logger->log_debug("This destination processor has UUID of %s",
- destProcessor->getUUIDStr().c_str());
- }
- } else {
- _logger->log_debug("!!! === Could not find a destination processor for the connection.");
}
uuid_t srcUuid;
@@ -787,8 +809,6 @@
Processor *processor = NULL;
RemoteProcessorGroupPort *port = NULL;
- _logger->log_trace("Creating a port from YAML.");
-
if (!parent) {
_logger->log_error("parseProcessNode: no parent group existed");
return;
@@ -796,7 +816,7 @@
YAML::Node inputPortsObj = portNode->as<YAML::Node>();
-// generate the random UIID
+ // generate the random UIID
uuid_generate(uuid);
auto portId = inputPortsObj["id"].as<std::string>();
@@ -804,8 +824,6 @@
uuid_parse(portId.c_str(), uuid);
port = new RemoteProcessorGroupPort(nameStr.c_str(), uuid);
- _logger->log_debug("parse input port: name => [%s]", nameStr.c_str());
- _logger->log_debug("parse input port: id => [%s]", portId.c_str());
processor = (Processor *) port;
port->setDirection(direction);
@@ -819,23 +837,17 @@
YAML::Node propertiesNode = nodeVal["Properties"];
std::vector<Property> properties;
- _logger->log_debug("!!! === Checking out properties for input port....");
-
if (propertiesNode.IsMap() && !propertiesNode.IsNull() && propertiesNode.size() > 0) {
std::map<std::string, std::string> propertiesMap = propertiesNode.as<std::map<std::string, std::string>>();
- _logger->log_debug("Found non-empty properties sequence... interpreting.");
for (std::map<std::string, std::string>::iterator propsIter = propertiesMap.begin();
propsIter != propertiesMap.end(); propsIter++) {
std::string propertyName = propsIter->first;
std::string propertyValue = propsIter->second;
- _logger->log_debug("Detected property %s => %s", propertyName.c_str(), propertyValue.c_str());
if (!processor->setProperty(propertyName, propertyValue)) {
_logger->log_warn("Received property %s with value %s but is not one of the properties for %s",
propertyName.c_str(), propertyValue.c_str(), nameStr.c_str());
}
}
- } else {
- _logger->log_debug("no properties here...");
}
// add processor to parent
@@ -1149,8 +1161,6 @@
xmlCleanupParser();
_initialized = true;
} else if (ConfigFormat::YAML == configFormat) {
- _logger->log_info("Detected a YAML configuration file for processing.");
-
YAML::Node flow = YAML::LoadFile(_configurationFileName);
YAML::Node flowControllerNode = flow["Flow Controller"];
diff --git a/src/ListenSyslog.cpp b/src/ListenSyslog.cpp
index 090c988..ace37d7 100644
--- a/src/ListenSyslog.cpp
+++ b/src/ListenSyslog.cpp
@@ -331,9 +331,6 @@
}
else
{
- /*
- ListenSyslog::WriteCallback callbackSep((char *)_messageDelimiter.data(), _messageDelimiter.size());
- session->append(flowFile, &callbackSep); */
ListenSyslog::WriteCallback callback((char *)event.payload, event.len);
session->append(flowFile, &callback);
delete[] event.payload;