MINIFICPP-927 Add delimited tailfile processor
Signed-off-by: Arpad Boda <aboda@apache.org>
This closes #613
diff --git a/libminifi/include/core/ContentRepository.h b/libminifi/include/core/ContentRepository.h
index 348725e..c3bec9f 100644
--- a/libminifi/include/core/ContentRepository.h
+++ b/libminifi/include/core/ContentRepository.h
@@ -46,7 +46,7 @@
*/
virtual bool initialize(const std::shared_ptr<Configure> &configure) = 0;
- virtual std::string getStoragePath() {
+ virtual std::string getStoragePath() const {
return directory_;
}
diff --git a/libminifi/include/core/ProcessContext.h b/libminifi/include/core/ProcessContext.h
index 424e306..be8ed91 100644
--- a/libminifi/include/core/ProcessContext.h
+++ b/libminifi/include/core/ProcessContext.h
@@ -59,7 +59,9 @@
flow_repo_(flow_repo),
content_repo_(content_repo),
processor_node_(processor),
- logger_(logging::LoggerFactory<ProcessContext>::getLogger()) {
+ logger_(logging::LoggerFactory<ProcessContext>::getLogger()),
+ configure_(std::make_shared<minifi::Configure>()),
+ initialized_(false) {
repo_ = repo;
}
@@ -75,7 +77,8 @@
flow_repo_(flow_repo),
content_repo_(content_repo),
processor_node_(processor),
- logger_(logging::LoggerFactory<ProcessContext>::getLogger()) {
+ logger_(logging::LoggerFactory<ProcessContext>::getLogger()),
+ initialized_(false) {
repo_ = repo;
}
// Destructor
@@ -197,6 +200,15 @@
return controller_service_provider_->getControllerServiceName(identifier);
}
+ void initializeContentRepository(const std::string& home) {
+ configure_->setHome(home);
+ content_repo_->initialize(configure_);
+ initialized_ = true;
+ }
+
+ bool isInitialized() const {
+ return initialized_;
+ }
private:
template<typename T>
@@ -217,7 +229,9 @@
// Logger
std::shared_ptr<logging::Logger> logger_;
+ std::shared_ptr<Configure> configure_;
+ bool initialized_;
};
} /* namespace core */
diff --git a/libminifi/include/core/StreamManager.h b/libminifi/include/core/StreamManager.h
index 65e0414..a2a2e78 100644
--- a/libminifi/include/core/StreamManager.h
+++ b/libminifi/include/core/StreamManager.h
@@ -41,7 +41,7 @@
}
- virtual std::string getStoragePath() = 0;
+ virtual std::string getStoragePath() const = 0;
/**
* Create a write stream using the streamId as a reference.
diff --git a/libminifi/include/utils/file/FileUtils.h b/libminifi/include/utils/file/FileUtils.h
index 91fc130..bedfee8 100644
--- a/libminifi/include/utils/file/FileUtils.h
+++ b/libminifi/include/utils/file/FileUtils.h
@@ -282,7 +282,15 @@
}
#endif
- static int create_dir(const std::string &path, bool create = true) {
+ static int is_directory(const char * path) {
+ struct stat dir_stat;
+ if (stat(path, &dir_stat) < 0) {
+ return 0;
+ }
+ return S_ISDIR(dir_stat.st_mode);
+ }
+
+ static int create_dir(const std::string& path, bool recursive = true) {
#ifdef BOOST_VERSION
boost::filesystem::path dir(path);
if(boost::filesystem::create_directory(dir))
@@ -301,12 +309,41 @@
return 0;
}
#else
- struct stat dir_stat;
- if (stat(path.c_str(), &dir_stat)) {
- if (mkdir(path.c_str(), 0700) != 0 && errno != EEXIST) {
+ if (!recursive) {
+ if (mkdir(path.c_str(), 0700) != 0 && errno != EEXIST) {
+ return -1;
+ }
+ return 0;
+ }
+
+ int ret = mkdir(path.c_str(), 0700);
+ if (ret == 0) {
+ return 0;
+ }
+
+ switch (errno) {
+ case ENOENT: {
+ size_t found = path.find_last_of(get_separator(0));
+
+ if (found == std::string::npos) {
+ return -1;
+ }
+
+ const std::string dir = path.substr(0, found);
+ int res = create_dir(dir);
+ if (res < 0) {
+ return -1;
+ }
+ return mkdir(path.c_str(), 0700);
+ }
+ case EEXIST: {
+ if (is_directory(path.c_str())) {
+ return 0;
+ }
return -1;
- }
- return 0;
+ }
+ default:
+ return -1;
}
#endif
return -1;
diff --git a/libminifi/src/core/repository/FileSystemRepository.cpp b/libminifi/src/core/repository/FileSystemRepository.cpp
index 4607d74..4f9b83d 100644
--- a/libminifi/src/core/repository/FileSystemRepository.cpp
+++ b/libminifi/src/core/repository/FileSystemRepository.cpp
@@ -34,7 +34,7 @@
if (configuration->get(Configure::nifi_dbcontent_repository_directory_default, value)) {
directory_ = value;
} else {
- directory_ = configuration->getHome() + "/contentrepository";
+ directory_ = configuration->getHome();
}
utils::file::FileUtils::create_dir(directory_);
return true;
diff --git a/nanofi/CMakeLists.txt b/nanofi/CMakeLists.txt
index 9e4e1e8..7110519 100644
--- a/nanofi/CMakeLists.txt
+++ b/nanofi/CMakeLists.txt
@@ -33,9 +33,11 @@
include_directories(../libminifi/opsys/posix)
endif()
-file(GLOB NANOFI_SOURCES "src/api/*.cpp" "src/core/*.c*" "src/cxx/*.cpp" "src/sitetosite/*.c*")
+file(GLOB NANOFI_SOURCES "src/api/*.c*" "src/core/*.c*" "src/cxx/*.cpp" "src/sitetosite/*.c*")
-file(GLOB NANOFI_EXAMPLES_SOURCES "examples/*.c" )
+if(WIN32)
+list(REMOVE_ITEM NANOFI_SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/src/api/ecu.c ${CMAKE_CURRENT_SOURCE_DIR}/src/core/file_utils.c ${CMAKE_CURRENT_SOURCE_DIR}/src/core/flowfiles.c)
+endif()
file(GLOB NANOFI_ECU_SOURCES "ecu/*.c")
diff --git a/nanofi/ecu/CMakeLists.txt b/nanofi/ecu/CMakeLists.txt
index b28af76..fccb443 100644
--- a/nanofi/ecu/CMakeLists.txt
+++ b/nanofi/ecu/CMakeLists.txt
@@ -19,33 +19,6 @@
cmake_minimum_required(VERSION 2.6)
-IF(POLICY CMP0048)
- CMAKE_POLICY(SET CMP0048 OLD)
-ENDIF(POLICY CMP0048)
-
-include_directories(/include)
-
-include(CheckCXXCompilerFlag)
-if (WIN32)
- if ((MSVC_VERSION GREATER "1900") OR (MSVC_VERSION EQUAL "1900"))
- CHECK_CXX_COMPILER_FLAG("/std:c++14" _cpp_latest_flag_supported)
- if (_cpp_latest_flag_supported)
- add_compile_options("/std:c++14")
- endif()
- endif()
-else()
-CHECK_CXX_COMPILER_FLAG("-std=c++11" COMPILER_SUPPORTS_CXX11)
-CHECK_CXX_COMPILER_FLAG("-std=c++0x" COMPILER_SUPPORTS_CXX0X)
-if(COMPILER_SUPPORTS_CXX11)
- set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11")
-elseif(COMPILER_SUPPORTS_CXX0X)
- set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++0x")
-else()
- message(STATUS "The compiler ${CMAKE_CXX_COMPILER} has no C++11 support. Please use a different C++ compiler.")
-endif()
-
-endif()
-
if (APPLE)
set(LINK_FLAGS "-Wl,-all_load")
set(LINK_END_FLAGS "")
@@ -56,8 +29,16 @@
if (NOT WIN32)
-add_executable(tail_file tail_file.c)
+add_executable(log_aggregator log_aggregator.c)
-target_link_libraries(tail_file nanofi ${CMAKE_THREAD_LIBS_INIT} ${LINK_FLAGS} minifi-http-curl ${LINK_END_FLAGS})
+target_link_libraries(log_aggregator nanofi ${CMAKE_THREAD_LIBS_INIT} ${LINK_FLAGS} minifi-http-curl ${LINK_END_FLAGS})
+
+add_executable(tailfile_chunk tailfile_chunk.c)
+
+target_link_libraries(tailfile_chunk nanofi ${CMAKE_THREAD_LIBS_INIT} ${LINK_FLAGS} minifi-http-curl ${LINK_END_FLAGS})
+
+add_executable(tailfile_delimited tailfile_delimited.c)
+
+target_link_libraries(tailfile_delimited nanofi ${CMAKE_THREAD_LIBS_INIT} ${LINK_FLAGS} minifi-http-curl ${LINK_END_FLAGS})
endif()
\ No newline at end of file
diff --git a/nanofi/ecu/log_aggregator.c b/nanofi/ecu/log_aggregator.c
new file mode 100644
index 0000000..52d4f23
--- /dev/null
+++ b/nanofi/ecu/log_aggregator.c
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+#include "api/nanofi.h"
+#include "api/ecu.h"
+#include "core/string_utils.h"
+#include "core/cstructs.h"
+#include "core/file_utils.h"
+#include "core/flowfiles.h"
+
+#include <unistd.h>
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include <errno.h>
+#include <limits.h>
+#include <signal.h>
+#include <sys/stat.h>
+
+int main(int argc, char** argv) {
+
+ if (argc < 7) {
+ printf("Error: must run ./log_aggregator <file> <interval> <delimiter> <hostname> <tcp port number> <nifi port uuid>\n");
+ exit(1);
+ }
+
+ tailfile_input_params input_params = init_logaggregate_input(argv);
+
+ uint64_t intrvl = 0;
+ uint64_t port_num = 0;
+ if (validate_input_params(&input_params, &intrvl, &port_num) < 0) {
+ return 1;
+ }
+
+ setup_signal_action();
+ nifi_proc_params params = setup_nifi_processor(&input_params, "LogAggregator", on_trigger_logaggregator);
+
+ set_standalone_property(params.processor, "file_path", input_params.file);
+ set_standalone_property(params.processor, "delimiter", input_params.delimiter);
+
+ struct CRawSiteToSiteClient * client = createClient(input_params.instance, port_num, input_params.nifi_port_uuid);
+
+ char uuid_str[37];
+ get_proc_uuid_from_processor(params.processor, uuid_str);
+
+ while (!stopped) {
+ flow_file_record * new_ff = invoke(params.processor);
+ struct processor_params * pp = NULL;
+ HASH_FIND_STR(procparams, uuid_str, pp);
+ if (pp) {
+ transmit_payload(client, pp->ff_list, 0);
+ delete_all_flow_files_from_proc(uuid_str);
+ }
+ free_flowfile(new_ff);
+ sleep(intrvl);
+ }
+
+ printf("log aggregator processor stopped\n");
+ if (client) {
+ destroyClient(client);
+ }
+ clear_content_repo(params.instance);
+ delete_all_flow_files_from_proc(uuid_str);
+ free_standalone_processor(params.processor);
+ free_instance(params.instance);
+ free_proc_params(uuid_str);
+ return 0;
+}
diff --git a/nanofi/ecu/tail_file.c b/nanofi/ecu/tail_file.c
deleted file mode 100644
index c428be1..0000000
--- a/nanofi/ecu/tail_file.c
+++ /dev/null
@@ -1,206 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-#include "api/nanofi.h"
-#include "core/string_utils.h"
-#include "core/cstructs.h"
-#include "core/file_utils.h"
-#include <unistd.h>
-#include <stdlib.h>
-#include <stdio.h>
-#include <string.h>
-#include <errno.h>
-#include <limits.h>
-#include <signal.h>
-#include <sys/stat.h>
-
-struct flow_file_records * flowfiles = NULL;
-nifi_instance * instance = NULL;
-standalone_processor * proc = NULL;
-int file_offset = 0;
-int stopped = 0;
-flow_file_list ff_list;
-token_list tks;
-
-void signal_handler(int signum) {
- if (signum == SIGINT || signum == SIGTERM) {
- stopped = 1;
- }
-}
-
-void transmit_flow_files(nifi_instance * instance) {
- flow_file_list_node * head = ff_list.head;
- while (head) {
- transmit_flowfile(head->ff_record, instance);
- head = head->next;
- }
-}
-
-void set_offset(int offset) {
- file_offset = offset;
-}
-
-int get_offset() {
- return file_offset;
-}
-
-void on_trigger_callback(processor_session * ps, processor_context * ctx) {
-
- char file_path[4096];
- char delimiter[3];
-
- if (get_property(ctx, "file_path", file_path, sizeof(file_path)) != 0) {
- return;
- }
-
- if (get_property(ctx, "delimiter", delimiter, sizeof(delimiter)) != 0) {
- return;
- }
-
- if (strlen(delimiter) == 0) {
- printf("Delimiter not specified or it is empty\n");
- return;
- }
- char delim = delimiter[0];
-
- if (delim == '\\') {
- if (strlen(delimiter) > 1) {
- switch (delimiter[1]) {
- case 'r':
- delim = '\r';
- break;
- case 't':
- delim = '\t';
- break;
- case 'n':
- delim = '\n';
- break;
- case '\\':
- delim = '\\';
- break;
- default:
- break;
- }
- }
- }
-
- tks = tail_file(file_path, delim, get_offset());
-
- if (!validate_list(&tks)) return;
-
- set_offset(get_offset() + tks.total_bytes);
-
- token_node * head;
- for (head = tks.head; head && head->data; head = head->next) {
- flow_file_record * ffr = generate_flow_file(instance, proc);
- const char * flow_file_path = ffr->contentLocation;
- FILE * ffp = fopen(flow_file_path, "wb");
- if (!ffp) {
- printf("Cannot open flow file at path %s to write content to.\n", flow_file_path);
- break;
- }
-
- int count = strlen(head->data);
- int ret = fwrite(head->data, 1, count, ffp);
- if (ret < count) {
- fclose(ffp);
- break;
- }
- fseek(ffp, 0, SEEK_END);
- ffr->size = ftell(ffp);
- fclose(ffp);
- add_flow_file_record(&ff_list, ffr);
- }
- free_all_tokens(&tks);
-}
-
-int main(int argc, char** argv) {
-
- if (argc < 6) {
- printf("Error: must run ./tail_file <file> <interval> <delimiter> <nifi instance url> <remote port>\n");
- exit(1);
- }
-
- char * file = argv[1];
- char * interval = argv[2];
- char * delimiter = argv[3];
- char * instance_str = argv[4];
- char * port_str = argv[5];
-
- if (access(file, F_OK) == -1) {
- printf("Error: %s doesn't exist!\n", file);
- exit(1);
- }
-
- struct stat stats;
- errno = 0;
- int ret = stat(file, &stats);
-
- if (ret == -1) {
- printf("Error occurred while getting file status {file: %s, error: %s}\n", file, strerror(errno));
- exit(1);
- }
- // Check for file existence
- if (S_ISDIR(stats.st_mode)){
- printf("Error: %s is a directory!\n", file);
- exit(1);
- }
-
- errno = 0;
- unsigned long intrvl = strtol(interval, NULL, 10);
-
- if (errno == ERANGE || intrvl == LONG_MAX || intrvl == LONG_MIN) {
- printf("Invalid interval value specified\n");
- return 0;
- }
-
- struct sigaction action;
- memset(&action, 0, sizeof(sigaction));
- action.sa_handler = signal_handler;
- sigaction(SIGTERM, &action, NULL);
- sigaction(SIGINT, &action, NULL);
-
- nifi_port port;
-
- port.port_id = port_str;
-
- instance = create_instance(instance_str, &port);
-
- const char * processor_name = "TailFile";
-
- add_custom_processor(processor_name, on_trigger_callback);
-
- proc = create_processor(processor_name);
-
- set_standalone_property(proc, "file_path", file);
- set_standalone_property(proc, "delimiter", delimiter);
-
- set_offset(0);
- while (!stopped) {
- flow_file_record * new_ff = invoke(proc);
- transmit_flow_files(instance);
- free_flow_file_list(&ff_list);
- free_flowfile(new_ff);
- sleep(intrvl);
- }
-
- printf("tail file processor stopped\n");
- free_standalone_processor(proc);
- free(instance);
-
- return 0;
-}
diff --git a/nanofi/ecu/tailfile_chunk.c b/nanofi/ecu/tailfile_chunk.c
new file mode 100644
index 0000000..26b9966
--- /dev/null
+++ b/nanofi/ecu/tailfile_chunk.c
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+#include "api/ecu.h"
+#include "core/flowfiles.h"
+#include <unistd.h>
+#include <errno.h>
+#include <limits.h>
+#include <signal.h>
+#include <sys/stat.h>
+#include <stdlib.h>
+#include <string.h>
+#include <stdio.h>
+
+int main(int argc, char** argv) {
+
+ if (argc < 7) {
+ printf("Error: must run ./tailfile_chunk <file> <interval> <chunksize> <hostname> <tcp port number> <nifi port uuid>\n");
+ exit(1);
+ }
+
+ tailfile_input_params input_params = init_tailfile_chunk_input(argv);
+
+ uint64_t intrvl = 0;
+ uint64_t port_num = 0;
+ if (validate_input_params(&input_params, &intrvl, &port_num) < 0) {
+ return 1;
+ }
+
+ setup_signal_action();
+ nifi_proc_params params = setup_nifi_processor(&input_params, "TailFileChunk", on_trigger_tailfilechunk);
+
+ set_standalone_property(params.processor, "file_path", input_params.file);
+ set_standalone_property(params.processor, "chunk_size", input_params.chunk_size);
+
+ struct CRawSiteToSiteClient * client = createClient(input_params.instance, port_num, input_params.nifi_port_uuid);
+
+ char uuid_str[37];
+ get_proc_uuid_from_processor(params.processor, uuid_str);
+
+ while (!stopped) {
+ flow_file_record * new_ff = invoke(params.processor);
+ struct processor_params * pp = NULL;
+ HASH_FIND_STR(procparams, uuid_str, pp);
+ if (pp) {
+ transmit_payload(client, pp->ff_list, 0);
+ delete_all_flow_files_from_proc(uuid_str);
+ }
+ free_flowfile(new_ff);
+ sleep(intrvl);
+ }
+
+ printf("processor stopped\n");
+ if (client) {
+ destroyClient(client);
+ }
+ clear_content_repo(params.instance);
+ delete_all_flow_files_from_proc(uuid_str);
+ free_standalone_processor(params.processor);
+ free_instance(params.instance);
+ free_proc_params(uuid_str);
+ return 0;
+}
diff --git a/nanofi/ecu/tailfile_delimited.c b/nanofi/ecu/tailfile_delimited.c
new file mode 100644
index 0000000..3495231
--- /dev/null
+++ b/nanofi/ecu/tailfile_delimited.c
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+#include "api/ecu.h"
+#include "core/flowfiles.h"
+#include <unistd.h>
+#include <errno.h>
+#include <limits.h>
+#include <signal.h>
+#include <sys/stat.h>
+#include <stdlib.h>
+#include <string.h>
+#include <stdio.h>
+
+int main(int argc, char** argv) {
+
+ if (argc < 7) {
+ printf("Error: must run ./tailfile_delimited <file> <interval> <delimiter> <hostname> <tcp port number> <nifi port uuid>\n");
+ exit(1);
+ }
+
+ tailfile_input_params input_params = init_logaggregate_input(argv);
+
+ uint64_t intrvl = 0;
+ uint64_t port_num = 0;
+ if (validate_input_params(&input_params, &intrvl, &port_num) < 0) {
+ return 1;
+ }
+
+ setup_signal_action();
+ nifi_proc_params params = setup_nifi_processor(&input_params, "TailFileDelimited", on_trigger_tailfiledelimited);
+
+ set_standalone_property(params.processor, "file_path", input_params.file);
+ set_standalone_property(params.processor, "delimiter", input_params.delimiter);
+
+ struct CRawSiteToSiteClient * client = createClient(input_params.instance, port_num, input_params.nifi_port_uuid);
+
+ char uuid_str[37];
+ get_proc_uuid_from_processor(params.processor, uuid_str);
+
+ while (!stopped) {
+ flow_file_record * new_ff = invoke(params.processor);
+ struct processor_params * pp = NULL;
+ HASH_FIND_STR(procparams, uuid_str, pp);
+ if (pp) {
+ transmit_payload(client, pp->ff_list, 1);
+ delete_completed_flow_files_from_proc(uuid_str);
+ }
+ free_flowfile(new_ff);
+ sleep(intrvl);
+ }
+
+ printf("tailfile delimited processor stopped\n");
+ if (client) {
+ destroyClient(client);
+ }
+ clear_content_repo(params.instance);
+ delete_all_flow_files_from_proc(uuid_str);
+ free_standalone_processor(params.processor);
+ free_instance(params.instance);
+ free_proc_params(uuid_str);
+ return 0;
+}
diff --git a/nanofi/examples/CMakeLists.txt b/nanofi/examples/CMakeLists.txt
index b9480ed..6a9779c 100644
--- a/nanofi/examples/CMakeLists.txt
+++ b/nanofi/examples/CMakeLists.txt
@@ -83,8 +83,4 @@
target_link_libraries(monitor_directory nanofi ${CMAKE_THREAD_LIBS_INIT} ${LINK_FLAGS} minifi-http-curl ${LINK_END_FLAGS})
-#add_executable(tail_file tail_file.c)
-
-#target_link_libraries(tail_file nanofi ${CMAKE_THREAD_LIBS_INIT} ${LINK_FLAGS} minifi-http-curl ${LINK_END_FLAGS})
-
endif()
diff --git a/nanofi/examples/tail_file.c b/nanofi/examples/tail_file.c
deleted file mode 100644
index 2200df4..0000000
--- a/nanofi/examples/tail_file.c
+++ /dev/null
@@ -1,257 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-#include "api/nanofi.h"
-#include "core/string_utils.h"
-#include <unistd.h>
-#include <stdlib.h>
-#include <stdio.h>
-#include <string.h>
-#include <errno.h>
-#include <limits.h>
-#include <signal.h>
-#include <sys/stat.h>
-
-typedef struct flow_file_records {
- flow_file_record ** records;
- uint64_t len;
-} flow_file_records;
-
-struct flow_file_records * flowfiles = NULL;
-nifi_instance * instance = NULL;
-standalone_processor * proc = NULL;
-int file_offset = 0;
-int stopped = 0;
-
-void signal_handler(int signum) {
- if (signum == SIGINT || signum == SIGTERM) {
- stopped = 1;
- }
-}
-
-void transmit_flow_files(nifi_instance * instance) {
- NULL_CHECK( ,flowfiles);
- int i;
- for (i = 0; i < flowfiles->len; ++i) {
- NULL_CHECK( ,flowfiles->records[i]);
- transmit_flowfile(flowfiles->records[i], instance);
- }
-}
-
-void free_flow_file_records() {
- NULL_CHECK( ,flowfiles);
- int i;
- for (i = 0; i < flowfiles->len; ++i) {
- free_flowfile(flowfiles->records[i]);
- }
- free(flowfiles);
- flowfiles = NULL;
-}
-
-void set_offset(int offset) {
- file_offset = offset;
-}
-
-int get_offset() {
- return file_offset;
-}
-
-void free_all_strings(char ** strings, int num_strings) {
- int i;
- for (i = 0; i < num_strings; ++i) {
- free(strings[i]);
- }
-}
-
-void on_trigger_callback(processor_session * ps, processor_context * ctx) {
-
- char file_path[4096];
- char delimiter[2];
-
- if (get_property(ctx, "file_path", file_path, 50) != 0) {
- return;
- }
-
- if (get_property(ctx, "delimiter", delimiter, 2) != 0) {
- return;
- }
-
- if (strlen(delimiter) == 0) {
- printf("Delimiter not specified or it is empty\n");
- return;
- }
- char delim = '\0';
- if (strlen(delimiter) > 0) {
- delim = delimiter[0];
- }
-
- if (delim == '\0') {
- printf("Invalid delimiter \n");
- return;
- }
-
- if (delim == '\\') {
- if (strlen(delimiter) > 1) {
- switch (delimiter[1]) {
- case 'r':
- delim = '\r';
- break;
- case 't':
- delim = '\t';
- break;
- case 'n':
- delim = '\n';
- break;
- case '\\':
- delim = '\\';
- break;
- default:
- break;
- }
- }
- }
-
- int curr_offset = get_offset();
- int max_bytes_read = 4096;
- char buff[max_bytes_read + 1];
- memset(buff,'\0', max_bytes_read);
- FILE * fp = fopen(file_path, "rb");
- if (!fp) return;
- fseek(fp, curr_offset, SEEK_SET);
-
- int bytes_read = 0;
- while ((bytes_read = fread(buff, 1, max_bytes_read, fp)) > 0) {
- buff[bytes_read] = '\0';
- tokenizer_mode_t mode = TAILFILE_MODE;
- struct tokens tks = tokenize_string(buff, delim, mode);
-
- if (tks.num_strings == 0) return;
-
- set_offset(get_offset() + tks.total_bytes);
-
- flowfiles = (flow_file_records *)malloc(sizeof(flow_file_records));
- flowfiles->records = malloc(sizeof(flow_file_record *) * tks.num_strings);
- flowfiles->len = tks.num_strings;
-
- int i;
- for (i = 0; i < tks.num_strings; ++i) {
- flowfiles->records[i] = NULL;
- }
-
- for (i = 0; i < tks.num_strings; ++i) {
- if (tks.str_list[i] && strlen(tks.str_list[i]) > 0) {
- flow_file_record * ffr = generate_flow_file(instance, proc);
- const char * flow_file_path = ffr->contentLocation;
- FILE * ffp = fopen(flow_file_path, "wb");
- if (!ffp) {
- printf("Cannot open flow file at path %s to write content to.\n", flow_file_path);
- fclose(fp);
- free_tokens(&tks);
- return;
- }
- int count = strlen(tks.str_list[i]);
- int ret = fwrite(tks.str_list[i], 1, count, ffp);
- if (ret < count) {
- fclose(ffp);
- return;
- }
- fseek(ffp, 0, SEEK_END);
- ffr->size = ftell(ffp);
- fclose(ffp);
- flowfiles->records[i] = ffr;
- }
- }
- free_tokens(&tks);
- }
- fclose(fp);
-}
-
-int main(int argc, char** argv) {
-
- if (argc < 6) {
- printf("Error: must run ./tail_file <file> <interval> <delimiter> <nifi instance url> <remote port>\n");
- exit(1);
- }
-
- char * file = argv[1];
- char * interval = argv[2];
- char * delimiter = argv[3];
- char * instance_str = argv[4];
- char * port_str = argv[5];
-
- if (access(file, F_OK) == -1) {
- printf("Error: %s doesn't exist!\n", file);
- exit(1);
- }
-
- struct stat stats;
- int ret = stat(file, &stats);
-
- errno = 0;
- if (ret == -1) {
- printf("Error occurred while getting file status {file: %s, error: %s}\n", file, strerror(errno));
- exit(1);
- }
- // Check for file existence
- if (S_ISDIR(stats.st_mode)){
- printf("Error: %s is a directory!\n", file);
- exit(1);
- }
-
- errno = 0;
- unsigned long intrvl = strtol(interval, NULL, 10);
-
- if (errno == ERANGE || intrvl == LONG_MAX || intrvl == LONG_MIN) {
- printf("Invalid interval value specified\n");
- return 0;
- }
-
- struct sigaction action;
- memset(&action, 0, sizeof(sigaction));
- action.sa_handler = signal_handler;
- sigaction(SIGTERM, &action, NULL);
- sigaction(SIGINT, &action, NULL);
-
- nifi_port port;
-
- port.port_id = port_str;
-
- instance = create_instance(instance_str, &port);
-
- const char * processor_name = "TailFile";
-
- add_custom_processor(processor_name, on_trigger_callback);
-
- proc = create_processor(processor_name);
-
- set_standalone_property(proc, "file_path", file);
- set_standalone_property(proc, "delimiter", delimiter);
-
- set_offset(0);
- while (!stopped) {
- flow_file_record * new_ff = invoke(proc);
- transmit_flow_files(instance);
- free_flow_file_records();
- free_flowfile(new_ff);
- sleep(intrvl);
- }
-
- free_standalone_processor(proc);
- free(instance);
-
- return 0;
-}
diff --git a/nanofi/include/api/ecu.h b/nanofi/include/api/ecu.h
new file mode 100644
index 0000000..9549491
--- /dev/null
+++ b/nanofi/include/api/ecu.h
@@ -0,0 +1,95 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+#ifndef NANOFI_INCLUDE_API_ECU_H_
+#define NANOFI_INCLUDE_API_ECU_H_
+
+#include <signal.h>
+#include "api/nanofi.h"
+#include "uthash.h"
+#include "utlist.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+typedef struct proc_properties {
+ char * file_path;
+ char delimiter;
+ uint64_t chunk_size;
+} proc_properties;
+
+typedef struct processor_params {
+ char uuid_str[37]; //key
+ struct flow_file_list * ff_list;
+ uint64_t curr_offset;
+ struct proc_properties * properties;
+ UT_hash_handle hh;
+} processor_params;
+
+extern processor_params * procparams;
+extern volatile sig_atomic_t stopped;
+
+typedef struct tailfile_input_params {
+ char * file;
+ char * interval;
+ char * delimiter;
+ char * instance;
+ char * tcp_port;
+ char * nifi_port_uuid;
+ char * chunk_size;
+} tailfile_input_params;
+
+typedef struct nifi_proc_params {
+ nifi_instance * instance;
+ standalone_processor * processor;
+} nifi_proc_params;
+
+/**
+ * Tails a delimited file starting from an offset up to the end of file
+ * @param file the path to the file to tail
+ * @param delim the delimiter character
+ * @param ctx the process context
+ * For eg. To tail from beginning of the file curr_offset = 0
+ * @return a list of flow file info containing list of flow file records
+ * and the current offset in the file
+ */
+flow_file_info log_aggregate(const char * file_path, char delim, processor_context * ctx);
+void on_trigger_tailfilechunk(processor_session * ps, processor_context * ctx);
+void on_trigger_logaggregator(processor_session * ps, processor_context * ctx);
+void on_trigger_tailfiledelimited(processor_session * ps, processor_context * ctx);
+void signal_handler(int signum);
+void delete_all_flow_files_from_proc(const char * uuid);
+void delete_completed_flow_files_from_proc(const char * uuid);
+void update_proc_params(const char * uuid, uint64_t value, flow_file_list * ff);
+processor_params * get_proc_params(const char * uuid);
+
+void init_common_input(tailfile_input_params * input_params, char ** args);
+tailfile_input_params init_logaggregate_input(char ** args);
+tailfile_input_params init_tailfile_chunk_input(char ** args);
+
+int validate_input_params(tailfile_input_params * params, uint64_t * intrvl, uint64_t * port_num);
+void setup_signal_action();
+nifi_proc_params setup_nifi_processor(tailfile_input_params * input_params, const char * processor_name, void(*callback)(processor_session *, processor_context *));
+void free_proc_params(const char * uuid);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* NANOFI_INCLUDE_API_ECU_H_ */
diff --git a/nanofi/include/api/nanofi.h b/nanofi/include/api/nanofi.h
index 074ae70..f401073 100644
--- a/nanofi/include/api/nanofi.h
+++ b/nanofi/include/api/nanofi.h
@@ -150,7 +150,7 @@
* @param name the name of the processor to instanciate
* @return pointer to the new processor or nullptr in case it cannot be instantiated (wrong name?)
**/
-standalone_processor *create_processor(const char * name);
+standalone_processor *create_processor(const char * name, nifi_instance * instance);
/**
* Free a standalone processor
@@ -318,6 +318,13 @@
flow_file_record* generate_flow_file(nifi_instance * instance, standalone_processor * proc);
/**
+ * Adds content to the flow file record.
+ * @param ctx the processor context
+ * @return a flow file record
+ */
+flow_file_record * generate_flow(processor_context * ctx);
+
+/**
* Get incoming flow file. To be used in processor logic callbacks.
* @param session current processor session
* @param context current processor context
@@ -441,6 +448,35 @@
**/
int transfer_to_relationship(flow_file_record * ffr, processor_session * ps, const char * relationship);
+/**
+ * Write content to a flow file and return a pointer to flow file record
+ * @param buff, the buffer to read content from
+ * @param count the number of bytes to read
+ * @param ctx the processor context
+ */
+flow_file_record * write_to_flow(const char * buff, size_t count, processor_context * ctx);
+
+/**
+ * Initialize content repository
+ * @param ctx the processor context
+ */
+void initialize_content_repo(processor_context * ctx, const char * uuid);
+
+/**
+ * Clear content repository contents
+ */
+void clear_content_repo(const nifi_instance * instance);
+
+/**
+ * Get the processor uuid from processor context
+ */
+void get_proc_uuid_from_context(const processor_context * ctx, char * uuid_target);
+
+/**
+ * Get the processor uuid from processor
+ */
+void get_proc_uuid_from_processor(standalone_processor * proc, char * uuid_target);
+
/****
* ##################################################################
* Persistence Operations
diff --git a/nanofi/include/core/cstructs.h b/nanofi/include/core/cstructs.h
index 3be3ce3..846be5b 100644
--- a/nanofi/include/core/cstructs.h
+++ b/nanofi/include/core/cstructs.h
@@ -117,9 +117,9 @@
char * contentLocation; /**< Filesystem location of this object */
- void *attributes; /**< Hash map of attributes */
+ void * attributes; /**< Hash map of attributes */
- void *ffp;
+ void * ffp;
uint8_t keepContent;
@@ -175,16 +175,15 @@
* ##################################################################
*/
-typedef struct flow_file_list_node {
- flow_file_record * ff_record;
- struct flow_file_list_node * next;
-} flow_file_list_node;
-
typedef struct flow_file_list {
- flow_file_list_node * head;
- flow_file_list_node * tail;
- int len;
- int offset;
+ flow_file_record * ff_record;
+ int complete;
+ struct flow_file_list * next;
} flow_file_list;
+typedef struct flow_file_info {
+ struct flow_file_list * ff_list;
+ uint64_t total_bytes;
+} flow_file_info;
+
#endif /* LIBMINIFI_SRC_CAPI_CSTRUCTS_H_ */
diff --git a/nanofi/include/core/file_utils.h b/nanofi/include/core/file_utils.h
index 25c3029..15806e4 100644
--- a/nanofi/include/core/file_utils.h
+++ b/nanofi/include/core/file_utils.h
@@ -19,6 +19,7 @@
#ifndef NANOFI_INCLUDE_CORE_FILE_UTILS_H_
#define NANOFI_INCLUDE_CORE_FILE_UTILS_H_
+#include "utlist.h"
#include "flowfiles.h"
#ifdef __cplusplus
@@ -26,14 +27,49 @@
#endif
/**
- * Tails a delimited file starting from an offset up to the end of file
- * @param file the path to the file to tail
- * @param delim the delimiter character
- * @param curr_offset the offset in the file to tail from.
- * For eg. To tail from beginning of the file curr_offset = 0
- * @return a list of tokens
+ * Recursively deletes a directory tree
+ * @param path, the path to the directory
*/
-token_list tail_file(const char * file, char delim, int curr_offset);
+void remove_directory(const char * path);
+
+/**
+ * Determine if the provided directory/file path is a directory
+ * @path the absolute path to the file/directory
+ * @return 1 if path is directory else 0
+ */
+int is_directory(const char * path);
+
+/*
+ * Get the platform-specific path separator.
+ * @param force_posix returns the posix path separator ('/'), even when not on posix. Useful when dealing with remote posix paths.
+ * @return the path separator character
+ */
+const char * get_separator(int force_posix);
+
+/**
+ * Joins parent path with child path
+ * @param parent the parent path
+ * @param child the child path
+ * @return concatenated path
+ * @attention this function allocates memory for the returned concatenated path
+ * and it is left for the caller to free the memory
+ */
+char * concat_path(const char * parent, const char * child);
+
+/**
+ * Make a directory tree specified by path
+ * @param path the path to the directory
+ * @return 1 if successful else 0
+ */
+int make_dir(const char * path);
+
+/**
+ * Return the current working directory
+ * @return the current working directory
+ * @attention this function allocates memory on heap
+ * it is left to the caller to free it
+ */
+char * get_current_working_directory();
#ifdef __cplusplus
}
diff --git a/nanofi/include/core/flowfiles.h b/nanofi/include/core/flowfiles.h
index 60ac02e..72307ec 100644
--- a/nanofi/include/core/flowfiles.h
+++ b/nanofi/include/core/flowfiles.h
@@ -19,10 +19,33 @@
#ifndef NANOFI_INCLUDE_CORE_FLOWFILES_H_
#define NANOFI_INCLUDE_CORE_FLOWFILES_H_
+#ifdef __cplusplus
+extern "C" {
+#endif
+
#include "cstructs.h"
+#include "api/ecu.h"
+#include "sitetosite/CPeer.h"
+#include "sitetosite/CRawSocketProtocol.h"
-void add_flow_file_record(flow_file_list * ff_list, flow_file_record * record);
+flow_file_list * add_flow_file_record(flow_file_list ** ff_list, flow_file_record * record);
-void free_flow_file_list(flow_file_list * ff_list);
+void free_flow_file_list(flow_file_list ** ff_list);
+
+void add_attributes(flow_file_record * ffr, const char * file_path, uint64_t curr_offset);
+
+void update_attributes(flow_file_record * ffr, const char * file_path, uint64_t curr_offset);
+
+void transmit_flow_files(nifi_instance * instance, flow_file_list * ff_list, int complete);
+
+void transmit_payload(struct CRawSiteToSiteClient * client, struct flow_file_list * ff_list, int complete);
+
+uint64_t flow_files_size(flow_file_list * ff_list);
+
+void read_payload_and_transmit(struct flow_file_list * ffl, struct CRawSiteToSiteClient * client);
+
+#ifdef __cplusplus
+}
+#endif
#endif /* NANOFI_INCLUDE_CORE_FLOWFILES_H_ */
diff --git a/nanofi/include/cxx/Instance.h b/nanofi/include/cxx/Instance.h
index b10b95f..0325081 100644
--- a/nanofi/include/cxx/Instance.h
+++ b/nanofi/include/cxx/Instance.h
@@ -27,6 +27,7 @@
#include "RemoteProcessorGroupPort.h"
#include "core/ContentRepository.h"
#include "core/repository/VolatileContentRepository.h"
+#include "core/repository/FileSystemRepository.h"
#include "core/Repository.h"
#include "C2CallbackAgent.h"
@@ -40,6 +41,8 @@
#include "ReflexiveSession.h"
#include "utils/ThreadPool.h"
#include "core/state/UpdateController.h"
+#include "core/file_utils.h"
+
namespace org {
namespace apache {
namespace nifi {
@@ -63,14 +66,24 @@
class Instance {
public:
- explicit Instance(const std::string &url, const std::string &port)
+ explicit Instance(const std::string &url, const std::string &port, const std::string &repo_class_name = "")
: configure_(std::make_shared<Configure>()),
url_(url),
agent_(nullptr),
rpgInitialized_(false),
listener_thread_pool_(1),
- content_repo_(std::make_shared<minifi::core::repository::VolatileContentRepository>()),
no_op_repo_(std::make_shared<minifi::core::Repository>()) {
+
+ if (repo_class_name == "filesystemrepository") {
+ content_repo_ = std::make_shared<minifi::core::repository::FileSystemRepository>();
+ } else {
+ content_repo_ = std::make_shared<minifi::core::repository::VolatileContentRepository>();
+ }
+ char * cwd = get_current_working_directory();
+ if (cwd) {
+ configure_->setHome(std::string(cwd));
+ free(cwd);
+ }
running_ = false;
stream_factory_ = minifi::io::StreamFactory::getInstance(configure_);
utils::Identifier uuid;
@@ -118,7 +131,7 @@
return no_op_repo_;
}
- std::shared_ptr<minifi::core::ContentRepository> getContentRepository() {
+ std::shared_ptr<minifi::core::ContentRepository> getContentRepository() const {
return content_repo_;
}
diff --git a/nanofi/src/api/ecu.c b/nanofi/src/api/ecu.c
new file mode 100644
index 0000000..709c681
--- /dev/null
+++ b/nanofi/src/api/ecu.c
@@ -0,0 +1,530 @@
+#include "api/ecu.h"
+#include "api/nanofi.h"
+#include "core/string_utils.h"
+#include "core/cstructs.h"
+#include "core/file_utils.h"
+#include "core/flowfiles.h"
+
+#include <unistd.h>
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include <errno.h>
+#include <limits.h>
+#include <signal.h>
+#include <sys/stat.h>
+
+processor_params * procparams = NULL;
+volatile sig_atomic_t stopped = 0;
+
+void free_proc_params(const char * uuid) {
+
+ struct processor_params * pp = NULL;
+ HASH_FIND_STR(procparams, uuid, pp);
+ if (pp) {
+ free_flow_file_list(&pp->ff_list);
+ free(pp->properties->file_path);
+ free(pp->properties);
+ HASH_DEL(procparams, pp);
+ free(pp);
+ }
+}
+
+void signal_handler(int signum) {
+ if (signum == SIGINT || signum == SIGTERM) {
+ stopped = 1;
+ }
+}
+
+void init_common_input(tailfile_input_params * input_params, char ** args) {
+ if (args && *args) {
+ input_params->file = args[1];
+ input_params->interval = args[2];
+ input_params->instance = args[4];
+ input_params->tcp_port = args[5];
+ input_params->nifi_port_uuid = args[6];
+ }
+}
+
+tailfile_input_params init_logaggregate_input(char ** args) {
+ tailfile_input_params input_params;
+ memset(&input_params, 0, sizeof(input_params));
+ init_common_input(&input_params, args);
+ input_params.delimiter = args[3];
+ return input_params;
+}
+
+tailfile_input_params init_tailfile_chunk_input(char ** args) {
+ tailfile_input_params input_params;
+ memset(&input_params, 0, sizeof(input_params));
+ init_common_input(&input_params, args);
+ input_params.chunk_size = args[3];
+ return input_params;
+}
+
+int validate_input_params(tailfile_input_params * params, uint64_t * intrvl, uint64_t * port_num) {
+ if (access(params->file, F_OK) == -1) {
+ printf("Error: %s doesn't exist!\n", params->file);
+ return -1;
+ }
+
+ struct stat stats;
+ int ret = stat(params->file, &stats);
+
+ if (ret == -1) {
+ printf("Error occurred while getting file status {file: %s, error: %s}\n", params->file, strerror(errno));
+ return -1;
+ }
+ // Check for file existence
+ if (S_ISDIR(stats.st_mode)){
+ printf("Error: %s is a directory!\n", params->file);
+ return -1;
+ }
+
+ errno = 0;
+ *intrvl = (uint64_t)(strtoul(params->interval, NULL, 10));
+
+ if (errno != 0) {
+ printf("Invalid interval value specified\n");
+ return -1;
+ }
+
+ errno = 0;
+ *port_num = (uint64_t)(strtoul(params->tcp_port, NULL, 10));
+ if (errno != 0) {
+ printf("Cannot convert tcp port to numeric value\n");
+ return -1;
+ }
+ return 0;
+}
+
+void setup_signal_action() {
+ struct sigaction action;
+ memset(&action, 0, sizeof(sigaction));
+ action.sa_handler = signal_handler;
+ sigaction(SIGTERM, &action, NULL);
+ sigaction(SIGINT, &action, NULL);
+}
+
+nifi_proc_params setup_nifi_processor(tailfile_input_params * input_params, const char * processor_name, void(*callback)(processor_session *, processor_context *)) {
+ nifi_proc_params params;
+ nifi_port port;
+ port.port_id = input_params->nifi_port_uuid;
+
+ nifi_instance * instance = create_instance(input_params->instance, &port);
+ add_custom_processor(processor_name, callback);
+ standalone_processor * proc = create_processor(processor_name, instance);
+ params.instance = instance;
+ params.processor = proc;
+ return params;
+}
+
+void add_to_hash_table(flow_file_record * ffr, uint64_t offset, const char * uuid) {
+ struct processor_params * pp = NULL;
+ HASH_FIND_STR(procparams, uuid, pp);
+ if (pp == NULL) {
+ pp = (struct processor_params*)malloc(sizeof(struct processor_params));
+ memset(pp, 0, sizeof(struct processor_params));
+ strcpy(pp->uuid_str, uuid);
+ HASH_ADD_STR(procparams, uuid_str, pp);
+ }
+
+ add_flow_file_record(&pp->ff_list, ffr);
+ pp->curr_offset = offset;
+}
+
+void delete_all_flow_files_from_proc(const char * uuid) {
+ struct processor_params * pp = NULL;
+ HASH_FIND_STR(procparams, uuid, pp);
+ if (pp) {
+ struct flow_file_list * head = pp->ff_list;
+ while (head) {
+ struct flow_file_list * tmp = head;
+ free_flowfile(tmp->ff_record);
+ head = head->next;
+ free(tmp);
+ }
+ pp->ff_list = head;
+ }
+}
+
+void delete_completed_flow_files_from_proc(const char * uuid) {
+ struct processor_params * pp = NULL;
+ HASH_FIND_STR(procparams, uuid, pp);
+ if (pp) {
+ struct flow_file_list * head = pp->ff_list;
+ while (head) {
+ struct flow_file_list * tmp = head;
+ if (tmp->complete) {
+ free_flowfile(tmp->ff_record);
+ head = head->next;
+ free(tmp);
+ }
+ else {
+ break;
+ }
+ }
+ pp->ff_list = head;
+ }
+}
+
+uint64_t get_current_offset(const char * uuid) {
+ struct processor_params * pp = NULL;
+ HASH_FIND_STR(procparams, uuid, pp);
+ if (pp) {
+ return pp->curr_offset;
+ }
+ return 0;
+}
+
+processor_params * get_proc_params(const char * uuid) {
+ struct processor_params * pp = NULL;
+ HASH_FIND_STR(procparams, uuid, pp);
+ return pp;
+}
+
+void update_proc_params(const char * uuid, uint64_t value, flow_file_list * ffl) {
+ struct processor_params * pp = get_proc_params(uuid);
+ if (!pp) {
+ pp = (struct processor_params *)malloc(sizeof(struct processor_params));
+ memset(pp, 0, sizeof(struct processor_params));
+ pp->ff_list = ffl;
+ pp->curr_offset = value;
+ strcpy(pp->uuid_str, uuid);
+ HASH_ADD_STR(procparams, uuid_str, pp);
+ return;
+ }
+ delete_all_flow_files_from_proc(uuid);
+ pp->curr_offset += value;
+ pp->ff_list = ffl;
+}
+
+uint64_t update_curr_offset(const char * uuid, uint64_t value) {
+ struct processor_params * pp = get_proc_params(uuid);
+ if (pp) {
+ pp->curr_offset += value;
+ return pp->curr_offset;
+ }
+
+ pp = (struct processor_params *)malloc(sizeof(struct processor_params));
+ memset(pp, 0, sizeof(struct processor_params));
+ strcpy(pp->uuid_str, uuid);
+ pp->curr_offset = value;
+ HASH_ADD_STR(procparams, uuid_str, pp);
+ return pp->curr_offset;
+}
+
+struct proc_properties * get_processor_properties(const char * uuid) {
+ if (!uuid) {
+ return NULL;
+ }
+ struct processor_params * pp = NULL;
+ HASH_FIND_STR(procparams, uuid, pp);
+ if (!pp) {
+ return NULL;
+ }
+ return pp->properties;
+}
+
+void add_processor_properties(const char * uuid, struct proc_properties * const props) {
+ struct processor_params * pp = get_proc_params(uuid);
+ if (pp) {
+ pp->properties = props;
+ return;
+ }
+
+ pp = (struct processor_params *)malloc(sizeof(struct processor_params));
+ memset(pp, 0, sizeof(struct processor_params));
+ strcpy(pp->uuid_str, uuid);
+ pp->properties = props;
+ HASH_ADD_STR(procparams, uuid_str, pp);
+}
+
+void on_trigger_tailfilechunk(processor_session * ps, processor_context * ctx) {
+
+ char uuid_str[37];
+ get_proc_uuid_from_context(ctx, uuid_str);
+
+ initialize_content_repo(ctx, uuid_str);
+
+ struct proc_properties * props = get_processor_properties(uuid_str);
+ if (!props) {
+ char file_path[4096];
+ char chunk_size[50];
+ if (get_property(ctx, "file_path", file_path, sizeof(file_path)) != 0) {
+ return;
+ }
+
+ if (get_property(ctx, "chunk_size", chunk_size, sizeof(chunk_size)) != 0) {
+ return;
+ }
+
+ errno = 0;
+ uint64_t chunk_size_value = strtoul(chunk_size, NULL, 10);
+
+ if (errno != 0 || chunk_size_value == 0) {
+ printf("Invalid chunk size specified\n");
+ return;
+ }
+
+ props = (struct proc_properties *)malloc(sizeof(struct proc_properties));
+ memset(props, 0, sizeof(struct proc_properties));
+ int len = strlen(file_path);
+ props->file_path = (char *)malloc((len + 1) * sizeof(char));
+ strncpy(props->file_path, file_path, len);
+ props->file_path[len] = '\0';
+ props->chunk_size = chunk_size_value;
+ add_processor_properties(uuid_str, props);
+ }
+
+ FILE * fp = fopen(props->file_path, "rb");
+
+ if (!fp) {
+ printf("Unable to open file. {file: %s, reason: %s}\n", props->file_path, strerror(errno));
+ return;
+ }
+
+ char * buff = (char *)malloc((props->chunk_size +1 ) * sizeof(char));
+ size_t bytes_read = 0;
+
+ uint64_t curr_offset = get_current_offset(uuid_str);
+ fseek(fp, curr_offset, SEEK_SET);
+ while ((bytes_read = fread(buff, 1, props->chunk_size, fp)) > 0) {
+ if (bytes_read < props->chunk_size) {
+ break;
+ }
+ buff[props->chunk_size] = '\0';
+ flow_file_record * ffr = write_to_flow(buff, strlen(buff), ctx);
+ curr_offset = ftell(fp);
+ add_attributes(ffr, props->file_path, curr_offset);
+ add_to_hash_table(ffr, curr_offset, uuid_str);
+ }
+ free(buff);
+ fclose(fp);
+}
+
+flow_file_info log_aggregate(const char * file_path, char delim, processor_context * ctx) {
+ flow_file_info ff_info;
+ memset(&ff_info, 0, sizeof(ff_info));
+
+ if (!file_path) {
+ return ff_info;
+ }
+
+ char uuid_str[37];
+ get_proc_uuid_from_context(ctx, uuid_str);
+
+ char buff[MAX_BYTES_READ + 1];
+ errno = 0;
+ FILE * fp = fopen(file_path, "rb");
+ if (!fp) {
+ printf("Cannot open file: {file: %s, reason: %s}\n", file_path, strerror(errno));
+ return ff_info;
+ }
+
+ uint64_t curr_offset = get_current_offset(uuid_str);
+
+ fseek(fp, curr_offset, SEEK_SET);
+
+ flow_file_list * ffl = NULL;
+ size_t bytes_read = 0;
+ while ((bytes_read = fread(buff, 1, MAX_BYTES_READ, fp)) > 0) {
+ buff[bytes_read] = '\0';
+ struct token_list tokens = tokenize_string_tailfile(buff, delim);
+ if (tokens.total_bytes > 0) {
+ ff_info.total_bytes += tokens.total_bytes;
+ curr_offset += tokens.total_bytes;
+ fseek(fp, curr_offset, SEEK_SET);
+ }
+
+ token_node * head;
+ for (head = tokens.head; head && head->data; head = head->next) {
+ flow_file_record * ffr = write_to_flow(head->data, strlen(head->data), ctx);
+ add_attributes(ffr, file_path, curr_offset);
+ add_flow_file_record(&ffl, ffr);
+ }
+ free_all_tokens(&tokens);
+ }
+ fclose(fp);
+ ff_info.ff_list = ffl;
+ return ff_info;
+}
+
+struct proc_properties * get_properties(const char * uuid, processor_context * ctx) {
+ struct proc_properties * props = get_processor_properties(uuid);
+ if (props) {
+ return props;
+ }
+
+ char file_path[4096];
+ char delimiter[3];
+
+ if (get_property(ctx, "file_path", file_path, sizeof(file_path)) != 0) {
+ return props;
+ }
+
+ if (get_property(ctx, "delimiter", delimiter, sizeof(delimiter)) != 0) {
+ printf("No delimiter found\n");
+ return props;
+ }
+
+ if (strlen(delimiter) == 0) {
+ printf("Delimiter not specified or it is empty\n");
+ return props;
+ }
+
+ props = (struct proc_properties *)malloc(sizeof(struct proc_properties));
+ memset(props, 0, sizeof(struct proc_properties));
+
+ char delim = delimiter[0];
+
+ if (delim == '\\') {
+ if (strlen(delimiter) > 1) {
+ switch (delimiter[1]) {
+ case 'r':
+ delim = '\r';
+ break;
+ case 't':
+ delim = '\t';
+ break;
+ case 'n':
+ delim = '\n';
+ break;
+ case '\\':
+ delim = '\\';
+ break;
+ default:
+ break;
+ }
+ }
+ }
+
+ int len = strlen(file_path);
+ props->file_path = (char *)malloc((len + 1) * sizeof(char));
+ strncpy(props->file_path, file_path, len);
+ props->file_path[len] = '\0';
+ props->delimiter = delim;
+
+ add_processor_properties(uuid, props);
+ return props;
+}
+
+void on_trigger_logaggregator(processor_session * ps, processor_context * ctx) {
+ char uuid_str[37];
+ get_proc_uuid_from_context(ctx, uuid_str);
+
+ struct proc_properties * props = get_properties(uuid_str, ctx);
+
+ if (!props || !props->file_path) return;
+
+ char delim = props->delimiter;
+
+ initialize_content_repo(ctx, uuid_str);
+ flow_file_info ff_info = log_aggregate(props->file_path, delim, ctx);
+
+ update_proc_params(uuid_str, ff_info.total_bytes, ff_info.ff_list);
+}
+
+void write_flow_file(flow_file_record * ffr, const char * buff, size_t count) {
+ FILE * ffp = fopen(ffr->contentLocation, "ab");
+ if (!ffp) return;
+ if (fwrite(buff, 1, count, ffp) < count) {
+ fclose(ffp);
+ free_flowfile(ffr);
+ return;
+ }
+ fclose(ffp);
+}
+
+flow_file_list * get_last_flow_file(const char * uuid) {
+ struct processor_params * pp = NULL;
+ HASH_FIND_STR(procparams, uuid, pp);
+ if (!pp) {
+ return NULL;
+ }
+
+ flow_file_list * ff_list = pp->ff_list;
+ flow_file_list * el = NULL;
+ LL_FOREACH(ff_list, el) {
+ if (el && !el->next) {
+ return el;
+ }
+ }
+ return NULL;
+}
+
+flow_file_list * add_flow_file_to_proc_params(const char * uuid, flow_file_record * ffr) {
+ struct processor_params * pp = NULL;
+ HASH_FIND_STR(procparams, uuid, pp);
+ if (!pp) {
+ pp = (struct processor_params *)malloc(sizeof(struct processor_params));
+ memset(pp, 0, sizeof(struct processor_params));
+ strcpy(pp->uuid_str, uuid);
+ HASH_ADD_STR(procparams, uuid_str, pp);
+ }
+ flow_file_list * ffl_node = add_flow_file_record(&pp->ff_list, ffr);
+ ffl_node->complete = 0;
+ return ffl_node;
+}
+
+void on_trigger_tailfiledelimited(processor_session * ps, processor_context * ctx) {
+ char uuid_str[37];
+ get_proc_uuid_from_context(ctx, uuid_str);
+
+ initialize_content_repo(ctx, uuid_str);
+ struct proc_properties * props = get_properties(uuid_str, ctx);
+
+ if (!props || !props->file_path) return;
+
+ char delim = props->delimiter;
+
+ FILE * fp = fopen(props->file_path, "rb");
+
+ if (!fp) {
+ printf("Unable to open file. {file: %s, reason: %s}\n", props->file_path, strerror(errno));
+ return;
+ }
+
+ char buff[MAX_BYTES_READ + 1];
+ size_t bytes_read = 0;
+
+ uint64_t curr_offset = get_current_offset(uuid_str);
+ fseek(fp, curr_offset, SEEK_SET);
+
+ flow_file_list * ffl_node = get_last_flow_file(uuid_str);
+ while ((bytes_read = fread(buff, 1, MAX_BYTES_READ, fp)) > 0) {
+ buff[bytes_read] = '\0';
+ const char * begin = buff;
+ const char * end = NULL;
+
+ while ((end = strchr(begin, delim))) {
+ uint64_t len = end - begin;
+ if (len > 0) {
+ if (!ffl_node || ffl_node->complete) {
+ ffl_node = add_flow_file_to_proc_params(uuid_str, generate_flow(ctx));
+ }
+ write_flow_file(ffl_node->ff_record, begin, len);
+ update_curr_offset(uuid_str, (len + 1));
+ }
+ else {
+ update_curr_offset(uuid_str, 1);
+ }
+ if (ffl_node) {
+ ffl_node->complete = 1;
+ update_attributes(ffl_node->ff_record, props->file_path, get_current_offset(uuid_str));
+ }
+ begin = (end + 1);
+ }
+
+ if (!end && *begin != '\0') {
+ if (!ffl_node || ffl_node->complete) {
+ ffl_node = add_flow_file_to_proc_params(uuid_str, generate_flow(ctx));
+ }
+ size_t count = strlen(begin);
+ write_flow_file(ffl_node->ff_record, begin, count);
+ update_curr_offset(uuid_str, count);
+ update_attributes(ffl_node->ff_record, props->file_path, get_current_offset(uuid_str));
+ }
+ }
+ fclose(fp);
+}
diff --git a/nanofi/src/api/nanofi.cpp b/nanofi/src/api/nanofi.cpp
index d37fc6b..e8ea25a 100644
--- a/nanofi/src/api/nanofi.cpp
+++ b/nanofi/src/api/nanofi.cpp
@@ -113,7 +113,7 @@
* This API will gradually move away from C++, hence malloc is used for nifi_instance
* Since minifi::Instance is currently being used, then we need to use new in that case.
*/
- instance->instance_ptr = new minifi::Instance(url, port->port_id);
+ instance->instance_ptr = new minifi::Instance(url, port->port_id, "filesystemrepository");
NULL_CHECK(nullptr, instance->instance_ptr);
@@ -124,25 +124,53 @@
return instance;
}
-standalone_processor *create_processor(const char *name) {
+standalone_processor * create_processor(const char *name, nifi_instance * instance) {
NULL_CHECK(nullptr, name);
auto ptr = ExecutionPlan::createProcessor(name, name);
if (!ptr) {
return nullptr;
}
- if (standalone_instance == nullptr) {
+ if (instance == NULL) {
nifi_port port;
char portnum[] = "98765";
port.port_id = portnum;
- standalone_instance = create_instance("internal_standalone", &port);
+ instance = create_instance("internal_standalone", &port);
}
- auto flow = create_new_flow(standalone_instance);
+ auto flow = create_new_flow(instance);
std::shared_ptr<ExecutionPlan> plan(flow);
plan->addProcessor(ptr, name);
ExecutionPlan::addProcessorWithPlan(ptr->getUUIDStr(), plan);
return static_cast<standalone_processor*>(ptr.get());
}
+void initialize_content_repo(processor_context * ctx, const char * uuid) {
+ if (ctx->isInitialized()) {
+ return;
+ }
+ char * cwd = get_current_working_directory();
+ if (cwd) {
+ const char * sep = get_separator(0);
+ const std::string repo_path = std::string(cwd) + sep + "contentrepository" + sep + uuid;
+ ctx->initializeContentRepository(repo_path);
+ free(cwd);
+ }
+}
+
+void clear_content_repo(const nifi_instance * instance) {
+ const auto content_repo = static_cast<minifi::Instance*>(instance->instance_ptr)->getContentRepository();
+ const auto storage_path = content_repo->getStoragePath();
+ remove_directory(storage_path.c_str());
+}
+
+void get_proc_uuid_from_processor(standalone_processor * proc, char * uuid_target) {
+ strcpy(uuid_target, proc->getUUIDStr().c_str());
+}
+
+void get_proc_uuid_from_context(const processor_context * ctx, char * uuid_target) {
+ standalone_processor * proc = static_cast<standalone_processor*>(ctx->getProcessorNode()->getProcessor().get());
+ get_proc_uuid_from_processor(proc, uuid_target);
+}
+
void free_standalone_processor(standalone_processor* proc) {
NULL_CHECK(, proc);
ExecutionPlan::removeProcWithPlan(proc->getUUIDStr());
@@ -245,28 +273,58 @@
return new_ff;
}
-flow_file_record * generate_flow_file(nifi_instance * instance, standalone_processor * proc) {
- if (!instance || !proc) {
- return nullptr;
- }
+flow_file_record * generate_flow(processor_context * ctx) {
flow_file_record * ffr = create_ff_object_nc();
- auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr);
- auto content_repo = minifi_instance_ref->getContentRepository();
+ if (ffr->crp) {
+ delete static_cast<std::shared_ptr<minifi::core::ContentRepository>*>(ffr->crp);
+ }
+ ffr->crp = static_cast<void*>(new std::shared_ptr<minifi::core::ContentRepository>(ctx->getContentRepository()));
- ffr->crp = static_cast<void*>(new std::shared_ptr<minifi::core::ContentRepository>(content_repo));
- auto plan = ExecutionPlan::getPlan(proc->getUUIDStr());
+ auto plan = ExecutionPlan::getPlan(ctx->getProcessorNode()->getProcessor()->getUUIDStr());
+
if (!plan) {
return nullptr;
}
- ffr->ffp = static_cast<void*>(new std::shared_ptr<core::FlowFile>(plan->getCurrentFlowFile()));
- ffr->keepContent = 1;
+ ffr->ffp = NULL;
+ ffr->keepContent = 0;
auto ff_content_repo_ptr = (static_cast<std::shared_ptr<minifi::core::ContentRepository>*>(ffr->crp));
auto claim = std::make_shared<minifi::ResourceClaim>(*ff_content_repo_ptr);
- const char * full_path = claim->getContentFullPath().c_str();
- int len = strlen(full_path);
- ffr->contentLocation = (char *) malloc(sizeof(char) * (len + 1));
- snprintf(ffr->contentLocation, len + 1, "%s", full_path);
+
+ size_t len = strlen(claim->getContentFullPath().c_str());
+ ffr->contentLocation = (char *) malloc((len + 1) * sizeof(char));
+ snprintf(ffr->contentLocation, len+1, "%s", claim->getContentFullPath().c_str());
+ return ffr;
+}
+
+flow_file_record * write_to_flow(const char * buff, size_t count, processor_context * ctx) {
+ if (!ctx) {
+ return NULL;
+ }
+
+ flow_file_record * ffr = generate_flow(ctx);
+
+ if (ffr == NULL) {
+ printf("Could not generate flow file\n");
+ return NULL;
+ }
+
+ FILE * ffp = fopen(ffr->contentLocation, "wb");
+ if (!ffp) {
+ printf("Cannot open flow file at path %s to write content to.\n", ffr->contentLocation);
+ free_flowfile(ffr);
+ return NULL;
+ }
+
+ int ret = fwrite(buff, 1, count, ffp);
+ if (ret < count) {
+ fclose(ffp);
+ free_flowfile(ffr);
+ return NULL;
+ }
+ fseek(ffp, 0, SEEK_END);
+ ffr->size = ftell(ffp);
+ fclose(ffp);
return ffr;
}
diff --git a/nanofi/src/core/file_utils.c b/nanofi/src/core/file_utils.c
index 3f7b79e..1eeedc6 100644
--- a/nanofi/src/core/file_utils.c
+++ b/nanofi/src/core/file_utils.c
@@ -20,44 +20,122 @@
#include <stdio.h>
#include <string.h>
#include <errno.h>
+#include <sys/stat.h>
+#include <dirent.h>
+#include <unistd.h>
+#include <limits.h>
-#include "api/nanofi.h"
#include "core/string_utils.h"
#include "core/file_utils.h"
-token_list tail_file(const char * file_path, char delim, int curr_offset) {
- token_list tkn_list;
- memset(&tkn_list, 0, sizeof(struct token_list));
+#ifdef _MSC_VER
+#ifndef PATH_MAX
+#define PATH_MAX 260
+#endif
+#endif
- if (!file_path) {
- return tkn_list;
+int is_directory(const char * path) {
+ struct stat dir_stat;
+ if (stat(path, &dir_stat) < 0) {
+ return 0;
+ }
+ return S_ISDIR(dir_stat.st_mode);
+}
+
+const char * get_separator(int force_posix)
+{
+#ifdef WIN32
+ if (!force_posix) {
+ return "\\";
+ }
+#endif
+ return "/";
+}
+
+char * concat_path(const char * parent, const char * child) {
+ char * path = (char *)malloc((strlen(parent) + strlen(child) + 2) * sizeof(char));
+ strcpy(path, parent);
+ const char * sep = get_separator(0);
+ strcat(path, sep);
+ strcat(path, child);
+ return path;
+}
+
+void remove_directory(const char * dir_path) {
+
+ if (!is_directory(dir_path)) {
+ if (unlink(dir_path) == -1) {
+ printf("Could not remove file %s\n", dir_path);
+ }
+ return;
}
- char buff[MAX_BYTES_READ + 1];
- memset(buff, 0, MAX_BYTES_READ+1);
+ uint64_t path_len = strlen(dir_path);
+ struct dirent * dir;
+ DIR * d = opendir(dir_path);
+
+ while ((dir = readdir(d)) != NULL) {
+ char * entry_name = dir->d_name;
+ if (!strcmp(entry_name, ".") || !strcmp(entry_name, "..")) {
+ continue;
+ }
+ char * path = concat_path(dir_path, entry_name);
+ remove_directory(path);
+ free(path);
+ }
+
+ rmdir(dir_path);
+ closedir(d);
+}
+
+int make_dir(const char * path) {
+ if (!path) return -1;
+
errno = 0;
- FILE * fp = fopen(file_path, "rb");
- if (!fp) {
- printf("Cannot open file: {file: %s, reason: %s}\n", file_path, strerror(errno));
- return tkn_list;
+ int ret = mkdir(path, S_IRUSR|S_IWUSR|S_IXUSR|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH);
+ if (ret == 0) {
+ return 0;
}
- fseek(fp, curr_offset, SEEK_SET);
- int bytes_read = 0;
- int i = 0;
- while ((bytes_read = fread(buff, 1, MAX_BYTES_READ, fp)) > 0) {
- buff[bytes_read] = '\0';
- struct token_list tokens = tokenize_string_tailfile(buff, delim);
- if (tokens.size > 0) {
- attach_lists(&tkn_list, &tokens);
+ switch (errno) {
+ case ENOENT: {
+ char * found = strrchr(path, '/');
+ if (!found) {
+ return -1;
}
- tkn_list.total_bytes += tokens.total_bytes;
- if (tokens.total_bytes > 0) {
- curr_offset += tokens.total_bytes;
- fseek(fp, curr_offset, SEEK_SET);
+ int len = found - path;
+ char * dir = calloc(len + 1, sizeof(char));
+ strncpy(dir, path, len);
+ dir[len] = '\0';
+ int res = make_dir(dir);
+ free(dir);
+ if (res < 0) {
+ return -1;
}
- memset(buff, 0, MAX_BYTES_READ);
+ return mkdir(path, S_IRUSR|S_IWUSR|S_IXUSR|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH);
}
- fclose(fp);
- return tkn_list;
+ case EEXIST: {
+ if (is_directory(path)) {
+ return 0;
+ }
+ return -1;
+ }
+ default:
+ return -1;
+ }
+}
+
+char * get_current_working_directory() {
+ char * cwd = (char *)malloc(PATH_MAX * sizeof(char));
+ memset(cwd, 0, PATH_MAX);
+ #ifdef WIN32
+ if (_getcwd(cwd, PATH_MAX) != NULL)
+ return cwd;
+ #else
+ if (getcwd(cwd, PATH_MAX) != NULL) {
+ return cwd;
+ }
+ #endif
+ free(cwd);
+ return NULL;
}
diff --git a/nanofi/src/core/flowfiles.c b/nanofi/src/core/flowfiles.c
index edf2c5b..6c86e98 100644
--- a/nanofi/src/core/flowfiles.c
+++ b/nanofi/src/core/flowfiles.c
@@ -16,38 +16,149 @@
* limitations under the License.
*/
+#include "api/nanofi.h"
+#include "api/ecu.h"
#include "core/flowfiles.h"
+
+#include "utlist.h"
#include <string.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <sys/stat.h>
-void add_flow_file_record(flow_file_list * ff_list, flow_file_record * record) {
- if (!ff_list || !record) return;
-
- struct flow_file_list_node * new_node = (struct flow_file_list_node *)malloc(sizeof(struct flow_file_list_node));
- new_node->ff_record = record;
- new_node->next = NULL;
-
- if (!ff_list->head || !ff_list->tail) {
- ff_list->head = ff_list->tail = new_node;
- ff_list->len = 1;
- return;
+flow_file_list * add_flow_file_record(flow_file_list ** ff_list, flow_file_record * record) {
+ if (!record) {
+ return *ff_list;
}
- ff_list->tail->next = new_node;
- ff_list->tail = new_node;
- ff_list->len++;
+ struct flow_file_list * new_node = (struct flow_file_list *)malloc(sizeof(struct flow_file_list));
+ new_node->ff_record = record;
+ LL_APPEND(*ff_list, new_node);
+ return new_node;
}
-void free_flow_file_list(flow_file_list * ff_list) {
- if (!ff_list || !ff_list->head) {
+void free_flow_file_list(flow_file_list ** ff_list) {
+ if (!*ff_list) {
+ return;
+ }
+ flow_file_list * head = *ff_list;
+ while (head) {
+ flow_file_list * tmp = head;
+ free_flowfile(tmp->ff_record);
+ head = head->next;
+ free(tmp);
+ }
+}
+
+void add_attributes(flow_file_record * ffr, const char * file_path, uint64_t curr_offset) {
+ char offset_str[21];
+ snprintf(offset_str, sizeof(offset_str), "%llu", curr_offset);
+ add_attribute(ffr, "current offset", offset_str, strlen(offset_str));
+ char content_location[strlen(ffr->contentLocation) + 1];
+ snprintf(content_location, sizeof(content_location), "%s", ffr->contentLocation);
+ add_attribute(ffr, "content location", content_location, strlen(content_location));
+ add_attribute(ffr, "tailfile path", (char*)file_path, strlen(file_path));
+}
+
+void update_attributes(flow_file_record * ffr, const char * file_path, uint64_t curr_offset) {
+ char offset_str[21];
+ snprintf(offset_str, sizeof(offset_str), "%llu", curr_offset);
+ update_attribute(ffr, "current offset", offset_str, strlen(offset_str));
+ char content_location[strlen(ffr->contentLocation) + 1];
+ snprintf(content_location, sizeof(content_location), "%s", ffr->contentLocation);
+ update_attribute(ffr, "content location", content_location, strlen(content_location));
+ update_attribute(ffr, "tailfile path", (char*)file_path, strlen(file_path));
+}
+
+void transmit_flow_files(nifi_instance * instance, flow_file_list * ff_list, int complete) {
+ if (!instance || !ff_list) {
+ return;
+ }
+ flow_file_list * el = NULL;
+ LL_FOREACH(ff_list, el) {
+ if (!complete || el->complete) {
+ transmit_flowfile(el->ff_record, instance);
+ }
+ }
+}
+
+void read_payload_and_transmit(struct flow_file_list * ffl, struct CRawSiteToSiteClient * client) {
+ if (!ffl || !client) {
return;
}
- flow_file_list_node * head = ff_list->head;
- while (head) {
- free_flowfile(head->ff_record);
- flow_file_list_node * tmp = head;
- head = head->next;
- free(tmp);
+ char * file = ffl->ff_record->contentLocation;
+ FILE * fp = fopen(file, "rb");
+ if (!fp) {
+ return;
}
- memset(ff_list, 0, sizeof(struct flow_file_list));
+
+ struct stat statfs;
+ if (stat(file, &statfs) < 0) {
+ return;
+ }
+ size_t file_size = statfs.st_size;
+
+ attribute attr;
+ attr.key = "current offset";
+ if (get_attribute(ffl->ff_record, &attr) < 0) {
+ printf("Error looking up flow file attribute %s\n", attr.key);
+ return;
+ }
+
+ errno = 0;
+ uint64_t offset = strtoull((const char *)attr.value, NULL, 10);
+ if (errno != 0) {
+ printf("Error converting flow file offset value\n");
+ return;
+ }
+ uint64_t begin_offset = offset - file_size;
+ char * buff = (char *)malloc(sizeof(char) * 4097);
+ size_t count = 0;
+ while ((count = fread(buff, 1, 4096, fp)) > 0) {
+ buff[count] = '\0';
+ begin_offset += count;
+ char offset_str[21];
+ snprintf(offset_str, sizeof(offset_str), "%llu", begin_offset);
+ update_attribute(ffl->ff_record, "current offset", offset_str, strlen(offset_str));
+
+ attribute_set as;
+ uint64_t num_attrs = get_attribute_quantity(ffl->ff_record);
+ as.size = num_attrs;
+ as.attributes = (attribute *)malloc(num_attrs * sizeof(attribute));
+ get_all_attributes(ffl->ff_record, &as);
+
+ if (transmitPayload(client, buff, &as) == 0) {
+ printf("payload of %zu bytes from %s sent successfully\n", count, ffl->ff_record->contentLocation);
+ }
+ else {
+ printf("Failed to send payload, flow file %s\n", ffl->ff_record->contentLocation);
+ }
+ free(as.attributes);
+ }
+ free(buff);
+ fclose(fp);
+}
+
+void transmit_payload(struct CRawSiteToSiteClient * client, struct flow_file_list * ff_list, int complete) {
+ if (!client || !ff_list) {
+ return;
+ }
+ flow_file_list * el = NULL;
+ LL_FOREACH(ff_list, el) {
+ if (!complete || el->complete) {
+ read_payload_and_transmit(el, client);
+ }
+ }
+}
+
+uint64_t flow_files_size(flow_file_list * ff_list) {
+ if (!ff_list) {
+ return 0;
+ }
+
+ uint64_t counter = 0;
+ flow_file_list * el = NULL;
+ LL_COUNT(ff_list, el, counter);
+ return counter;
}
diff --git a/nanofi/tests/CAPITests.cpp b/nanofi/tests/CAPITests.cpp
index 9a63c4a..769b1dc 100644
--- a/nanofi/tests/CAPITests.cpp
+++ b/nanofi/tests/CAPITests.cpp
@@ -318,7 +318,7 @@
create_testfile_for_getfile(sourcedir.c_str());
- standalone_processor* getfile_proc = create_processor("GetFile");
+ standalone_processor* getfile_proc = create_processor("GetFile", NULL);
REQUIRE(set_standalone_property(getfile_proc, "Input Directory", sourcedir.c_str()) == 0);
flow_file_record* ffr = invoke(getfile_proc);
@@ -326,7 +326,7 @@
REQUIRE(ffr != nullptr);
REQUIRE(get_attribute_quantity(ffr) > 0);
- standalone_processor* extract_test = create_processor("ExtractText");
+ standalone_processor* extract_test = create_processor("ExtractText", NULL);
REQUIRE(extract_test != nullptr);
REQUIRE(set_standalone_property(extract_test, "Attribute", "TestAttr") == 0);
@@ -379,7 +379,7 @@
flow_file_record *record = get_next_flow_file(instance, test_flow);
REQUIRE(record != nullptr);
- standalone_processor* putfile_proc = create_processor("PutFile");
+ standalone_processor* putfile_proc = create_processor("PutFile", NULL);
REQUIRE(set_standalone_property(putfile_proc, "Directory", putfiledir.c_str()) == 0);
flow_file_record* put_record = invoke_ff(putfile_proc, record);
@@ -409,7 +409,7 @@
auto sourcedir = testController.createTempDirectory(src_format);
std::string path = create_testfile_for_getfile(sourcedir.c_str());
- standalone_processor* extract_test = create_processor("ExtractText");
+ standalone_processor* extract_test = create_processor("ExtractText", NULL);
REQUIRE(extract_test != nullptr);
REQUIRE(set_standalone_property(extract_test, "Attribute", "TestAttr") == 0);
@@ -465,9 +465,9 @@
free_standalone_processor(nullptr);
free_instance(nullptr);
- REQUIRE(create_processor(nullptr) == nullptr);
+ REQUIRE(create_processor(nullptr, nullptr) == nullptr);
- standalone_processor *standalone_proc = create_processor("GetFile");
+ standalone_processor *standalone_proc = create_processor("GetFile", NULL);
REQUIRE(standalone_proc != nullptr);
REQUIRE(set_property(nullptr, "prop_name", "prop_value") == -1);
diff --git a/nanofi/tests/CTailFileTests.cpp b/nanofi/tests/CLogAggregatorTests.cpp
similarity index 69%
rename from nanofi/tests/CTailFileTests.cpp
rename to nanofi/tests/CLogAggregatorTests.cpp
index 491e8c0..edf62c9 100644
--- a/nanofi/tests/CTailFileTests.cpp
+++ b/nanofi/tests/CLogAggregatorTests.cpp
@@ -16,20 +16,20 @@
* limitations under the License.
*/
+#ifndef _WIN32
#include "catch.hpp"
#include <vector>
#include <string>
-#include <fstream>
#include <numeric>
#include <algorithm>
-#include <assert.h>
#include <unistd.h>
#include <string.h>
#include <sys/stat.h>
#include "core/string_utils.h"
#include "core/file_utils.h"
+#include "CTestsBase.h"
void test_lists_equal(token_list * tknlist, const std::vector<std::string>& sv) {
REQUIRE(tknlist != NULL);
@@ -174,131 +174,101 @@
* ##################################################################
*/
-class FileManager {
-public:
- FileManager(const std::string& filePath) {
- assert(!filePath.empty() && "filePath provided cannot be empty!");
- filePath_ = filePath;
- outputStream_.open(filePath_, std::ios::binary);
- }
+TEST_CASE("Simple log aggregator test", "[testLogAggregator]") {
- ~FileManager() {
- std::ifstream ifs(filePath_);
- if (ifs.good()) {
- remove(filePath_.c_str());
- }
- }
-
- void Write(const std::string& str) {
- outputStream_ << str;
- }
-
- std::string WriteNChars(uint64_t n, char c) {
- std::string s(n, c);
- outputStream_ << s;
- return s;
- }
-
- std::string getFilePath() const {
- return filePath_;
- }
-
- void CloseStream() {
- outputStream_.flush();
- outputStream_.close();
- }
-
- uint64_t GetFileSize() {
- CloseStream();
- struct stat buff;
- if (stat(filePath_.c_str(), &buff) == 0) {
- return buff.st_size;
- }
- return 0;
- }
-
-private:
- std::string filePath_;
- std::ofstream outputStream_;
-};
-
-TEST_CASE("Simple tail file test", "[testTailFile]") {
-
+ const char * content = "hello world";
FileManager fm("test.txt");
- fm.Write("hello world");
+ fm.Write(content);
fm.CloseStream();
- const char * file = fm.getFilePath().c_str();
- struct token_list tkn_list = tail_file(file, ';', 0);
- REQUIRE(tkn_list.size == 0);
- REQUIRE(tkn_list.head == NULL);
- REQUIRE(tkn_list.total_bytes == 0);
+ TailFileTestResourceManager mgr("LogAggregator", on_trigger_logaggregator);
+ struct processor_params * pp = invoke_processor(mgr, fm.getFilePath().c_str());
+
+ REQUIRE(pp != NULL);
+ REQUIRE(pp->curr_offset == 0);
+ REQUIRE(flow_files_size(pp->ff_list) == 0);
}
-TEST_CASE("Empty file tail test", "[testEmptyFileTail]") {
+TEST_CASE("Empty file log aggregator test", "[testEmptyFileLogAggregator]") {
FileManager fm("test.txt");
fm.CloseStream();
- const char * file = fm.getFilePath().c_str();
- struct token_list tkn_list = tail_file(file, ';', 0);
- REQUIRE(tkn_list.size == 0);
- REQUIRE(tkn_list.head == NULL);
- REQUIRE(tkn_list.total_bytes == 0);
+ TailFileTestResourceManager mgr("LogAggregator", on_trigger_logaggregator);
+ struct processor_params * pp = invoke_processor(mgr, fm.getFilePath().c_str());
+
+ REQUIRE(pp != NULL);
+ REQUIRE(flow_files_size(pp->ff_list) == 0);
+ REQUIRE(pp->ff_list == NULL);
+ REQUIRE(pp->curr_offset == 0);
}
-TEST_CASE("File containing only delimiters tail test", "[testDelimiterOnlyFileTail]") {
+TEST_CASE("File containing only delimiters test", "[testDelimiterOnlyLogAggregator]") {
FileManager fm("test.txt");
- fm.Write("----");
+ fm.Write(";;;;");
fm.CloseStream();
- const char * file = fm.getFilePath().c_str();
- struct token_list tkn_list = tail_file(file, '-', 0);
- REQUIRE(tkn_list.size == 0);
- REQUIRE(tkn_list.head == NULL);
- REQUIRE(tkn_list.total_bytes == 4);
+ TailFileTestResourceManager mgr("LogAggregator", on_trigger_logaggregator);
+ struct processor_params * pp = invoke_processor(mgr, fm.getFilePath().c_str());
+
+ REQUIRE(pp != NULL);
+ REQUIRE(flow_files_size(pp->ff_list) == 0);
+ REQUIRE(pp->ff_list == NULL);
+ REQUIRE(pp->curr_offset == 4);
}
-TEST_CASE("File tail test string starting with delimiter", "[testDelimiterOnlyFileTail]") {
+TEST_CASE("File containing string starting with delimiter", "[testDelimiterStartingStrings]") {
FileManager fm("test.txt");
- fm.Write("----hello");
+ fm.Write(";;;;hello");
fm.CloseStream();
- const char * file = fm.getFilePath().c_str();
- struct token_list tkn_list = tail_file(file, '-', 0);
- REQUIRE(tkn_list.size == 0);
- REQUIRE(tkn_list.head == NULL);
- REQUIRE(tkn_list.total_bytes == 4);
+ TailFileTestResourceManager mgr("LogAggregator", on_trigger_logaggregator);
+ auto pp = invoke_processor(mgr, fm.getFilePath().c_str());
+
+ REQUIRE(flow_files_size(pp->ff_list) == 0);
+ REQUIRE(pp->ff_list == NULL);
+ REQUIRE(pp->curr_offset == 4);
}
-TEST_CASE("Test tail file with less than 4096 delimited chars", "[testTailFileDelimitedString]") {
+TEST_CASE("Test tail file with less than 4096 delimited chars", "[testLogAggregateFileLessThan4KB]") {
+ const std::string token1("token1");
+ const std::string token2("token2");
+ const std::string token3("token3");
+ std::vector<std::string> tokens = {token1, token2, token3};
- const std::string delimitedString = "token1--token2--token3";
+ const std::string delimitedString = join_strings(tokens, ";;");
FileManager fm("test.txt");
fm.Write(delimitedString);
const std::string filePath = fm.getFilePath();
fm.CloseStream();
- struct token_list tokens = tail_file(filePath.c_str(), '-', 0);
- test_lists_equal(&tokens, std::vector<std::string>{"token1", "token2"});
+ TailFileTestResourceManager mgr("LogAggregator", on_trigger_logaggregator);
+ auto pp = invoke_processor(mgr, filePath.c_str());
+
+ REQUIRE(pp->curr_offset == (token1.size() + token2.size() + (2 * std::string("--").size())));
+ REQUIRE(pp->ff_list != NULL);
+ REQUIRE(flow_files_size(pp->ff_list) == 2);
}
// Although there is no delimiter within the string that is at least 4096 bytes long,
// tail_file still creates a flow file for the first 4096 bytes.
-TEST_CASE("Test tail file having 4096 bytes without delimiter", "[testTailFile4096Chars]") {
+TEST_CASE("Test tail file having 4096 bytes without delimiter", "[testLogAggregateFile4096Chars]") {
FileManager fm("test.txt");
const std::string s = std::move(fm.WriteNChars(4096, 'a'));
const std::string filePath = fm.getFilePath();
fm.CloseStream();
- struct token_list tokens = tail_file(filePath.c_str(), '-', 0);
- test_lists_equal(&tokens, std::vector<std::string>{std::move(s)});
+ TailFileTestResourceManager mgr("LogAggregator", on_trigger_logaggregator);
+ auto pp = invoke_processor(mgr, filePath.c_str());
+ REQUIRE(pp->curr_offset == 4096);
+ REQUIRE(pp->ff_list != NULL);
+ REQUIRE(flow_files_size(pp->ff_list) == 1);
}
// Although there is no delimiter within the string that is equal to 4096 bytes or longer
// tail_file creates a flow file for each subsequent 4096 byte chunk. It leaves the last chunk
// if it is smaller than 4096 bytes and not delimited
-TEST_CASE("Test tail file having more than 4096 bytes without delimiter", "[testTailFileMoreThan4096Chars]") {
+TEST_CASE("Test tail file having more than 4096 bytes without delimiter", "[testLogAggregarteFileMoreThan4096Chars]") {
FileManager fm("test.txt");
const std::string s1 = std::move(fm.WriteNChars(4096, 'a'));
@@ -307,63 +277,89 @@
fm.Write(s3);
fm.CloseStream();
+ TailFileTestResourceManager mgr("LogAggregator", on_trigger_logaggregator);
const uint64_t totalStringsSize = s1.size() + s2.size() + s3.size();
const std::string filePath = fm.getFilePath();
const uint64_t bytesWrittenToStream = fm.GetFileSize();
REQUIRE(bytesWrittenToStream == totalStringsSize);
- struct token_list tokens = tail_file(filePath.c_str(), '-', 0);
- test_lists_equal(&tokens, std::vector<std::string>{std::move(s1), std::move(s2)});
+ auto pp = invoke_processor(mgr, filePath.c_str());
+ REQUIRE(flow_files_size(pp->ff_list) == 2);
+ flow_file_list * el = NULL;
+ LL_FOREACH(pp->ff_list, el) {
+ REQUIRE(el->ff_record->size == 4096);
+ }
+ REQUIRE(pp->curr_offset == (s1.size() + s2.size()));
}
-TEST_CASE("Test tail file having more than 4096 bytes with delimiter", "[testTailFileWithDelimitedString]") {
+TEST_CASE("Test tail file having more than 4096 bytes with delimiter", "[testLogAggregateWithDelimitedString]") {
FileManager fm("test.txt");
const std::string s1 = std::move(fm.WriteNChars(4096, 'a'));
- const std::string d1 = std::move(fm.WriteNChars(2, '-'));
+ const std::string d1 = std::move(fm.WriteNChars(2, ';'));
const std::string s2 = std::move(fm.WriteNChars(4096, 'b'));
fm.CloseStream();
+ TailFileTestResourceManager mgr("LogAggregator", on_trigger_logaggregator);
const uint64_t totalStringsSize = s1.size() + s2.size() + d1.size();
const std::string filePath = fm.getFilePath();
const uint64_t bytesWrittenToStream = fm.GetFileSize();
REQUIRE(bytesWrittenToStream == totalStringsSize);
- struct token_list tokens = tail_file(filePath.c_str(), '-', 0);
- test_lists_equal(&tokens, std::vector<std::string>{std::move(s1), std::move(s2)});
+ auto pp = invoke_processor(mgr, filePath.c_str());
+ REQUIRE(flow_files_size(pp->ff_list) == 2);
+ REQUIRE(pp->curr_offset == totalStringsSize);
+ flow_file_list * el = NULL;
+ LL_FOREACH(pp->ff_list, el) {
+ REQUIRE(el->ff_record->size == 4096);
+ }
}
-TEST_CASE("Test tail file having more than 4096 bytes with delimiter and second chunk less than 4096", "[testTailFileWithDelimitedString]") {
+TEST_CASE("Test tail file having more than 4096 bytes with delimiter and second chunk less than 4096", "[testLogAggregateDelimited]") {
FileManager fm("test.txt");
const std::string s1 = std::move(fm.WriteNChars(4096, 'a'));
- const std::string d1 = std::move(fm.WriteNChars(2, '-'));
+ const std::string d1 = std::move(fm.WriteNChars(2, ';'));
const std::string s2 = std::move(fm.WriteNChars(4000, 'b'));
fm.CloseStream();
+ TailFileTestResourceManager mgr("LogAggregator", on_trigger_logaggregator);
const uint64_t totalStringsSize = s1.size() + s2.size() + d1.size();
const std::string filePath = fm.getFilePath();
const uint64_t bytesWrittenToStream = fm.GetFileSize();
REQUIRE(bytesWrittenToStream == totalStringsSize);
- struct token_list tokens = tail_file(filePath.c_str(), '-', 0);
- test_lists_equal(&tokens, std::vector<std::string>{std::move(s1)});
+ auto pp = invoke_processor(mgr, filePath.c_str());
+ REQUIRE(pp->curr_offset == (s1.size() + d1.size()));
+ REQUIRE(flow_files_size(pp->ff_list) == 1);
+
+ flow_file_list * el = NULL;
+ LL_FOREACH(pp->ff_list, el) {
+ REQUIRE(el->ff_record->size == 4096);
+ }
}
-TEST_CASE("Test tail file having more than 4096 bytes with delimiter and second chunk more than 4096", "[testTailFileWithDelimitedString]") {
+TEST_CASE("Test tail file having more than 4096 bytes with delimiter and second chunk more than 4096", "[testLogAggregateDelimited]") {
FileManager fm("test.txt");
const std::string s1 = std::move(fm.WriteNChars(4096, 'a'));
- const std::string d1 = std::move(fm.WriteNChars(2, '-'));
+ const std::string d1 = std::move(fm.WriteNChars(2, ';'));
const std::string s2 = std::move(fm.WriteNChars(4098, 'b'));
fm.CloseStream();
+ TailFileTestResourceManager mgr("LogAggregator", on_trigger_logaggregator);
const uint64_t totalStringsSize = s1.size() + s2.size() + d1.size();
const std::string filePath = fm.getFilePath();
const uint64_t bytesWrittenToStream = fm.GetFileSize();
REQUIRE(bytesWrittenToStream == totalStringsSize);
- const std::string s3 = std::string(s2.data(), s2.data()+4096);
- struct token_list tokens = tail_file(filePath.c_str(), '-', 0);
- test_lists_equal(&tokens, std::vector<std::string>{std::move(s1), std::move(s3)});
+ auto pp = invoke_processor(mgr, filePath.c_str());
+ REQUIRE(pp->curr_offset == 8194);
+ REQUIRE(flow_files_size(pp->ff_list) == 2);
+
+ flow_file_list * el = NULL;
+ LL_FOREACH(pp->ff_list, el) {
+ REQUIRE(el->ff_record->size == 4096);
+ }
}
+#endif
diff --git a/nanofi/tests/CTailFileChunkTests.cpp b/nanofi/tests/CTailFileChunkTests.cpp
new file mode 100644
index 0000000..ff933c8
--- /dev/null
+++ b/nanofi/tests/CTailFileChunkTests.cpp
@@ -0,0 +1,135 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _WIN32
+#include "catch.hpp"
+
+#include <dirent.h>
+#include <stdio.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+
+#include "core/string_utils.h"
+#include "core/file_utils.h"
+
+#include "CTestsBase.h"
+
+/****
+ * ##################################################################
+ * CTAILFILE CHUNK TESTS
+ * ##################################################################
+ */
+
+TEST_CASE("Test tailfile chunk size 4096, file size 8KB", "[tailfileChunk8KBFileSize]") {
+
+ TailFileTestResourceManager mgr("TailFileChunk", on_trigger_tailfilechunk);
+ const char * file = "./e.txt";
+ const char * chunksize = "4096";
+
+ //Write 8192 bytes to the file
+ FileManager fm(file);
+ fm.WriteNChars(4096, 'a');
+ fm.WriteNChars(4096, 'b');
+ fm.CloseStream();
+
+ standalone_processor * proc = mgr.getProcessor();
+ set_standalone_property(proc, "file_path", file);
+ set_standalone_property(proc, "chunk_size", chunksize);
+
+ flow_file_record * new_ff = invoke(proc);
+
+ char uuid_str[37];
+ get_proc_uuid_from_processor(proc, uuid_str);
+ struct processor_params * pp = get_proc_params(uuid_str);
+
+ //Test that two flow file records were created
+ REQUIRE(pp != NULL);
+ REQUIRE(pp->ff_list != NULL);
+ REQUIRE(pp->ff_list->ff_record != NULL);
+ REQUIRE(flow_files_size(pp->ff_list) == 2);
+
+ //Test that the current offset in the file is 8192 bytes
+ REQUIRE(pp->curr_offset == 8192);
+}
+
+TEST_CASE("Test tailfile chunk size 4096, file size less than 8KB", "[tailfileChunkFileSizeLessThan8KB]") {
+
+ TailFileTestResourceManager mgr("TailFileChunk", on_trigger_tailfilechunk);
+ const char * file = "./e.txt";
+ const char * chunksize = "4096";
+
+ //Write 4505 bytes to the file
+ FileManager fm(file);
+ fm.WriteNChars(4096, 'a');
+ fm.WriteNChars(409, 'b');
+ fm.CloseStream();
+
+ standalone_processor * proc = mgr.getProcessor();
+ set_standalone_property(proc, "file_path", file);
+ set_standalone_property(proc, "chunk_size", chunksize);
+
+ flow_file_record * new_ff = invoke(proc);
+
+ char uuid_str[37];
+ get_proc_uuid_from_processor(proc, uuid_str);
+ struct processor_params * pp = get_proc_params(uuid_str);
+ //Test that one flow file record was created
+ REQUIRE(pp != NULL);
+ REQUIRE(pp->ff_list != NULL);
+ REQUIRE(pp->ff_list->ff_record != NULL);
+ REQUIRE(flow_files_size(pp->ff_list) == 1);
+
+ //Test that the current offset in the file is 4096 bytes
+ REQUIRE(pp->curr_offset == 4096);
+ REQUIRE(pp->ff_list->ff_record->size == 4096);
+
+ struct stat fstat;
+ REQUIRE(stat(pp->ff_list->ff_record->contentLocation, &fstat) == 0);
+ REQUIRE(fstat.st_size == 4096);
+}
+
+TEST_CASE("Test tailfile chunk size 512, file size equal to 4608B", "[tailfileChunkFileSize8KB]") {
+
+ TailFileTestResourceManager mgr("TailFileChunk", on_trigger_tailfilechunk);
+ const char * file = "./e.txt";
+ const char * chunksize = "512";
+
+ //Write 4608 bytes to the file
+ FileManager fm(file);
+ fm.WriteNChars(4608, 'a');
+ fm.CloseStream();
+
+ standalone_processor * proc = mgr.getProcessor();
+ set_standalone_property(proc, "file_path", file);
+ set_standalone_property(proc, "chunk_size", chunksize);
+
+ flow_file_record * new_ff = invoke(proc);
+
+ char uuid_str[37];
+ get_proc_uuid_from_processor(proc, uuid_str);
+ struct processor_params * pp = get_proc_params(uuid_str);
+
+ //Test that one flow file record was created
+ REQUIRE(pp != NULL);
+ REQUIRE(pp->ff_list != NULL);
+ REQUIRE(pp->ff_list->ff_record != NULL);
+ REQUIRE(flow_files_size(pp->ff_list) == 9);
+
+ //Test that the current offset in the file is 4608 bytes
+ REQUIRE(pp->curr_offset == 4608);
+}
+#endif
diff --git a/nanofi/tests/CTailFileDelimitedTests.cpp b/nanofi/tests/CTailFileDelimitedTests.cpp
new file mode 100644
index 0000000..ed079d1
--- /dev/null
+++ b/nanofi/tests/CTailFileDelimitedTests.cpp
@@ -0,0 +1,256 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "catch.hpp"
+
+#include "CTestsBase.h"
+
+/****
+ * ##################################################################
+ * CTAILFILE DELIMITED TESTS
+ * ##################################################################
+ */
+
+TEST_CASE("Test tailfile delimited. Empty file", "[tailfileDelimitedEmptyFileTest]") {
+
+ TailFileTestResourceManager mgr("TailFileDelimited", on_trigger_tailfiledelimited);
+ const char * file = "./e.txt";
+ const char * delimiter = ";";
+
+ //Create empty file
+ FileManager fm(file);
+
+ auto pp = invoke_processor(mgr, file);
+
+ //Test that no flowfiles were created
+ REQUIRE(pp != NULL);
+ REQUIRE(pp->ff_list == NULL);
+}
+
+TEST_CASE("Test tailfile delimited. File has less than 4096 chars", "[tailfileDelimitedLessThan4096Chars]") {
+
+ TailFileTestResourceManager mgr("TailFileDelimited", on_trigger_tailfiledelimited);
+ const char * file = "./e.txt";
+ const char * delimiter = ";";
+
+ FileManager fm(file);
+ fm.WriteNChars(34, 'a');
+ fm.CloseStream();
+
+ auto pp = invoke_processor(mgr, file);
+
+ //No flow files will be created
+ REQUIRE(pp != NULL);
+ REQUIRE(pp->ff_list != NULL);
+ REQUIRE(flow_files_size(pp->ff_list) == 1);
+ REQUIRE(pp->ff_list->complete == 0);
+
+ //Test that the current offset in the file is 34
+ REQUIRE(pp->curr_offset == 34);
+}
+
+TEST_CASE("Test tailfile delimited. Simple test", "[tailfileDelimitedSimpleTest]") {
+
+ TailFileTestResourceManager mgr("TailFileDelimited", on_trigger_tailfiledelimited);
+ const char * file = "./e.txt";
+ const char * delimiter = ";";
+
+ //Write 8192 bytes to the file
+ FileManager fm(file);
+ fm.WriteNChars(34, 'a');
+ fm.WriteNChars(1, ';');
+ fm.WriteNChars(6, 'b');
+ fm.WriteNChars(1, ';');
+ fm.CloseStream();
+
+ auto pp = invoke_processor(mgr, file);
+
+ //Test that two flow file records were created
+ REQUIRE(pp != NULL);
+ REQUIRE(pp->ff_list != NULL);
+ REQUIRE(pp->ff_list->ff_record != NULL);
+ REQUIRE(flow_files_size(pp->ff_list) == 2);
+
+ //Test that the current offset in the file is 42 bytes
+ REQUIRE(pp->curr_offset == 42);
+
+ //Test the flow file sizes
+ const char * flowfile1_path = pp->ff_list->ff_record->contentLocation;
+ const char * flowfile2_path = pp->ff_list->next->ff_record->contentLocation;
+
+ struct stat fstat;
+ stat(flowfile1_path, &fstat);
+ REQUIRE(fstat.st_size == 34);
+
+ stat(flowfile2_path, &fstat);
+ REQUIRE(fstat.st_size == 6);
+
+ REQUIRE(pp->ff_list->complete == 1);
+ REQUIRE(pp->ff_list->next->complete == 1);
+}
+
+TEST_CASE("Test tailfile delimited. trailing non delimited string", "[tailfileNonDelimitedTest]") {
+
+ TailFileTestResourceManager mgr("TailFileDelimited", on_trigger_tailfiledelimited);
+ const char * file = "./e.txt";
+ const char * delimiter = ";";
+
+ //Write 8192 bytes to the file
+ FileManager fm(file);
+ fm.WriteNChars(34, 'a');
+ fm.WriteNChars(1, ';');
+ fm.WriteNChars(32, 'b');
+ fm.CloseStream();
+
+ auto pp = invoke_processor(mgr, file);
+
+ //Test that two flow file records were created
+ REQUIRE(pp != NULL);
+ REQUIRE(pp->ff_list != NULL);
+ REQUIRE(pp->ff_list->ff_record != NULL);
+ REQUIRE(flow_files_size(pp->ff_list) == 2);
+
+ //Test that the current offset in the file is 35 bytes
+ REQUIRE(pp->curr_offset == 67);
+ REQUIRE(pp->ff_list->complete == 1);
+ REQUIRE(pp->ff_list->next->complete == 0);
+ struct stat fstat;
+ stat(pp->ff_list->ff_record->contentLocation, &fstat);
+ REQUIRE(fstat.st_size == 34);
+
+ //Append a delimiter at the end of the file
+ fm.OpenStream();
+ fm.WriteNChars(1, ';');
+ fm.CloseStream();
+
+ pp = invoke_processor(mgr, file);
+ REQUIRE(pp != NULL);
+ REQUIRE(flow_files_size(pp->ff_list) == 2);
+
+ stat(pp->ff_list->next->ff_record->contentLocation, &fstat);
+ REQUIRE(fstat.st_size == 32);
+ REQUIRE(pp->ff_list->next->complete == 1);
+}
+
+TEST_CASE("Test tailfile delimited 4096 chars non delimited", "[tailfileDelimitedSimpleTest]") {
+
+ TailFileTestResourceManager mgr("TailFileDelimited", on_trigger_tailfiledelimited);
+ const char * file = "./e.txt";
+ const char * delimiter = ";";
+
+ //Write 4096 bytes to the file
+ FileManager fm(file);
+ fm.WriteNChars(4096, 'a');
+ fm.CloseStream();
+
+ auto pp = invoke_processor(mgr, file);
+
+ REQUIRE(pp != NULL);
+ REQUIRE(pp->ff_list != NULL);
+ REQUIRE(flow_files_size(pp->ff_list) == 1);
+ REQUIRE(pp->ff_list->complete == 0);
+ //Test that the current offset in the file is 4096 bytes
+ REQUIRE(pp->curr_offset == 4096);
+
+ //Write another 2048 characters
+ fm.OpenStream();
+ fm.WriteNChars(2048, 'b');
+ fm.CloseStream();
+
+ pp = invoke_processor(mgr, file);
+
+ REQUIRE(pp->ff_list != NULL);
+ REQUIRE(flow_files_size(pp->ff_list) == 1);
+ REQUIRE(pp->ff_list->complete == 0);
+
+ //Test that the current offset in the file is (4096 + 2048)
+ REQUIRE(pp->curr_offset == 6144);
+
+ //Write another 2048 characters
+ fm.OpenStream();
+ fm.WriteNChars(2048, 'c');
+ fm.CloseStream();
+
+ pp = invoke_processor(mgr, file);
+
+ REQUIRE(pp->ff_list != NULL);
+ REQUIRE(flow_files_size(pp->ff_list) == 1);
+
+ //Test that the current offset in the file is 8192 bytes only
+ REQUIRE(pp->curr_offset == 8192);
+
+ //Write a delimiter at the end and expect a flow file size of 8192 bytes
+ fm.OpenStream();
+ fm.WriteNChars(1, ';');
+ fm.CloseStream();
+
+ pp = invoke_processor(mgr, file);
+
+ REQUIRE(pp->ff_list != NULL);
+ REQUIRE(pp->ff_list->ff_record != NULL);
+ REQUIRE(flow_files_size(pp->ff_list) == 1);
+ REQUIRE(pp->ff_list->complete == 1);
+ const char * flowfile_path = pp->ff_list->ff_record->contentLocation;
+ struct stat fstat;
+ stat(flowfile_path, &fstat);
+ REQUIRE(fstat.st_size == 8192);
+}
+
+TEST_CASE("Test tailfile delimited. string starting with delimiter", "[tailfileDelimiterStartStringTest]") {
+
+ TailFileTestResourceManager mgr("TailFileDelimited", on_trigger_tailfiledelimited);
+ const char * file = "./e.txt";
+ const char * delimiter = ";";
+
+ //Write 8192 bytes to the file
+ FileManager fm(file);
+ fm.WriteNChars(5, ';');
+ fm.WriteNChars(34, 'a');
+ fm.WriteNChars(4, ';');
+ fm.WriteNChars(32, 'b');
+ fm.CloseStream();
+
+ auto pp = invoke_processor(mgr, file);
+
+ //Test that two flow file records were created
+ REQUIRE(pp != NULL);
+ REQUIRE(pp->ff_list != NULL);
+ REQUIRE(pp->ff_list->ff_record != NULL);
+ REQUIRE(flow_files_size(pp->ff_list) == 2);
+
+ //Test that the current offset in the file is 35 bytes
+ REQUIRE(pp->curr_offset == 75);
+ REQUIRE(pp->ff_list->complete == 1);
+ REQUIRE(pp->ff_list->next->complete == 0);
+ struct stat fstat;
+ stat(pp->ff_list->ff_record->contentLocation, &fstat);
+ REQUIRE(fstat.st_size == 34);
+
+ //Append a delimiter at the end of the file
+ fm.OpenStream();
+ fm.WriteNChars(1, ';');
+ fm.CloseStream();
+
+ pp = invoke_processor(mgr, file);
+ REQUIRE(pp != NULL);
+ REQUIRE(flow_files_size(pp->ff_list) == 2);
+
+ stat(pp->ff_list->next->ff_record->contentLocation, &fstat);
+ REQUIRE(fstat.st_size == 32);
+ REQUIRE(pp->ff_list->next->complete == 1);
+}
diff --git a/nanofi/tests/CTestsBase.h b/nanofi/tests/CTestsBase.h
new file mode 100644
index 0000000..bcde416
--- /dev/null
+++ b/nanofi/tests/CTestsBase.h
@@ -0,0 +1,141 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _WIN32
+#ifndef NANOFI_TESTS_CTESTSBASE_H_
+#define NANOFI_TESTS_CTESTSBASE_H_
+
+#include <vector>
+#include <string>
+#include <fstream>
+#include <assert.h>
+#include <sys/stat.h>
+
+#include "core/file_utils.h"
+#include "api/ecu.h"
+#include "api/nanofi.h"
+
+class FileManager {
+public:
+ FileManager(const std::string& filePath) {
+ assert(!filePath.empty() && "filePath provided cannot be empty!");
+ struct stat statbuff;
+ assert(!is_directory(filePath.c_str()) && "Provided file is not a filepath");
+ filePath_ = filePath;
+ remove(filePath_.c_str());
+ outputStream_.open(filePath_, std::ios::binary);
+ }
+
+ ~FileManager() {
+ std::ifstream ifs(filePath_);
+ if (ifs.good()) {
+ remove(filePath_.c_str());
+ }
+ }
+
+ void Write(const std::string& str) {
+ outputStream_ << str;
+ }
+
+ std::string WriteNChars(uint64_t n, char c) {
+ std::string s(n, c);
+ outputStream_ << s;
+ return s;
+ }
+
+ std::string getFilePath() const {
+ return filePath_;
+ }
+
+ void OpenStream() {
+ outputStream_.open(filePath_, std::ios::binary|std::ios::app);
+ }
+
+ void CloseStream() {
+ outputStream_.flush();
+ outputStream_.close();
+ }
+
+ uint64_t GetFileSize() {
+ CloseStream();
+ struct stat buff;
+ if (stat(filePath_.c_str(), &buff) == 0) {
+ return buff.st_size;
+ }
+ return 0;
+ }
+
+private:
+ std::string filePath_;
+ std::ofstream outputStream_;
+};
+
+class TailFileTestResourceManager {
+public:
+ TailFileTestResourceManager(const std::string& processor_name, void(*callback)(processor_session * ps, processor_context * ctx)) {
+ const char * port_str = "uuid";
+ nifi_port port;
+ port.port_id = (char *)port_str;
+ const char * instance_str = "nifi";
+ instance_ = create_instance(instance_str, &port);
+ add_custom_processor(processor_name.c_str(), callback);
+ processor_ = create_processor(processor_name.c_str(), instance_);
+ }
+
+ ~TailFileTestResourceManager() {
+ remove_directory("./contentrepository");
+ char uuid_str[37];
+ get_proc_uuid_from_processor(processor_, uuid_str);
+ delete_all_flow_files_from_proc(uuid_str);
+ struct processor_params * tmp, * pp = NULL;
+ HASH_ITER(hh, procparams, pp, tmp) {
+ HASH_DEL(procparams, pp);
+ free(pp);
+ }
+ free_standalone_processor(processor_);
+ free_instance(instance_);
+ }
+
+ standalone_processor * getProcessor() const {
+ return processor_;
+ }
+
+ nifi_instance * getInstance() const {
+ return instance_;
+ }
+
+private:
+ nifi_instance * instance_;
+ standalone_processor * processor_;
+};
+
+struct processor_params * invoke_processor(TailFileTestResourceManager& mgr, const char * filePath) {
+ standalone_processor * proc = mgr.getProcessor();
+ set_standalone_property(proc, "file_path", filePath);
+ set_standalone_property(proc, "delimiter", ";");
+
+ flow_file_record * new_ff = invoke(proc);
+
+ char uuid_str[37];
+ get_proc_uuid_from_processor(proc, uuid_str);
+ struct processor_params * pp = get_proc_params(uuid_str);
+ return pp;
+}
+
+#endif /* NANOFI_TESTS_CTESTSBASE_H_ */
+#endif