Merge remote-tracking branch 'apache/master'
diff --git a/nanofi/CMakeLists.txt b/nanofi/CMakeLists.txt
index c3279c2..9e4e1e8 100644
--- a/nanofi/CMakeLists.txt
+++ b/nanofi/CMakeLists.txt
@@ -37,6 +37,8 @@
file(GLOB NANOFI_EXAMPLES_SOURCES "examples/*.c" )
+file(GLOB NANOFI_ECU_SOURCES "ecu/*.c")
+
include(CheckCXXCompilerFlag)
if (WIN32)
if ((MSVC_VERSION GREATER "1900") OR (MSVC_VERSION EQUAL "1900"))
@@ -103,4 +105,5 @@
if (NOT DISABLE_CURL)
add_subdirectory(examples)
+add_subdirectory(ecu)
endif()
diff --git a/nanofi/ecu/CMakeLists.txt b/nanofi/ecu/CMakeLists.txt
new file mode 100644
index 0000000..b28af76
--- /dev/null
+++ b/nanofi/ecu/CMakeLists.txt
@@ -0,0 +1,63 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+cmake_minimum_required(VERSION 2.6)
+
+IF(POLICY CMP0048)
+  CMAKE_POLICY(SET CMP0048 OLD)
+ENDIF(POLICY CMP0048)
+
+# NOTE(review): "/include" is an absolute path from the filesystem root; this
+# presumably only works because the parent nanofi build already adds the real
+# include directories. Likely intended "../include" -- verify.
+include_directories(/include)
+
+# Pick the best C++ standard flag the toolchain supports; the ECU sources are
+# C, but they link against the C++ nanofi / minifi-http-curl libraries.
+include(CheckCXXCompilerFlag)
+if (WIN32)
+  if ((MSVC_VERSION GREATER "1900") OR (MSVC_VERSION EQUAL "1900"))
+    CHECK_CXX_COMPILER_FLAG("/std:c++14" _cpp_latest_flag_supported)
+    if (_cpp_latest_flag_supported)
+      add_compile_options("/std:c++14")
+    endif()
+  endif()
+else()
+CHECK_CXX_COMPILER_FLAG("-std=c++11" COMPILER_SUPPORTS_CXX11)
+CHECK_CXX_COMPILER_FLAG("-std=c++0x" COMPILER_SUPPORTS_CXX0X)
+if(COMPILER_SUPPORTS_CXX11)
+  set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11")
+elseif(COMPILER_SUPPORTS_CXX0X)
+  set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++0x")
+else()
+  message(STATUS "The compiler ${CMAKE_CXX_COMPILER} has no C++11 support. Please use a different C++ compiler.")
+endif()
+
+endif()
+
+# Whole-archive link flags so statically registered nanofi symbols are kept.
+if (APPLE)
+  set(LINK_FLAGS "-Wl,-all_load")
+  set(LINK_END_FLAGS "")
+elseif (UNIX)
+  set(LINK_FLAGS "-Wl,--whole-archive")
+  set(LINK_END_FLAGS "")
+endif ()
+
+# The ECU binary is not built on Windows.
+if (NOT WIN32)
+
+add_executable(tail_file tail_file.c)
+
+target_link_libraries(tail_file nanofi ${CMAKE_THREAD_LIBS_INIT} ${LINK_FLAGS} minifi-http-curl ${LINK_END_FLAGS})
+
+endif()
\ No newline at end of file
diff --git a/nanofi/ecu/tail_file.c b/nanofi/ecu/tail_file.c
new file mode 100644
index 0000000..c428be1
--- /dev/null
+++ b/nanofi/ecu/tail_file.c
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+#include "api/nanofi.h"
+#include "core/string_utils.h"
+#include "core/cstructs.h"
+#include "core/file_utils.h"
+#include <unistd.h>
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include <errno.h>
+#include <limits.h>
+#include <signal.h>
+#include <sys/stat.h>
+
+// File-scope state shared with the on_trigger callback (the callback API
+// carries no user-data pointer, hence globals).
+struct flow_file_records * flowfiles = NULL;  // NOTE(review): never used in this file
+nifi_instance * instance = NULL;
+standalone_processor * proc = NULL;
+int file_offset = 0;     // bytes of the tailed file already consumed
+int stopped = 0;         // set by signal_handler to end the main loop
+flow_file_list ff_list;  // flow files produced but not yet transmitted
+token_list tks;          // scratch token list for the current trigger
+
+// Request loop shutdown on SIGINT/SIGTERM; main()'s while loop polls `stopped`.
+void signal_handler(int signum) {
+    if (signum == SIGINT || signum == SIGTERM) {
+        stopped = 1;
+    }
+}
+
+// Transmits every flow file accumulated in the global ff_list to the given
+// NiFi instance. The list itself is freed separately by the caller.
+void transmit_flow_files(nifi_instance * instance) {
+    flow_file_list_node * head = ff_list.head;
+    while (head) {
+        transmit_flowfile(head->ff_record, instance);
+        head = head->next;
+    }
+}
+
+// Accessors for the persisted read offset into the tailed file.
+void set_offset(int offset) {
+    file_offset = offset;
+}
+
+int get_offset() {
+    return file_offset;
+}
+
+/**
+ * Custom processor callback: tails the configured file from the saved offset,
+ * turns each complete (delimited) token into a flow file, and queues it on
+ * the global ff_list for transmission.
+ */
+void on_trigger_callback(processor_session * ps, processor_context * ctx) {
+
+    char file_path[4096];
+    char delimiter[3];
+
+    // Both properties are installed in main() via set_standalone_property().
+    if (get_property(ctx, "file_path", file_path, sizeof(file_path)) != 0) {
+        return;
+    }
+
+    if (get_property(ctx, "delimiter", delimiter, sizeof(delimiter)) != 0) {
+        return;
+    }
+
+    if (strlen(delimiter) == 0) {
+        printf("Delimiter not specified or it is empty\n");
+        return;
+    }
+    char delim = delimiter[0];
+
+    // Translate a two-character escape sequence ("\r", "\t", "\n", "\\")
+    // from the command line into the single character it denotes.
+    if (delim == '\\') {
+        if (strlen(delimiter) > 1) {
+            switch (delimiter[1]) {
+            case 'r':
+                delim = '\r';
+                break;
+            case 't':
+                delim = '\t';
+                break;
+            case 'n':
+                delim = '\n';
+                break;
+            case '\\':
+                delim = '\\';
+                break;
+            default:
+                break;
+            }
+        }
+    }
+
+    tks = tail_file(file_path, delim, get_offset());
+
+    if (!validate_list(&tks)) return;
+
+    // Advance the persisted offset past everything that was consumed.
+    set_offset(get_offset() + tks.total_bytes);
+
+    token_node * head;
+    for (head = tks.head; head && head->data; head = head->next) {
+        flow_file_record * ffr = generate_flow_file(instance, proc);
+        const char * flow_file_path = ffr->contentLocation;
+        FILE * ffp = fopen(flow_file_path, "wb");
+        if (!ffp) {
+            printf("Cannot open flow file at path %s to write content to.\n", flow_file_path);
+            // Release the record; the original code leaked it on this path.
+            free_flowfile(ffr);
+            break;
+        }
+
+        int count = strlen(head->data);
+        int ret = fwrite(head->data, 1, count, ffp);
+        if (ret < count) {
+            fclose(ffp);
+            // Short write: discard the partially written record instead of
+            // leaking it.
+            free_flowfile(ffr);
+            break;
+        }
+        fseek(ffp, 0, SEEK_END);
+        ffr->size = ftell(ffp);
+        fclose(ffp);
+        add_flow_file_record(&ff_list, ffr);
+    }
+    free_all_tokens(&tks);
+}
+
+/**
+ * Entry point: ./tail_file <file> <interval> <delimiter> <nifi url> <port>.
+ * Tails <file> every <interval> seconds and ships delimited tokens to the
+ * remote NiFi instance until SIGINT/SIGTERM.
+ */
+int main(int argc, char** argv) {
+
+    if (argc < 6) {
+        printf("Error: must run ./tail_file <file> <interval> <delimiter> <nifi instance url> <remote port>\n");
+        exit(1);
+    }
+
+    char * file = argv[1];
+    char * interval = argv[2];
+    char * delimiter = argv[3];
+    char * instance_str = argv[4];
+    char * port_str = argv[5];
+
+    if (access(file, F_OK) == -1) {
+        printf("Error: %s doesn't exist!\n", file);
+        exit(1);
+    }
+
+    struct stat stats;
+    errno = 0;
+    int ret = stat(file, &stats);
+
+    if (ret == -1) {
+        printf("Error occurred while getting file status {file: %s, error: %s}\n", file, strerror(errno));
+        exit(1);
+    }
+    // Refuse to tail a directory.
+    if (S_ISDIR(stats.st_mode)){
+        printf("Error: %s is a directory!\n", file);
+        exit(1);
+    }
+
+    // strtol returns a signed long; the original stored it in an unsigned
+    // variable, which made the LONG_MIN/LONG_MAX range checks meaningless
+    // and let negative intervals wrap to huge sleep() arguments.
+    errno = 0;
+    long intrvl = strtol(interval, NULL, 10);
+
+    if (errno == ERANGE || intrvl <= 0) {
+        printf("Invalid interval value specified\n");
+        return 0;
+    }
+
+    struct sigaction action;
+    // Original used sizeof(sigaction), which names the *function* sigaction
+    // (sizeof a function is invalid; GCC extension evaluates to 1), so only
+    // one byte was zeroed and sa_mask/sa_flags stayed uninitialized.
+    memset(&action, 0, sizeof(struct sigaction));
+    action.sa_handler = signal_handler;
+    sigaction(SIGTERM, &action, NULL);
+    sigaction(SIGINT, &action, NULL);
+
+    nifi_port port;
+
+    port.port_id = port_str;
+
+    instance = create_instance(instance_str, &port);
+
+    const char * processor_name = "TailFile";
+
+    add_custom_processor(processor_name, on_trigger_callback);
+
+    proc = create_processor(processor_name);
+
+    set_standalone_property(proc, "file_path", file);
+    set_standalone_property(proc, "delimiter", delimiter);
+
+    set_offset(0);
+    while (!stopped) {
+        flow_file_record * new_ff = invoke(proc);
+        transmit_flow_files(instance);
+        free_flow_file_list(&ff_list);
+        free_flowfile(new_ff);
+        sleep(intrvl);
+    }
+
+    printf("tail file processor stopped\n");
+    free_standalone_processor(proc);
+    // NOTE(review): plain free() assumes create_instance() returns a single
+    // malloc'd block -- confirm there is no dedicated free_instance() API.
+    free(instance);
+
+    return 0;
+}
diff --git a/nanofi/examples/CMakeLists.txt b/nanofi/examples/CMakeLists.txt
index 4301a7d..b9480ed 100644
--- a/nanofi/examples/CMakeLists.txt
+++ b/nanofi/examples/CMakeLists.txt
@@ -83,8 +83,8 @@
target_link_libraries(monitor_directory nanofi ${CMAKE_THREAD_LIBS_INIT} ${LINK_FLAGS} minifi-http-curl ${LINK_END_FLAGS})
-add_executable(tail_file tail_file.c)
+#add_executable(tail_file tail_file.c)
-target_link_libraries(tail_file nanofi ${CMAKE_THREAD_LIBS_INIT} ${LINK_FLAGS} minifi-http-curl ${LINK_END_FLAGS})
+#target_link_libraries(tail_file nanofi ${CMAKE_THREAD_LIBS_INIT} ${LINK_FLAGS} minifi-http-curl ${LINK_END_FLAGS})
endif()
diff --git a/nanofi/include/core/cstructs.h b/nanofi/include/core/cstructs.h
index 23688d2..3be3ce3 100644
--- a/nanofi/include/core/cstructs.h
+++ b/nanofi/include/core/cstructs.h
@@ -156,9 +156,35 @@
* ##################################################################
*/
-typedef struct tokens {
- char ** str_list;
- uint64_t num_strings;
+typedef struct token_node {
+ char * data;
+ struct token_node * next;
+} token_node;
+
+typedef struct token_list {
+ struct token_node * head;
+ struct token_node * tail;
+ uint64_t size;
uint64_t total_bytes;
-} tokens;
+ int has_non_delimited_token;
+} token_list;
+
+/****
+ * ##################################################################
+ * FLOWFILE OPERATIONS
+ * ##################################################################
+ */
+
+typedef struct flow_file_list_node {
+    flow_file_record * ff_record;        /* owned; released by free_flow_file_list */
+    struct flow_file_list_node * next;
+} flow_file_list_node;
+
+/* Singly linked list of flow file records with O(1) append via tail. */
+typedef struct flow_file_list {
+    flow_file_list_node * head;
+    flow_file_list_node * tail;
+    int len;
+    int offset;
+} flow_file_list;
+
#endif /* LIBMINIFI_SRC_CAPI_CSTRUCTS_H_ */
diff --git a/nanofi/include/core/file_utils.h b/nanofi/include/core/file_utils.h
new file mode 100644
index 0000000..25c3029
--- /dev/null
+++ b/nanofi/include/core/file_utils.h
@@ -0,0 +1,42 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef NANOFI_INCLUDE_CORE_FILE_UTILS_H_
+#define NANOFI_INCLUDE_CORE_FILE_UTILS_H_
+
+#include "flowfiles.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/**
+ * Tails a delimited file starting from an offset up to the end of file
+ * @param file the path to the file to tail
+ * @param delim the delimiter character
+ * @param curr_offset the offset in the file to tail from.
+ * For eg. To tail from beginning of the file curr_offset = 0
+ * @return a list of tokens
+ */
+token_list tail_file(const char * file, char delim, int curr_offset);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* NANOFI_INCLUDE_CORE_FILE_UTILS_H_ */
diff --git a/nanofi/include/core/flowfiles.h b/nanofi/include/core/flowfiles.h
new file mode 100644
index 0000000..60ac02e
--- /dev/null
+++ b/nanofi/include/core/flowfiles.h
@@ -0,0 +1,28 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef NANOFI_INCLUDE_CORE_FLOWFILES_H_
+#define NANOFI_INCLUDE_CORE_FLOWFILES_H_
+
+#include "cstructs.h"
+
+/**
+ * Appends record to ff_list (no-op if either argument is NULL).
+ */
+void add_flow_file_record(flow_file_list * ff_list, flow_file_record * record);
+
+/**
+ * Frees every node and its flow file record, then zeroes the list struct.
+ */
+void free_flow_file_list(flow_file_list * ff_list);
+
+#endif /* NANOFI_INCLUDE_CORE_FLOWFILES_H_ */
diff --git a/nanofi/include/core/string_utils.h b/nanofi/include/core/string_utils.h
index 62aac99..f25adf8 100644
--- a/nanofi/include/core/string_utils.h
+++ b/nanofi/include/core/string_utils.h
@@ -26,25 +26,66 @@
extern "C" {
#endif
-typedef enum TOKENIZER_MODE {
- TAILFILE_MODE = 0, /* Do not include a non delimiting string */
- DEFAULT_MODE /* include a non delimiting string */
-} tokenizer_mode_t;
+#define MAX_BYTES_READ 4096
/**
* Tokenizes a delimited string and returns a list of tokens
* @param str the string to be tokenized
* @param delim the delimiting character
- * @param tokenizer_mode_t the enumeration value specified to include/exclude a non delimiting string in the result
- * @return a list of strings wrapped inside tokens struct
+ * @return a list of tokens
*/
-tokens tokenize_string(const char * str, char delim, tokenizer_mode_t);
+token_list tokenize_string(const char * str, char delim);
/**
- * Free the dynamically allocated tokens
- * @param tks the tokens to be freed
+ * Tokenizes a delimited string and returns a list of tokens but excludes
+ * the last token if it is not delimited by the delimiter. This function
+ * is used by tailfile processor
+ * @param str the cstring to tokenize
+ * @param delim the delimiter to tokenize by
+ * @return a list of tokens
*/
-void free_tokens(tokens * tks);
+token_list tokenize_string_tailfile(const char * str, char delim);
+
+/**
+ * Adds a token to the token list
+ * @param tks the token list to add the token to
+ * @param begin the beginning of the token
+ * @param len the length of the token
+ */
+void add_token_to_list(token_list * tks, const char * begin, uint64_t len);
+
+/**
+ * Deallocate one token node
+ * @param node the node in the list to be deallocated
+ */
+void free_token_node(token_node * node);
+
+/**
+ * Deallocate the dynamically allocated token list
+ * @param tks the token list to be freed
+ */
+void free_all_tokens(token_list * tks);
+
+/**
+ * Remove the tail node from the list
+ * @param tks the linked list of token nodes
+ */
+void remove_last_node(token_list * tks);
+
+/**
+ * Validate a linked list
+ * @param tk_list the list to be validated
+ * @return 1 if the list is valid else 0
+ */
+int validate_list(token_list * tk_list);
+
+/**
+ * Append one list to other (Chaining)
+ * @param to, the destination list to append to
+ * @param from, the source list
+ * @attention, if the to list is empty, to and from will be same after appending
+ */
+void attach_lists(token_list * to, token_list * from);
#ifdef __cplusplus
}
diff --git a/nanofi/src/core/file_utils.c b/nanofi/src/core/file_utils.c
new file mode 100644
index 0000000..3f7b79e
--- /dev/null
+++ b/nanofi/src/core/file_utils.c
@@ -0,0 +1,63 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include <errno.h>
+
+#include "api/nanofi.h"
+#include "core/string_utils.h"
+#include "core/file_utils.h"
+
+/**
+ * Reads file_path from curr_offset to EOF in MAX_BYTES_READ chunks and
+ * tokenizes each chunk on delim. An unterminated trailing token shorter than
+ * a full chunk is left in the file (the read position is rewound to its
+ * start) so it can be picked up once the delimiter arrives.
+ * Returns a token_list whose total_bytes counts all consumed bytes,
+ * including delimiters.
+ */
+token_list tail_file(const char * file_path, char delim, int curr_offset) {
+    token_list tkn_list;
+    memset(&tkn_list, 0, sizeof(struct token_list));
+
+    if (!file_path) {
+        return tkn_list;
+    }
+
+    char buff[MAX_BYTES_READ + 1];
+    memset(buff, 0, MAX_BYTES_READ + 1);
+    errno = 0;
+    FILE * fp = fopen(file_path, "rb");
+    if (!fp) {
+        printf("Cannot open file: {file: %s, reason: %s}\n", file_path, strerror(errno));
+        return tkn_list;
+    }
+    fseek(fp, curr_offset, SEEK_SET);
+
+    int bytes_read = 0;
+    while ((bytes_read = fread(buff, 1, MAX_BYTES_READ, fp)) > 0) {
+        buff[bytes_read] = '\0';
+        struct token_list tokens = tokenize_string_tailfile(buff, delim);
+        if (tokens.size > 0) {
+            attach_lists(&tkn_list, &tokens);
+        }
+        // total_bytes is accumulated even when no complete token came out
+        // (e.g. a chunk of pure delimiters still advances the offset).
+        tkn_list.total_bytes += tokens.total_bytes;
+        if (tokens.total_bytes > 0) {
+            // Reposition just past the consumed bytes so any unfinished
+            // trailing token is re-read on the next pass.
+            curr_offset += tokens.total_bytes;
+            fseek(fp, curr_offset, SEEK_SET);
+        }
+        memset(buff, 0, MAX_BYTES_READ);
+    }
+    fclose(fp);
+    return tkn_list;
+}
diff --git a/nanofi/src/core/flowfiles.c b/nanofi/src/core/flowfiles.c
new file mode 100644
index 0000000..edf2c5b
--- /dev/null
+++ b/nanofi/src/core/flowfiles.c
@@ -0,0 +1,53 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/flowfiles.h"
+#include <string.h>
+
+/**
+ * Appends record to the singly linked ff_list, maintaining head/tail/len.
+ * No-op when either argument is NULL or allocation fails.
+ */
+void add_flow_file_record(flow_file_list * ff_list, flow_file_record * record) {
+    if (!ff_list || !record) return;
+
+    struct flow_file_list_node * new_node = (struct flow_file_list_node *)malloc(sizeof(struct flow_file_list_node));
+    if (!new_node) return;  // out of memory: drop the append rather than crash
+    new_node->ff_record = record;
+    new_node->next = NULL;
+
+    if (!ff_list->head || !ff_list->tail) {
+        ff_list->head = ff_list->tail = new_node;
+        ff_list->len = 1;
+        return;
+    }
+
+    ff_list->tail->next = new_node;
+    ff_list->tail = new_node;
+    ff_list->len++;
+}
+
+/**
+ * Releases every node and its owned flow file record, then resets the list
+ * to the empty state.
+ */
+void free_flow_file_list(flow_file_list * ff_list) {
+    if (!ff_list || !ff_list->head) {
+        return;
+    }
+
+    flow_file_list_node * node = ff_list->head;
+    while (node) {
+        flow_file_list_node * next = node->next;
+        free_flowfile(node->ff_record);
+        free(node);
+        node = next;
+    }
+    memset(ff_list, 0, sizeof(struct flow_file_list));
+}
diff --git a/nanofi/src/core/string_utils.c b/nanofi/src/core/string_utils.c
index 517da1f..6c6b8ea 100644
--- a/nanofi/src/core/string_utils.c
+++ b/nanofi/src/core/string_utils.c
@@ -20,36 +20,117 @@
#include "core/string_utils.h"
#include <string.h>
#include <stdlib.h>
+#include <stdio.h>
-tokens tokenize_string(const char * str, char delim, tokenizer_mode_t mode) {
- tokens tks;
- tks.num_strings = 0;
- tks.total_bytes = 0;
+/* A token list is usable only when it exists, both end pointers are set,
+   and it holds at least one token. Returns 1 when valid, 0 otherwise. */
+int validate_list(struct token_list * tk_list) {
+    return (tk_list && tk_list->head && tk_list->tail && tk_list->size > 0) ? 1 : 0;
+}
- if (!str) return tks;
+void add_token_to_list(struct token_list * tk_list, const char * begin, uint64_t len) {
+ struct token_node * new_node = (struct token_node *)malloc(sizeof(struct token_node));
+ new_node->data = (char *)malloc((len+1) * sizeof(char));
+ strncpy(new_node->data, begin, len);
+ new_node->data[len] = '\0';
+ new_node->next = NULL;
- char * begin = (char *)str;
- char * end = NULL;
- int num_strings = 0;
- while ((end = strchr(begin, delim))) {
- if (begin == end) {
- begin++;
- continue;
+ if (!tk_list->head) {
+ tk_list->head = tk_list->tail = new_node;
+ tk_list->size++;
+ tk_list->total_bytes += len;
+ return;
+ }
+
+ tk_list->tail->next = new_node;
+ tk_list->tail = new_node;
+ tk_list->size++;
+ tk_list->total_bytes += len;
+}
+
+/* Releases one token node and the string it owns. Safe to call with NULL. */
+void free_token_node(struct token_node * node) {
+    if (!node) return;
+    free(node->data);
+    free(node);
+}
+
+/* Frees every node in the list; the list's head ends up NULL. */
+void free_all_tokens(struct token_list * tks) {
+    if (!tks) return;
+    struct token_node * node = tks->head;
+    while (node) {
+        struct token_node * next = node->next;
+        free_token_node(node);
+        node = next;
+    }
+    tks->head = NULL;
+}
+
+void print_token_list(token_list * tokens) {
+ if (tokens) {
+ token_node * head = tokens->head;
+ int i = 0;
+ while (head) {
+ printf("Token %d : %s Length = %lu\n", i, head->data, strlen(head->data));
+ head = head->next;
+ ++i;
}
- begin = (end+1);
- num_strings++;
+ }
+}
+
+void remove_last_node(token_list * tks) {
+ if (!validate_list(tks)) {
+ return;
}
- if (mode == DEFAULT_MODE && (*begin != '\0')) {
- num_strings++;
+ if (tks->size == 1 || tks->head == tks->tail) {
+ tks->total_bytes -= strlen(tks->tail->data);
+ free_all_tokens(tks);
+ tks->head = NULL;
+ tks->tail = NULL;
+ tks->size = 0;
+ return;
}
- tks.str_list = calloc(num_strings, sizeof(char *));
- tks.num_strings = 0;
- tks.total_bytes = 0;
+ struct token_node * tmp_head = tks->head;
+ struct token_node * tmp_tail = tks->tail;
- begin = (char *)str;
- end = NULL;
+ while (tmp_head->next && (tmp_head->next != tmp_tail)) {
+ tmp_head = tmp_head->next;
+ }
+
+ struct token_node * tail_node = tmp_tail;
+ tks->tail = tmp_head;
+ tks->tail->next = NULL;
+
+ tks->size--;
+ tks->total_bytes -= (strlen(tail_node->data));
+ free_token_node(tail_node);
+}
+
+void attach_lists(token_list * to, token_list * from) {
+    /* Splices 'from' onto the end of 'to'. Only proceeds when 'from' is a
+       valid non-empty list; 'to' may be empty. After splicing, both lists
+       share the same nodes -- do not free both. */
+    if (to && validate_list(from)) {
+        if (!to->head) {
+            /* 'to' was empty: it simply adopts 'from''s nodes. */
+            to->head = from->head;
+            to->tail = from->tail;
+            to->size += from->size;
+            return;
+        }
+
+        /* Malformed destination (head without tail): refuse to splice. */
+        if (!to->tail) return;
+
+        to->tail->next = from->head;
+        to->tail = from->tail;
+        to->size += from->size;
+        /* NOTE(review): total_bytes is intentionally NOT accumulated here;
+           callers (see tail_file in file_utils.c) add it themselves --
+           confirm before reusing this helper elsewhere. */
+    }
+}
+
+token_list tokenize_string(const char * begin, char delim) {
+ token_list tks;
+ memset(&tks, 0, sizeof(struct token_list));
+
+ if (!begin) return tks;
+
+ const char * end = NULL;
+
while ((end = strchr(begin, delim))) {
if (begin == end) {
begin++;
@@ -57,30 +138,27 @@
continue;
}
int len = end - begin;
- char * substr = (char *)malloc((len+1) * sizeof(char));
- strncpy(substr, begin, len);
- substr[len] = '\0';
- tks.str_list[tks.num_strings++] = substr;
- tks.total_bytes += (len+1);
+ add_token_to_list(&tks, begin, len);
+ tks.total_bytes++;
begin = (end+1);
}
- if (mode == DEFAULT_MODE && (*begin != '\0')) {
+ if (begin && *begin != '\0') {
int len = strlen(begin);
- char * substr = (char *)malloc((len+1) * sizeof(char));
- strncpy(substr, begin, len);
- substr[len] = '\0';
- tks.str_list[tks.num_strings++] = substr;
- tks.total_bytes += (len+1);
+ if (len < MAX_BYTES_READ) {
+ tks.has_non_delimited_token = 1;
+ }
+ add_token_to_list(&tks, begin, len);
}
+
return tks;
}
-void free_tokens(tokens * tks) {
- if (tks) {
- int i;
- for (i = 0; i < tks->num_strings; ++i) {
- free(tks->str_list[i]);
- }
+token_list tokenize_string_tailfile(const char * str, char delim) {
+ token_list tks = tokenize_string(str, delim);
+ if (tks.has_non_delimited_token) {
+ remove_last_node(&tks);
}
+ tks.has_non_delimited_token = 0;
+ return tks;
}
diff --git a/nanofi/tests/CTailFileTests.cpp b/nanofi/tests/CTailFileTests.cpp
new file mode 100644
index 0000000..491e8c0
--- /dev/null
+++ b/nanofi/tests/CTailFileTests.cpp
@@ -0,0 +1,369 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "catch.hpp"
+
+#include <vector>
+#include <string>
+#include <fstream>
+#include <numeric>
+#include <algorithm>
+#include <assert.h>
+#include <unistd.h>
+#include <string.h>
+#include <sys/stat.h>
+#include "core/string_utils.h"
+#include "core/file_utils.h"
+
+
+// Asserts that the C token list holds exactly the strings in sv, in order.
+// Walks with a local cursor: the original version advanced tknlist->head
+// itself, destroying the caller's only handle on the nodes.
+void test_lists_equal(token_list * tknlist, const std::vector<std::string>& sv) {
+    REQUIRE(tknlist != NULL);
+    if (sv.empty()) {
+        REQUIRE(tknlist->head == NULL);
+        REQUIRE(tknlist->size == 0);
+        return;
+    }
+    REQUIRE(tknlist->size == sv.size());
+    token_node * node = tknlist->head;
+    for (const auto& s : sv) {
+        REQUIRE(node != NULL);
+        REQUIRE(strcmp(s.c_str(), node->data) == 0);
+        node = node->next;
+    }
+}
+
+// Joins strings with token between each pair ("a", "b" -> "a<token>b").
+std::string join_strings(const std::vector<std::string>& strings, const std::string& token) {
+    if (strings.empty()) {
+        return "";
+    }
+    if (strings.size() == 1) {
+        return strings.at(0);
+    }
+    // Fold the remaining elements onto the first, inserting the separator.
+    // (The original lambda also captured initialValue without using it.)
+    return std::accumulate(strings.begin() + 1, strings.end(), strings.at(0),
+        [&token](const std::string& s1, const std::string& s2) {
+            return s1 + token + s2;
+        });
+}
+
+/****
+ * ##################################################################
+ * CTAILFILE STRING TOKENIZER TESTS
+ * ##################################################################
+ */
+
+// Tokenizing the empty string must yield an empty list.
+TEST_CASE("Test string tokenizer empty string", "[stringTokenizerEmptyString]") {
+    std::string delimitedString = "";
+    char delim = '-';
+    struct token_list tokens = tokenize_string(delimitedString.c_str(), delim);
+    REQUIRE(tokens.size == 0);
+}
+
+// A normal "a-b-c" style string splits into one token per segment.
+TEST_CASE("Test string tokenizer normal delimited string", "[stringTokenizerDelimitedString]") {
+    std::vector<std::string> slist = {"this", "is a", "delimited", "string"};
+    std::string delimitedString = join_strings(slist, "-");
+    char delim = '-';
+    struct token_list tokens = tokenize_string(delimitedString.c_str(), delim);
+    REQUIRE(tokens.size == 4);
+    REQUIRE(validate_list(&tokens) == 1);
+    test_lists_equal(&tokens, slist);
+}
+
+// Leading and consecutive delimiters produce no empty tokens.
+TEST_CASE("Test string tokenizer delimiter started string", "[stringTokenizerDelimiterStartedString]") {
+    std::vector<std::string> slist = {"", "this", "is", "a test"};
+    std::string delimitedString = join_strings(slist, "--");
+    char delim = '-';
+    struct token_list tokens = tokenize_string(delimitedString.c_str(), delim);
+    REQUIRE(tokens.size == 3);
+    REQUIRE(validate_list(&tokens) == 1);
+    slist.erase(std::remove(slist.begin(), slist.end(), ""), slist.end());
+    test_lists_equal(&tokens, slist);
+}
+
+// Input consisting solely of delimiters yields no tokens.
+TEST_CASE("Test string tokenizer only delimiter character string", "[stringTokenizerDelimiterOnlyString]") {
+    std::string delimitedString = "--------";
+    char delim = '-';
+    struct token_list tokens = tokenize_string(delimitedString.c_str(), delim);
+    REQUIRE(tokens.size == 0);
+}
+
+/****
+ * ##################################################################
+ * CTAILFILE TAIL FILE BEHAVED DELIMITED STRING TOKENIZER TESTS
+ * ##################################################################
+ */
+
+// Tail-file tokenizer drops a trailing non-delimited token; here the input
+// ends with the delimiter, so all four segments survive.
+TEST_CASE("Test string tokenizer for a delimited string less than 4096 bytes", "[testDelimitedStringTokenizer]") {
+
+    std::vector<std::string> slist = {"this", "is a", "delimited", "string", ""};
+    std::string delimitedString = join_strings(slist, "-");
+    char delim = '-';
+    struct token_list tokens = tokenize_string_tailfile(delimitedString.c_str(), delim);
+    REQUIRE(tokens.has_non_delimited_token == 0);
+    REQUIRE(tokens.size == 4);
+    REQUIRE(validate_list(&tokens) == 1);
+    // Drop the sentinel empty entry before comparing; the tokenizer never
+    // emits empty tokens. (Also removed the original's unused `len` local.)
+    slist.pop_back();
+    test_lists_equal(&tokens, slist);
+}
+
+// No delimiter at all: the whole input is an unterminated trailing token,
+// so the tail-file tokenizer returns nothing.
+TEST_CASE("Test string tokenizer for a non-delimited string less than 4096 bytes", "[testNonDelimitedStringTokenizer]") {
+    std::string nonDelimitedString = "this is a non delimited string";
+    char delim = '-';
+    struct token_list tokens = tokenize_string_tailfile(nonDelimitedString.c_str(), delim);
+    REQUIRE(tokens.has_non_delimited_token == 0);
+    REQUIRE(tokens.size == 0);
+    test_lists_equal(&tokens, {});
+}
+
+// Empty input yields an empty result for the tail-file tokenizer too.
+TEST_CASE("Test string tokenizer for empty string", "[testEmptyStringTokenizer]") {
+    const std::string emptyString = "";
+    char delim = '-';
+    struct token_list tokens = tokenize_string_tailfile(emptyString.c_str(), delim);
+    REQUIRE(tokens.has_non_delimited_token == 0);
+    REQUIRE(tokens.size == 0);
+    test_lists_equal(&tokens, {});
+}
+
+// Delimiters only: nothing between them, so no tokens come out.
+TEST_CASE("Test string tokenizer for string containing only delimited characters", "[testDelimiterCharOnlyStringTokenizer]") {
+    const std::string str = "----";
+    char delim = '-';
+    struct token_list tokens = tokenize_string_tailfile(str.c_str(), delim);
+    REQUIRE(tokens.has_non_delimited_token == 0);
+    REQUIRE(tokens.size == 0);
+    test_lists_equal(&tokens, {});
+}
+
+// Leading delimiters followed by an unterminated token: the trailing token
+// is dropped, so the result is empty.
+TEST_CASE("Test string tokenizer for string starting with delimited characters", "[testDelimitedStartingStringTokenizer]") {
+    const std::string str = "----mystring";
+    char delim = '-';
+    struct token_list tokens = tokenize_string_tailfile(str.c_str(), delim);
+    REQUIRE(tokens.has_non_delimited_token == 0);
+    REQUIRE(tokens.size == 0);
+    test_lists_equal(&tokens, {});
+}
+
+// Only fully delimited tokens survive; the unterminated "token3" is dropped.
+TEST_CASE("Test string tokenizer for string starting and ending with delimited characters", "[testDelimitedStartingEndingStringTokenizer]") {
+    const std::string str = "---token1---token2---token3";
+    char delim = '-';
+    struct token_list tokens = tokenize_string_tailfile(str.c_str(), delim);
+    REQUIRE(tokens.has_non_delimited_token == 0);
+    REQUIRE(tokens.size == 2);
+    test_lists_equal(&tokens, std::vector<std::string>{"token1", "token2"});
+}
+
+/****
+ * ##################################################################
+ * CTAILFILE TAIL TESTS
+ * ##################################################################
+ */
+
+// RAII test helper: creates a file on construction, removes it on
+// destruction.
+class FileManager {
+public:
+    FileManager(const std::string& filePath) {
+        assert(!filePath.empty() && "filePath provided cannot be empty!");
+        filePath_ = filePath;
+        outputStream_.open(filePath_, std::ios::binary);
+    }
+
+    ~FileManager() {
+        std::ifstream ifs(filePath_);
+        if (ifs.good()) {
+            remove(filePath_.c_str());
+        }
+    }
+
+    void Write(const std::string& str) {
+        outputStream_ << str;
+    }
+
+    // Writes n copies of c and returns the string that was written.
+    std::string WriteNChars(uint64_t n, char c) {
+        std::string s(n, c);
+        outputStream_ << s;
+        return s;
+    }
+
+    std::string getFilePath() const {
+        return filePath_;
+    }
+
+    void CloseStream() {
+        outputStream_.flush();
+        outputStream_.close();
+    }
+
+    // NOTE: closes the output stream as a side effect before stat-ing.
+    uint64_t GetFileSize() {
+        CloseStream();
+        struct stat buff;
+        if (stat(filePath_.c_str(), &buff) == 0) {
+            return buff.st_size;
+        }
+        return 0;
+    }
+
+private:
+    std::string filePath_;
+    std::ofstream outputStream_;
+};
+
+// A file with content but no delimiter yields no tokens and no consumed bytes.
+TEST_CASE("Simple tail file test", "[testTailFile]") {
+
+    FileManager fm("test.txt");
+    fm.Write("hello world");
+    fm.CloseStream();
+
+    // Keep the path in a named std::string: calling c_str() on the temporary
+    // returned by getFilePath() left a dangling pointer in the original code.
+    const std::string file = fm.getFilePath();
+    struct token_list tkn_list = tail_file(file.c_str(), ';', 0);
+    REQUIRE(tkn_list.size == 0);
+    REQUIRE(tkn_list.head == NULL);
+    REQUIRE(tkn_list.total_bytes == 0);
+}
+
+// Tailing an empty file consumes nothing.
+TEST_CASE("Empty file tail test", "[testEmptyFileTail]") {
+    FileManager fm("test.txt");
+    fm.CloseStream();
+
+    // Named string fixes the dangling c_str() on a temporary in the original.
+    const std::string file = fm.getFilePath();
+    struct token_list tkn_list = tail_file(file.c_str(), ';', 0);
+    REQUIRE(tkn_list.size == 0);
+    REQUIRE(tkn_list.head == NULL);
+    REQUIRE(tkn_list.total_bytes == 0);
+}
+
+// Delimiter-only content: no tokens, but all four bytes count as consumed.
+TEST_CASE("File containing only delimiters tail test", "[testDelimiterOnlyFileTail]") {
+    FileManager fm("test.txt");
+    fm.Write("----");
+    fm.CloseStream();
+
+    // Named string fixes the dangling c_str() on a temporary in the original.
+    const std::string file = fm.getFilePath();
+    struct token_list tkn_list = tail_file(file.c_str(), '-', 0);
+    REQUIRE(tkn_list.size == 0);
+    REQUIRE(tkn_list.head == NULL);
+    REQUIRE(tkn_list.total_bytes == 4);
+}
+
+TEST_CASE("File tail test string starting with delimiter", "[testDelimiterOnlyFileTail]") {
+    FileManager fm("test.txt");
+    fm.Write("----hello");
+    fm.CloseStream();
+
+    // Hold the path in a named string; c_str() on the temporary returned by
+    // getFilePath() dangled before tail_file used it.
+    const std::string filePath = fm.getFilePath();
+
+    // The leading delimiters are consumed (4 bytes); the trailing "hello" is
+    // not terminated by a delimiter, so no token is produced.
+    struct token_list tkn_list = tail_file(filePath.c_str(), '-', 0);
+    REQUIRE(tkn_list.size == 0);
+    REQUIRE(tkn_list.head == NULL);
+    REQUIRE(tkn_list.total_bytes == 4);
+}
+
+TEST_CASE("Test tail file with less than 4096 delimited chars", "[testTailFileDelimitedString]") {
+    // "token1" and "token2" are each terminated by the '-' delimiter; the
+    // trailing "token3" is not, so only the first two tokens are returned.
+    FileManager fm("test.txt");
+    fm.Write("token1--token2--token3");
+    fm.CloseStream();
+
+    const std::string path = fm.getFilePath();
+    struct token_list tokens = tail_file(path.c_str(), '-', 0);
+    test_lists_equal(&tokens, std::vector<std::string>{"token1", "token2"});
+}
+
+// Although there is no delimiter within the string that is at least 4096 bytes long,
+// tail_file still creates a flow file for the first 4096 bytes.
+TEST_CASE("Test tail file having 4096 bytes without delimiter", "[testTailFile4096Chars]") {
+
+    FileManager fm("test.txt");
+    // WriteNChars returns by value, so wrapping the call in std::move was a
+    // no-op (and a const destination cannot be moved into anyway).
+    const std::string s = fm.WriteNChars(4096, 'a');
+    const std::string filePath = fm.getFilePath();
+    fm.CloseStream();
+
+    struct token_list tokens = tail_file(filePath.c_str(), '-', 0);
+    // std::move of a const string copies, and initializer_list elements are
+    // copied regardless — pass the string directly.
+    test_lists_equal(&tokens, std::vector<std::string>{s});
+}
+
+// Although there is no delimiter within the string that is equal to 4096 bytes or longer
+// tail_file creates a flow file for each subsequent 4096 byte chunk. It leaves the last chunk
+// if it is smaller than 4096 bytes and not delimited
+TEST_CASE("Test tail file having more than 4096 bytes without delimiter", "[testTailFileMoreThan4096Chars]") {
+
+    FileManager fm("test.txt");
+    // std::move on the by-value return of WriteNChars was a no-op; dropped.
+    const std::string s1 = fm.WriteNChars(4096, 'a');
+    const std::string s2 = fm.WriteNChars(4096, 'b');
+    const std::string s3 = "helloworld";
+    fm.Write(s3);
+    fm.CloseStream();
+
+    // Sanity check: everything written reached the file.
+    const uint64_t totalStringsSize = s1.size() + s2.size() + s3.size();
+    const std::string filePath = fm.getFilePath();
+    const uint64_t bytesWrittenToStream = fm.GetFileSize();
+    REQUIRE(bytesWrittenToStream == totalStringsSize);
+
+    struct token_list tokens = tail_file(filePath.c_str(), '-', 0);
+    // Only the two full 4096-byte chunks become tokens; the short undelimited
+    // tail ("helloworld") is left in the file.
+    test_lists_equal(&tokens, std::vector<std::string>{s1, s2});
+}
+
+TEST_CASE("Test tail file having more than 4096 bytes with delimiter", "[testTailFileWithDelimitedString]") {
+
+    FileManager fm("test.txt");
+    // std::move on the by-value return of WriteNChars was a no-op; dropped.
+    const std::string s1 = fm.WriteNChars(4096, 'a');
+    const std::string d1 = fm.WriteNChars(2, '-');
+    const std::string s2 = fm.WriteNChars(4096, 'b');
+    fm.CloseStream();
+
+    // Sanity check: everything written reached the file.
+    const uint64_t totalStringsSize = s1.size() + s2.size() + d1.size();
+    const std::string filePath = fm.getFilePath();
+    const uint64_t bytesWrittenToStream = fm.GetFileSize();
+    REQUIRE(bytesWrittenToStream == totalStringsSize);
+
+    struct token_list tokens = tail_file(filePath.c_str(), '-', 0);
+    // s1 is terminated by the delimiter run; s2 is a full 4096-byte chunk,
+    // so both become tokens.
+    test_lists_equal(&tokens, std::vector<std::string>{s1, s2});
+}
+
+TEST_CASE("Test tail file having more than 4096 bytes with delimiter and second chunk less than 4096", "[testTailFileWithDelimitedString]") {
+
+    FileManager fm("test.txt");
+    // std::move on the by-value return of WriteNChars was a no-op; dropped.
+    const std::string s1 = fm.WriteNChars(4096, 'a');
+    const std::string d1 = fm.WriteNChars(2, '-');
+    const std::string s2 = fm.WriteNChars(4000, 'b');
+    fm.CloseStream();
+
+    // Sanity check: everything written reached the file.
+    const uint64_t totalStringsSize = s1.size() + s2.size() + d1.size();
+    const std::string filePath = fm.getFilePath();
+    const uint64_t bytesWrittenToStream = fm.GetFileSize();
+    REQUIRE(bytesWrittenToStream == totalStringsSize);
+
+    struct token_list tokens = tail_file(filePath.c_str(), '-', 0);
+    // Only s1 is tokenized: the trailing 4000-byte chunk is below 4096 bytes
+    // and not delimiter-terminated, so it stays in the file.
+    test_lists_equal(&tokens, std::vector<std::string>{s1});
+}
+
+TEST_CASE("Test tail file having more than 4096 bytes with delimiter and second chunk more than 4096", "[testTailFileWithDelimitedString]") {
+
+    FileManager fm("test.txt");
+    // std::move on the by-value return of WriteNChars was a no-op; dropped.
+    const std::string s1 = fm.WriteNChars(4096, 'a');
+    const std::string d1 = fm.WriteNChars(2, '-');
+    const std::string s2 = fm.WriteNChars(4098, 'b');
+    fm.CloseStream();
+
+    // Sanity check: everything written reached the file.
+    const uint64_t totalStringsSize = s1.size() + s2.size() + d1.size();
+    const std::string filePath = fm.getFilePath();
+    const uint64_t bytesWrittenToStream = fm.GetFileSize();
+    REQUIRE(bytesWrittenToStream == totalStringsSize);
+
+    // Only the first 4096 bytes of the 4098-byte chunk are tokenized;
+    // substr expresses the prefix more clearly than pointer arithmetic.
+    const std::string s3 = s2.substr(0, 4096);
+    struct token_list tokens = tail_file(filePath.c_str(), '-', 0);
+    test_lists_equal(&tokens, std::vector<std::string>{s1, s3});
+}