Merge remote-tracking branch 'apache/master'
diff --git a/nanofi/CMakeLists.txt b/nanofi/CMakeLists.txt
index c3279c2..9e4e1e8 100644
--- a/nanofi/CMakeLists.txt
+++ b/nanofi/CMakeLists.txt
@@ -37,6 +37,8 @@
 
 file(GLOB NANOFI_EXAMPLES_SOURCES  "examples/*.c" )
 
+file(GLOB NANOFI_ECU_SOURCES "ecu/*.c")
+
 include(CheckCXXCompilerFlag)
 if (WIN32)
   if ((MSVC_VERSION GREATER "1900") OR (MSVC_VERSION EQUAL "1900"))
@@ -103,4 +105,5 @@
 
 if (NOT DISABLE_CURL)
 add_subdirectory(examples)
+add_subdirectory(ecu)
 endif()
diff --git a/nanofi/ecu/CMakeLists.txt b/nanofi/ecu/CMakeLists.txt
new file mode 100644
index 0000000..b28af76
--- /dev/null
+++ b/nanofi/ecu/CMakeLists.txt
@@ -0,0 +1,63 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+cmake_minimum_required(VERSION 2.6)
+
+IF(POLICY CMP0048)
+  CMAKE_POLICY(SET CMP0048 OLD)
+ENDIF(POLICY CMP0048)
+
+include_directories(/include)
+
+include(CheckCXXCompilerFlag)
+if (WIN32)
+  if ((MSVC_VERSION GREATER "1900") OR (MSVC_VERSION EQUAL "1900"))
+	    CHECK_CXX_COMPILER_FLAG("/std:c++14" _cpp_latest_flag_supported)
+	    if (_cpp_latest_flag_supported)
+	        add_compile_options("/std:c++14")
+	    endif()
+	endif()
+else()
+CHECK_CXX_COMPILER_FLAG("-std=c++11" COMPILER_SUPPORTS_CXX11)
+CHECK_CXX_COMPILER_FLAG("-std=c++0x" COMPILER_SUPPORTS_CXX0X)
+if(COMPILER_SUPPORTS_CXX11)
+    set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11")
+elseif(COMPILER_SUPPORTS_CXX0X)
+    set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++0x")
+else()
+ message(STATUS "The compiler ${CMAKE_CXX_COMPILER} has no C++11 support. Please use a different C++ compiler.")
+endif()
+
+endif()
+
+if (APPLE)
+    set(LINK_FLAGS "-Wl,-all_load")
+    set(LINK_END_FLAGS "")
+elseif (UNIX)
+    set(LINK_FLAGS "-Wl,--whole-archive")
+    set(LINK_END_FLAGS "")
+endif ()
+
+if (NOT WIN32)
+
+add_executable(tail_file tail_file.c)
+
+target_link_libraries(tail_file nanofi ${CMAKE_THREAD_LIBS_INIT}  ${LINK_FLAGS} minifi-http-curl ${LINK_END_FLAGS})
+
+endif()
\ No newline at end of file
diff --git a/nanofi/ecu/tail_file.c b/nanofi/ecu/tail_file.c
new file mode 100644
index 0000000..c428be1
--- /dev/null
+++ b/nanofi/ecu/tail_file.c
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+    *
+    *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+#include "api/nanofi.h"
+#include "core/string_utils.h"
+#include "core/cstructs.h"
+#include "core/file_utils.h"
+#include <unistd.h>
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include <errno.h>
+#include <limits.h>
+#include <signal.h>
+#include <sys/stat.h>
+
+struct flow_file_records * flowfiles = NULL;
+nifi_instance * instance = NULL;
+standalone_processor * proc = NULL;
+int file_offset = 0;
+int stopped = 0;
+flow_file_list ff_list;
+token_list tks;
+
+void signal_handler(int signum) {
+    if (signum == SIGINT || signum == SIGTERM) {
+        stopped = 1;
+    }
+}
+
+void transmit_flow_files(nifi_instance * instance) {
+    flow_file_list_node * head = ff_list.head;
+    while (head) {
+        transmit_flowfile(head->ff_record, instance);
+        head = head->next;
+    }
+}
+
+void set_offset(int offset) {
+    file_offset = offset;
+}
+
+int get_offset() {
+    return file_offset;
+}
+
+void on_trigger_callback(processor_session * ps, processor_context * ctx) {
+
+    char file_path[4096];
+    char delimiter[3];
+
+    if (get_property(ctx, "file_path", file_path, sizeof(file_path)) != 0) {
+        return;
+    }
+
+    if (get_property(ctx, "delimiter", delimiter, sizeof(delimiter)) != 0) {
+        return;
+    }
+
+    if (strlen(delimiter) == 0) {
+        printf("Delimiter not specified or it is empty\n");
+        return;
+    }
+    char delim = delimiter[0];
+
+    if (delim == '\\') {
+          if (strlen(delimiter) > 1) {
+            switch (delimiter[1]) {
+              case 'r':
+                delim = '\r';
+                break;
+              case 't':
+                delim = '\t';
+                break;
+              case 'n':
+                delim = '\n';
+                break;
+              case '\\':
+                delim = '\\';
+                break;
+              default:
+                break;
+            }
+        }
+    }
+
+    tks = tail_file(file_path, delim, get_offset());
+
+    if (!validate_list(&tks)) return;
+
+    set_offset(get_offset() + tks.total_bytes);
+
+    token_node * head;
+    for (head = tks.head; head && head->data; head = head->next) {
+        flow_file_record * ffr = generate_flow_file(instance, proc);
+        const char * flow_file_path = ffr->contentLocation;
+        FILE * ffp = fopen(flow_file_path, "wb");
+        if (!ffp) {
+            printf("Cannot open flow file at path %s to write content to.\n", flow_file_path);
+            break;
+        }
+
+        int count = strlen(head->data);
+        int ret = fwrite(head->data, 1, count, ffp);
+        if (ret < count) {
+            fclose(ffp);
+            break;
+        }
+        fseek(ffp, 0, SEEK_END);
+        ffr->size = ftell(ffp);
+        fclose(ffp);
+        add_flow_file_record(&ff_list, ffr);
+    }
+    free_all_tokens(&tks);
+}
+
+int main(int argc, char** argv) {
+
+    if (argc < 6) {
+        printf("Error: must run ./tail_file <file> <interval> <delimiter> <nifi instance url> <remote port>\n");
+        exit(1);
+    }
+
+    char * file = argv[1];
+    char * interval = argv[2];
+    char * delimiter = argv[3];
+    char * instance_str = argv[4];
+    char * port_str = argv[5];
+
+    if (access(file, F_OK) == -1) {
+        printf("Error: %s doesn't exist!\n", file);
+        exit(1);
+    }
+
+    struct stat stats;
+    errno = 0;
+    int ret = stat(file, &stats);
+
+    if (ret == -1) {
+        printf("Error occurred while getting file status {file: %s, error: %s}\n", file, strerror(errno));
+        exit(1);
+    }
+    // Check for file existence
+    if (S_ISDIR(stats.st_mode)){
+        printf("Error: %s is a directory!\n", file);
+        exit(1);
+    }
+
+    errno = 0;
+    unsigned long intrvl = strtol(interval, NULL, 10);
+
+    if (errno == ERANGE || intrvl == LONG_MAX || intrvl == LONG_MIN) {
+        printf("Invalid interval value specified\n");
+        return 0;
+    }
+
+    struct sigaction action;
+    memset(&action, 0, sizeof(sigaction));
+    action.sa_handler = signal_handler;
+    sigaction(SIGTERM, &action, NULL);
+    sigaction(SIGINT, &action, NULL);
+
+    nifi_port port;
+
+    port.port_id = port_str;
+
+    instance = create_instance(instance_str, &port);
+
+    const char * processor_name = "TailFile";
+
+    add_custom_processor(processor_name, on_trigger_callback);
+
+    proc = create_processor(processor_name);
+
+    set_standalone_property(proc, "file_path", file);
+    set_standalone_property(proc, "delimiter", delimiter);
+
+    set_offset(0);
+    while (!stopped) {
+        flow_file_record * new_ff = invoke(proc);
+        transmit_flow_files(instance);
+        free_flow_file_list(&ff_list);
+        free_flowfile(new_ff);
+        sleep(intrvl);
+    }
+
+    printf("tail file processor stopped\n");
+    free_standalone_processor(proc);
+    free(instance);
+
+    return 0;
+}
diff --git a/nanofi/examples/CMakeLists.txt b/nanofi/examples/CMakeLists.txt
index 4301a7d..b9480ed 100644
--- a/nanofi/examples/CMakeLists.txt
+++ b/nanofi/examples/CMakeLists.txt
@@ -83,8 +83,8 @@
 
 target_link_libraries(monitor_directory nanofi ${CMAKE_THREAD_LIBS_INIT}  ${LINK_FLAGS} minifi-http-curl ${LINK_END_FLAGS})
 
-add_executable(tail_file tail_file.c)
+#add_executable(tail_file tail_file.c)
 
-target_link_libraries(tail_file nanofi ${CMAKE_THREAD_LIBS_INIT}  ${LINK_FLAGS} minifi-http-curl ${LINK_END_FLAGS})
+#target_link_libraries(tail_file nanofi ${CMAKE_THREAD_LIBS_INIT}  ${LINK_FLAGS} minifi-http-curl ${LINK_END_FLAGS})
 
 endif()
diff --git a/nanofi/include/core/cstructs.h b/nanofi/include/core/cstructs.h
index 23688d2..3be3ce3 100644
--- a/nanofi/include/core/cstructs.h
+++ b/nanofi/include/core/cstructs.h
@@ -156,9 +156,35 @@
  * ##################################################################
  */
 
-typedef struct tokens {
-    char ** str_list;
-    uint64_t num_strings;
+typedef struct token_node {
+    char * data;
+    struct token_node * next;
+} token_node;
+
+typedef struct token_list {
+    struct token_node * head;
+    struct token_node * tail;
+    uint64_t size;
     uint64_t total_bytes;
-} tokens;
+    int has_non_delimited_token;
+} token_list;
+
+/****
+ * ##################################################################
+ *  FLOWFILE OPERATIONS
+ * ##################################################################
+ */
+
+typedef struct flow_file_list_node {
+    flow_file_record * ff_record;
+    struct flow_file_list_node * next;
+} flow_file_list_node;
+
+typedef struct flow_file_list {
+    flow_file_list_node * head;
+    flow_file_list_node * tail;
+    int len;
+    int offset;
+} flow_file_list;
+
 #endif /* LIBMINIFI_SRC_CAPI_CSTRUCTS_H_ */
diff --git a/nanofi/include/core/file_utils.h b/nanofi/include/core/file_utils.h
new file mode 100644
index 0000000..25c3029
--- /dev/null
+++ b/nanofi/include/core/file_utils.h
@@ -0,0 +1,42 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef NANOFI_INCLUDE_CORE_FILE_UTILS_H_
+#define NANOFI_INCLUDE_CORE_FILE_UTILS_H_
+
+#include "flowfiles.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/**
+ * Tails a delimited file starting from an offset up to the end of file
+ * @param file the path to the file to tail
+ * @param delim the delimiter character
+ * @param curr_offset the offset in the file to tail from.
+ * For eg. To tail from beginning of the file curr_offset = 0
+ * @return a list of tokens
+ */
+token_list tail_file(const char * file, char delim, int curr_offset);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* NANOFI_INCLUDE_CORE_FILE_UTILS_H_ */
diff --git a/nanofi/include/core/flowfiles.h b/nanofi/include/core/flowfiles.h
new file mode 100644
index 0000000..60ac02e
--- /dev/null
+++ b/nanofi/include/core/flowfiles.h
@@ -0,0 +1,28 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef NANOFI_INCLUDE_CORE_FLOWFILES_H_
+#define NANOFI_INCLUDE_CORE_FLOWFILES_H_
+
+#include "cstructs.h"
+
+void add_flow_file_record(flow_file_list * ff_list, flow_file_record * record);
+
+void free_flow_file_list(flow_file_list * ff_list);
+
+#endif /* NANOFI_INCLUDE_CORE_FLOWFILES_H_ */
diff --git a/nanofi/include/core/string_utils.h b/nanofi/include/core/string_utils.h
index 62aac99..f25adf8 100644
--- a/nanofi/include/core/string_utils.h
+++ b/nanofi/include/core/string_utils.h
@@ -26,25 +26,66 @@
 extern "C" {
 #endif
 
-typedef enum TOKENIZER_MODE {
-    TAILFILE_MODE = 0, /* Do not include a non delimiting string */
-    DEFAULT_MODE /* include a non delimiting string */
-} tokenizer_mode_t;
+#define MAX_BYTES_READ 4096
 
 /**
  * Tokenizes a delimited string and returns a list of tokens
  * @param str the string to be tokenized
  * @param delim the delimiting character
- * @param tokenizer_mode_t the enumeration value specified to include/exclude a non delimiting string in the result
- * @return a list of strings wrapped inside tokens struct
+ * @return a list of tokens
  */
-tokens tokenize_string(const char * str, char delim, tokenizer_mode_t);
+token_list tokenize_string(const char * str, char delim);
 
 /**
- * Free the dynamically allocated tokens
- * @param tks the tokens to be freed
+ * Tokenizes a delimited string and returns a list of tokens but excludes
+ * the last token if it is not delimited by the delimiter. This function
+ * is used by tailfile processor
+ * @param str the cstring to tokenize
+ * @param delim the delimiter to tokenize by
+ * @return a list of tokens
  */
-void free_tokens(tokens * tks);
+token_list tokenize_string_tailfile(const char * str, char delim);
+
+/**
+ * Adds a token to the token list
+ * @param tks the token list to add the token to
+ * @param begin the beginning of the token
+ * @param len the length of the token
+ */
+void add_token_to_list(token_list * tks, const char * begin, uint64_t len);
+
+/**
+ * Deallocate one token node
+ * @param node the node in the list to be deallocated
+ */
+void free_token_node(token_node * node);
+
+/**
+ * Deallocate the dynamically allocated token list
+ * @param tks the token list to be freed
+ */
+void free_all_tokens(token_list * tks);
+
+/**
+ * Remove the tail node from the list
+ * @param tks the linked list of token nodes
+ */
+void remove_last_node(token_list * tks);
+
+/**
+ * Validate a linked list
+ * @param tks_list the list to be validated
+ * @return 1 if the list if valid else 0
+ */
+int validate_list(token_list * tk_list);
+
+/**
+ * Append one list to other (Chaining)
+ * @param to, the destination list to append to
+ * @param from, the source list
+ * @attention, if the to list is empty, to and from will be same after appending
+ */
+void attach_lists(token_list * to, token_list * from);
 
 #ifdef __cplusplus
 }
diff --git a/nanofi/src/core/file_utils.c b/nanofi/src/core/file_utils.c
new file mode 100644
index 0000000..3f7b79e
--- /dev/null
+++ b/nanofi/src/core/file_utils.c
@@ -0,0 +1,63 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include <errno.h>
+
+#include "api/nanofi.h"
+#include "core/string_utils.h"
+#include "core/file_utils.h"
+
+token_list tail_file(const char * file_path, char delim, int curr_offset) {
+    token_list tkn_list;
+    memset(&tkn_list, 0, sizeof(struct token_list));
+
+    if (!file_path) {
+        return tkn_list;
+    }
+
+    char buff[MAX_BYTES_READ + 1];
+    memset(buff, 0, MAX_BYTES_READ+1);
+    errno = 0;
+    FILE * fp = fopen(file_path, "rb");
+    if (!fp) {
+        printf("Cannot open file: {file: %s, reason: %s}\n", file_path, strerror(errno));
+        return tkn_list;
+    }
+    fseek(fp, curr_offset, SEEK_SET);
+
+    int bytes_read = 0;
+    int i = 0;
+    while ((bytes_read = fread(buff, 1, MAX_BYTES_READ, fp)) > 0) {
+        buff[bytes_read] = '\0';
+        struct token_list tokens = tokenize_string_tailfile(buff, delim);
+        if (tokens.size > 0) {
+            attach_lists(&tkn_list, &tokens);
+        }
+        tkn_list.total_bytes += tokens.total_bytes;
+        if (tokens.total_bytes > 0) {
+            curr_offset += tokens.total_bytes;
+            fseek(fp, curr_offset, SEEK_SET);
+        }
+        memset(buff, 0, MAX_BYTES_READ);
+    }
+    fclose(fp);
+    return tkn_list;
+}
diff --git a/nanofi/src/core/flowfiles.c b/nanofi/src/core/flowfiles.c
new file mode 100644
index 0000000..edf2c5b
--- /dev/null
+++ b/nanofi/src/core/flowfiles.c
@@ -0,0 +1,53 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/flowfiles.h"
+#include <string.h>
+
+void add_flow_file_record(flow_file_list * ff_list, flow_file_record * record) {
+    if (!ff_list || !record) return;
+
+    struct flow_file_list_node * new_node = (struct flow_file_list_node *)malloc(sizeof(struct flow_file_list_node));
+    new_node->ff_record = record;
+    new_node->next = NULL;
+
+    if (!ff_list->head || !ff_list->tail) {
+        ff_list->head = ff_list->tail = new_node;
+        ff_list->len = 1;
+        return;
+    }
+
+    ff_list->tail->next = new_node;
+    ff_list->tail = new_node;
+    ff_list->len++;
+}
+
+void free_flow_file_list(flow_file_list * ff_list) {
+    if (!ff_list || !ff_list->head) {
+        return;
+    }
+
+    flow_file_list_node * head = ff_list->head;
+    while (head) {
+        free_flowfile(head->ff_record);
+        flow_file_list_node * tmp = head;
+        head = head->next;
+        free(tmp);
+    }
+    memset(ff_list, 0, sizeof(struct flow_file_list));
+}
diff --git a/nanofi/src/core/string_utils.c b/nanofi/src/core/string_utils.c
index 517da1f..6c6b8ea 100644
--- a/nanofi/src/core/string_utils.c
+++ b/nanofi/src/core/string_utils.c
@@ -20,36 +20,117 @@
 #include "core/string_utils.h"
 #include <string.h>
 #include <stdlib.h>
+#include <stdio.h>
 
-tokens tokenize_string(const char * str, char delim, tokenizer_mode_t mode) {
-    tokens tks;
-    tks.num_strings = 0;
-    tks.total_bytes = 0;
+int validate_list(struct token_list * tk_list) {
+    if (tk_list && tk_list->head && tk_list->tail && tk_list->size > 0) {
+        return 1;
+    }
+    return 0;
+}
 
-    if (!str) return tks;
+void add_token_to_list(struct token_list * tk_list, const char * begin, uint64_t len) {
+    struct token_node * new_node = (struct token_node *)malloc(sizeof(struct token_node));
+    new_node->data = (char *)malloc((len+1) * sizeof(char));
+    strncpy(new_node->data, begin, len);
+    new_node->data[len] = '\0';
+    new_node->next = NULL;
 
-    char * begin = (char *)str;
-    char * end = NULL;
-    int num_strings = 0;
-    while ((end = strchr(begin, delim))) {
-        if (begin == end) {
-            begin++;
-            continue;
+    if (!tk_list->head) {
+        tk_list->head = tk_list->tail = new_node;
+        tk_list->size++;
+        tk_list->total_bytes += len;
+        return;
+    }
+
+    tk_list->tail->next = new_node;
+    tk_list->tail = new_node;
+    tk_list->size++;
+    tk_list->total_bytes += len;
+}
+
+void free_token_node(struct token_node * node) {
+    if (node) {
+        free(node->data);
+    }
+    free(node);
+}
+
+void free_all_tokens(struct token_list * tks) {
+    while (tks && tks->head) {
+        struct token_node * node = tks->head;
+        tks->head = tks->head->next;
+        free_token_node(node);
+    }
+}
+
+void print_token_list(token_list * tokens) {
+    if (tokens) {
+        token_node * head = tokens->head;
+        int i = 0;
+        while (head) {
+            printf("Token %d : %s Length = %lu\n", i, head->data, strlen(head->data));
+            head = head->next;
+            ++i;
         }
-        begin = (end+1);
-        num_strings++;
+    }
+}
+
+void remove_last_node(token_list * tks) {
+    if (!validate_list(tks)) {
+        return;
     }
 
-    if (mode == DEFAULT_MODE && (*begin != '\0')) {
-        num_strings++;
+    if (tks->size == 1 || tks->head == tks->tail) {
+        tks->total_bytes -= strlen(tks->tail->data);
+        free_all_tokens(tks);
+        tks->head = NULL;
+        tks->tail = NULL;
+        tks->size = 0;
+        return;
     }
 
-    tks.str_list = calloc(num_strings, sizeof(char *));
-    tks.num_strings = 0;
-    tks.total_bytes = 0;
+    struct token_node * tmp_head = tks->head;
+    struct token_node * tmp_tail = tks->tail;
 
-    begin = (char *)str;
-    end = NULL;
+    while (tmp_head->next && (tmp_head->next != tmp_tail)) {
+        tmp_head = tmp_head->next;
+    }
+
+    struct token_node * tail_node = tmp_tail;
+    tks->tail = tmp_head;
+    tks->tail->next = NULL;
+
+    tks->size--;
+    tks->total_bytes -= (strlen(tail_node->data));
+    free_token_node(tail_node);
+}
+
+void attach_lists(token_list * to, token_list * from) {
+    if (to && validate_list(from)) {
+        if (!to->head) {
+            to->head = from->head;
+            to->tail = from->tail;
+            to->size += from->size;
+            return;
+        }
+
+        if (!to->tail) return;
+
+        to->tail->next = from->head;
+        to->tail = from->tail;
+        to->size += from->size;
+    }
+}
+
+token_list tokenize_string(const char * begin, char delim) {
+    token_list tks;
+    memset(&tks, 0, sizeof(struct token_list));
+
+    if (!begin) return tks;
+
+    const char * end = NULL;
+
     while ((end = strchr(begin, delim))) {
         if (begin == end) {
             begin++;
@@ -57,30 +138,27 @@
             continue;
         }
         int len = end - begin;
-        char * substr = (char *)malloc((len+1) * sizeof(char));
-        strncpy(substr, begin, len);
-        substr[len] = '\0';
-        tks.str_list[tks.num_strings++] = substr;
-        tks.total_bytes += (len+1);
+        add_token_to_list(&tks, begin, len);
+        tks.total_bytes++;
         begin = (end+1);
     }
 
-    if (mode == DEFAULT_MODE && (*begin != '\0')) {
+    if (begin && *begin != '\0') {
         int len = strlen(begin);
-        char * substr = (char *)malloc((len+1) * sizeof(char));
-        strncpy(substr, begin, len);
-        substr[len] = '\0';
-        tks.str_list[tks.num_strings++] = substr;
-        tks.total_bytes += (len+1);
+        if (len < MAX_BYTES_READ) {
+            tks.has_non_delimited_token = 1;
+        }
+        add_token_to_list(&tks, begin, len);
     }
+
     return tks;
 }
 
-void free_tokens(tokens * tks) {
-    if (tks) {
-        int i;
-        for (i = 0; i < tks->num_strings; ++i) {
-            free(tks->str_list[i]);
-        }
+token_list tokenize_string_tailfile(const char * str, char delim) {
+    token_list tks = tokenize_string(str, delim);
+    if (tks.has_non_delimited_token) {
+        remove_last_node(&tks);
     }
+    tks.has_non_delimited_token = 0;
+    return tks;
 }
diff --git a/nanofi/tests/CTailFileTests.cpp b/nanofi/tests/CTailFileTests.cpp
new file mode 100644
index 0000000..491e8c0
--- /dev/null
+++ b/nanofi/tests/CTailFileTests.cpp
@@ -0,0 +1,369 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "catch.hpp"
+
+#include <vector>
+#include <string>
+#include <fstream>
+#include <numeric>
+#include <algorithm>
+#include <assert.h>
+#include <unistd.h>
+#include <string.h>
+#include <sys/stat.h>
+#include "core/string_utils.h"
+#include "core/file_utils.h"
+
+
+void test_lists_equal(token_list * tknlist, const std::vector<std::string>& sv) {
+    REQUIRE(tknlist != NULL);
+    if (sv.empty()) {
+        REQUIRE(tknlist->head == NULL);
+        REQUIRE(tknlist->size == 0);
+        return;
+    }
+    REQUIRE(tknlist->size == sv.size());
+    for (const auto& s : sv) {
+        if (tknlist->head) {
+            REQUIRE(strcmp(s.c_str(), tknlist->head->data) == 0);
+            tknlist->head = tknlist->head->next;
+        }
+    }
+}
+
+std::string join_strings(const std::vector<std::string>& strings, const std::string& token) {
+    if (strings.empty()) {
+        return "";
+    }
+
+    if (strings.size() == 1) {
+        return strings.at(0);
+    }
+    const auto& initialValue = strings.at(0);
+    return std::accumulate(strings.begin()+1, strings.end(), initialValue,
+                [token,initialValue](const std::string& s1, const std::string& s2) {
+                return s1 + token + s2;
+            });
+}
+
+/****
+ * ##################################################################
+ *  CTAILFILE STRING TOKENIZER TESTS
+ * ##################################################################
+ */
+
+TEST_CASE("Test string tokenizer empty string", "[stringTokenizerEmptyString]") {
+    std::string delimitedString = "";
+    char delim = '-';
+    struct token_list tokens = tokenize_string(delimitedString.c_str(), delim);
+    REQUIRE(tokens.size == 0);
+}
+
+TEST_CASE("Test string tokenizer normal delimited string", "[stringTokenizerDelimitedString]") {
+    std::vector<std::string> slist = {"this", "is a", "delimited", "string"};
+    std::string delimitedString = join_strings(slist, "-");
+    char delim = '-';
+    struct token_list tokens = tokenize_string(delimitedString.c_str(), delim);
+    REQUIRE(tokens.size == 4);
+    REQUIRE(validate_list(&tokens) == 1);
+    test_lists_equal(&tokens, slist);
+}
+
+TEST_CASE("Test string tokenizer delimiter started string", "[stringTokenizerDelimiterStartedString]") {
+    std::vector<std::string> slist = {"", "this", "is", "a test"};
+    std::string delimitedString = join_strings(slist, "--");
+    char delim = '-';
+    struct token_list tokens = tokenize_string(delimitedString.c_str(), delim);
+    REQUIRE(tokens.size == 3);
+    REQUIRE(validate_list(&tokens) == 1);
+    slist.erase(std::remove(slist.begin(), slist.end(), ""), slist.end());
+    test_lists_equal(&tokens, slist);
+}
+
+TEST_CASE("Test string tokenizer only delimiter character string", "[stringTokenizerDelimiterOnlyString]") {
+    std::string delimitedString = "--------";
+    char delim = '-';
+    struct token_list tokens = tokenize_string(delimitedString.c_str(), delim);
+    REQUIRE(tokens.size == 0);
+}
+
+/****
+ * ##################################################################
+ *  CTAILFILE TAIL FILE BEHAVED DELIMITED STRING TOKENIZER TESTS
+ * ##################################################################
+ */
+
+TEST_CASE("Test string tokenizer for a delimited string less than 4096 bytes", "[testDelimitedStringTokenizer]") {
+
+    std::vector<std::string> slist = {"this", "is a", "delimited", "string", ""};
+    std::string delimitedString = join_strings(slist, "-");
+    int len = strlen(delimitedString.c_str());
+    char delim = '-';
+    struct token_list tokens = tokenize_string_tailfile(delimitedString.c_str(), delim);
+    REQUIRE(tokens.has_non_delimited_token == 0);
+    REQUIRE(tokens.size == 4);
+    REQUIRE(validate_list(&tokens) == 1);
+    slist.pop_back();
+    test_lists_equal(&tokens, slist);
+}
+
+TEST_CASE("Test string tokenizer for a non-delimited string less than 4096 bytes", "[testNonDelimitedStringTokenizer]") {
+    std::string nonDelimitedString = "this is a non delimited string";
+    char delim = '-';
+    struct token_list tokens = tokenize_string_tailfile(nonDelimitedString.c_str(), delim);
+    REQUIRE(tokens.has_non_delimited_token == 0);
+    REQUIRE(tokens.size == 0);
+    test_lists_equal(&tokens, {});
+}
+
+TEST_CASE("Test string tokenizer for empty string", "[testEmptyStringTokenizer]") {
+    const std::string emptyString = "";
+    char delim = '-';
+    struct token_list tokens = tokenize_string_tailfile(emptyString.c_str(), delim);
+    REQUIRE(tokens.has_non_delimited_token == 0);
+    REQUIRE(tokens.size == 0);
+    test_lists_equal(&tokens, {});
+}
+
+TEST_CASE("Test string tokenizer for string containing only delimited characters", "[testDelimiterCharOnlyStringTokenizer]") {
+    const std::string str = "----";
+    char delim = '-';
+    struct token_list tokens = tokenize_string_tailfile(str.c_str(), delim);
+    REQUIRE(tokens.has_non_delimited_token == 0);
+    REQUIRE(tokens.size == 0);
+    test_lists_equal(&tokens, {});
+}
+
+TEST_CASE("Test string tokenizer for string starting with delimited characters", "[testDelimitedStartingStringTokenizer]") {
+    const std::string str = "----mystring";
+    char delim = '-';
+    struct token_list tokens = tokenize_string_tailfile(str.c_str(), delim);
+    REQUIRE(tokens.has_non_delimited_token == 0);
+    REQUIRE(tokens.size == 0);
+    test_lists_equal(&tokens, {});
+}
+
+TEST_CASE("Test string tokenizer for string starting and ending with delimited characters", "[testDelimitedStartingEndingStringTokenizer]") {
+    const std::string str = "---token1---token2---token3";
+    char delim = '-';
+    struct token_list tokens = tokenize_string_tailfile(str.c_str(), delim);
+    REQUIRE(tokens.has_non_delimited_token == 0);
+    REQUIRE(tokens.size == 2);
+    test_lists_equal(&tokens, std::vector<std::string>{"token1", "token2"});
+}
+
+/****
+ * ##################################################################
+ *  CTAILFILE TAIL TESTS
+ * ##################################################################
+ */
+
+class FileManager {
+public:
+    FileManager(const std::string& filePath) {
+        assert(!filePath.empty() && "filePath provided cannot be empty!");
+        filePath_ = filePath;
+        outputStream_.open(filePath_, std::ios::binary);
+    }
+
+    ~FileManager() {
+        std::ifstream ifs(filePath_);
+        if (ifs.good()) {
+            remove(filePath_.c_str());
+        }
+    }
+
+    void Write(const std::string& str) {
+        outputStream_ << str;
+    }
+
+    std::string WriteNChars(uint64_t n, char c) {
+        std::string s(n, c);
+        outputStream_ << s;
+        return s;
+    }
+
+    std::string getFilePath() const {
+        return filePath_;
+    }
+
+    void CloseStream() {
+        outputStream_.flush();
+        outputStream_.close();
+    }
+
+    uint64_t GetFileSize() {
+        CloseStream();
+        struct stat buff;
+        if (stat(filePath_.c_str(), &buff) == 0) {
+            return buff.st_size;
+        }
+        return 0;
+    }
+
+private:
+    std::string filePath_;
+    std::ofstream outputStream_;
+};
+
+TEST_CASE("Simple tail file test", "[testTailFile]") {
+
+    FileManager fm("test.txt");
+    fm.Write("hello world");
+    fm.CloseStream();
+
+    const char * file = fm.getFilePath().c_str();
+    struct token_list tkn_list = tail_file(file, ';', 0);
+    REQUIRE(tkn_list.size == 0);
+    REQUIRE(tkn_list.head == NULL);
+    REQUIRE(tkn_list.total_bytes == 0);
+}
+
+TEST_CASE("Empty file tail test", "[testEmptyFileTail]") {
+    FileManager fm("test.txt");
+    fm.CloseStream();
+
+    const char * file = fm.getFilePath().c_str();
+    struct token_list tkn_list = tail_file(file, ';', 0);
+    REQUIRE(tkn_list.size == 0);
+    REQUIRE(tkn_list.head == NULL);
+    REQUIRE(tkn_list.total_bytes == 0);
+}
+
+TEST_CASE("File containing only delimiters tail test", "[testDelimiterOnlyFileTail]") {
+    FileManager fm("test.txt");
+    fm.Write("----");
+    fm.CloseStream();
+
+    const char * file = fm.getFilePath().c_str();
+    struct token_list tkn_list = tail_file(file, '-', 0);
+    REQUIRE(tkn_list.size == 0);
+    REQUIRE(tkn_list.head == NULL);
+    REQUIRE(tkn_list.total_bytes == 4);
+}
+
+TEST_CASE("File tail test string starting with delimiter", "[testDelimiterOnlyFileTail]") {
+    FileManager fm("test.txt");
+    fm.Write("----hello");
+    fm.CloseStream();
+
+    const char * file = fm.getFilePath().c_str();
+    struct token_list tkn_list = tail_file(file, '-', 0);
+    REQUIRE(tkn_list.size == 0);
+    REQUIRE(tkn_list.head == NULL);
+    REQUIRE(tkn_list.total_bytes == 4);
+}
+
+TEST_CASE("Test tail file with less than 4096 delimited chars", "[testTailFileDelimitedString]") {
+
+    const std::string delimitedString = "token1--token2--token3";
+    FileManager fm("test.txt");
+    fm.Write(delimitedString);
+    const std::string filePath = fm.getFilePath();
+    fm.CloseStream();
+
+    struct token_list tokens = tail_file(filePath.c_str(), '-', 0);
+    test_lists_equal(&tokens, std::vector<std::string>{"token1", "token2"});
+}
+
+// Although there is no delimiter within the string that is at least 4096 bytes long,
+// tail_file still creates a flow file for the first 4096 bytes.
+TEST_CASE("Test tail file having 4096 bytes without delimiter", "[testTailFile4096Chars]") {
+
+    FileManager fm("test.txt");
+    const std::string s = std::move(fm.WriteNChars(4096, 'a'));
+    const std::string filePath = fm.getFilePath();
+    fm.CloseStream();
+
+    struct token_list tokens = tail_file(filePath.c_str(), '-', 0);
+    test_lists_equal(&tokens, std::vector<std::string>{std::move(s)});
+}
+
+// Although there is no delimiter within the string that is equal to 4096 bytes or longer
+// tail_file creates a flow file for each subsequent 4096 byte chunk. It leaves the last chunk
+// if it is smaller than 4096 bytes and not delimited
+TEST_CASE("Test tail file having more than 4096 bytes without delimiter", "[testTailFileMoreThan4096Chars]") {
+
+    FileManager fm("test.txt");
+    const std::string s1 = std::move(fm.WriteNChars(4096, 'a'));
+    const std::string s2 = std::move(fm.WriteNChars(4096, 'b'));
+    const std::string s3 = "helloworld";
+    fm.Write(s3);
+    fm.CloseStream();
+
+    const uint64_t totalStringsSize = s1.size() + s2.size() + s3.size();
+    const std::string filePath = fm.getFilePath();
+    const uint64_t bytesWrittenToStream = fm.GetFileSize();
+    REQUIRE(bytesWrittenToStream == totalStringsSize);
+
+    struct token_list tokens = tail_file(filePath.c_str(), '-', 0);
+    test_lists_equal(&tokens, std::vector<std::string>{std::move(s1), std::move(s2)});
+}
+
+TEST_CASE("Test tail file having more than 4096 bytes with delimiter", "[testTailFileWithDelimitedString]") {
+
+    FileManager fm("test.txt");
+    const std::string s1 = std::move(fm.WriteNChars(4096, 'a'));
+    const std::string d1 = std::move(fm.WriteNChars(2, '-'));
+    const std::string s2 = std::move(fm.WriteNChars(4096, 'b'));
+    fm.CloseStream();
+
+    const uint64_t totalStringsSize = s1.size() + s2.size() + d1.size();
+    const std::string filePath = fm.getFilePath();
+    const uint64_t bytesWrittenToStream = fm.GetFileSize();
+    REQUIRE(bytesWrittenToStream == totalStringsSize);
+
+    struct token_list tokens = tail_file(filePath.c_str(), '-', 0);
+    test_lists_equal(&tokens, std::vector<std::string>{std::move(s1), std::move(s2)});
+}
+
+TEST_CASE("Test tail file having more than 4096 bytes with delimiter and second chunk less than 4096", "[testTailFileWithDelimitedString]") {
+
+    FileManager fm("test.txt");
+    const std::string s1 = std::move(fm.WriteNChars(4096, 'a'));
+    const std::string d1 = std::move(fm.WriteNChars(2, '-'));
+    const std::string s2 = std::move(fm.WriteNChars(4000, 'b'));
+    fm.CloseStream();
+
+    const uint64_t totalStringsSize = s1.size() + s2.size() + d1.size();
+    const std::string filePath = fm.getFilePath();
+    const uint64_t bytesWrittenToStream = fm.GetFileSize();
+    REQUIRE(bytesWrittenToStream == totalStringsSize);
+
+    struct token_list tokens = tail_file(filePath.c_str(), '-', 0);
+    test_lists_equal(&tokens, std::vector<std::string>{std::move(s1)});
+}
+
+TEST_CASE("Test tail file having more than 4096 bytes with delimiter and second chunk more than 4096", "[testTailFileWithDelimitedString]") {
+
+    FileManager fm("test.txt");
+    const std::string s1 = std::move(fm.WriteNChars(4096, 'a'));
+    const std::string d1 = std::move(fm.WriteNChars(2, '-'));
+    const std::string s2 = std::move(fm.WriteNChars(4098, 'b'));
+    fm.CloseStream();
+
+    const uint64_t totalStringsSize = s1.size() + s2.size() + d1.size();
+    const std::string filePath = fm.getFilePath();
+    const uint64_t bytesWrittenToStream = fm.GetFileSize();
+    REQUIRE(bytesWrittenToStream == totalStringsSize);
+
+    const std::string s3 = std::string(s2.data(), s2.data()+4096);
+    struct token_list tokens = tail_file(filePath.c_str(), '-', 0);
+    test_lists_equal(&tokens, std::vector<std::string>{std::move(s1), std::move(s3)});
+}