MINIFICPP-927 Add delimited tailfile processor

Signed-off-by: Arpad Boda <aboda@apache.org>

This closes #613
diff --git a/libminifi/include/core/ContentRepository.h b/libminifi/include/core/ContentRepository.h
index 348725e..c3bec9f 100644
--- a/libminifi/include/core/ContentRepository.h
+++ b/libminifi/include/core/ContentRepository.h
@@ -46,7 +46,7 @@
    */
   virtual bool initialize(const std::shared_ptr<Configure> &configure) = 0;
 
-  virtual std::string getStoragePath() {
+  virtual std::string getStoragePath() const {
     return directory_;
   }
 
diff --git a/libminifi/include/core/ProcessContext.h b/libminifi/include/core/ProcessContext.h
index 424e306..be8ed91 100644
--- a/libminifi/include/core/ProcessContext.h
+++ b/libminifi/include/core/ProcessContext.h
@@ -59,7 +59,9 @@
         flow_repo_(flow_repo),
         content_repo_(content_repo),
         processor_node_(processor),
-        logger_(logging::LoggerFactory<ProcessContext>::getLogger()) {
+        logger_(logging::LoggerFactory<ProcessContext>::getLogger()),
+        configure_(std::make_shared<minifi::Configure>()),
+        initialized_(false) {
     repo_ = repo;
   }
 
@@ -75,7 +77,8 @@
         flow_repo_(flow_repo),
         content_repo_(content_repo),
         processor_node_(processor),
-        logger_(logging::LoggerFactory<ProcessContext>::getLogger()) {
+        logger_(logging::LoggerFactory<ProcessContext>::getLogger()),
+        initialized_(false) {
     repo_ = repo;
   }
   // Destructor
@@ -197,6 +200,15 @@
     return controller_service_provider_->getControllerServiceName(identifier);
   }
 
+  void initializeContentRepository(const std::string& home) {
+      configure_->setHome(home);
+      content_repo_->initialize(configure_);
+      initialized_ = true;
+  }
+
+  bool isInitialized() const {
+      return initialized_;
+  }
  private:
 
   template<typename T>
@@ -217,7 +229,9 @@
 
   // Logger
   std::shared_ptr<logging::Logger> logger_;
+  std::shared_ptr<Configure> configure_;
 
+  bool initialized_;
 };
 
 } /* namespace core */
diff --git a/libminifi/include/core/StreamManager.h b/libminifi/include/core/StreamManager.h
index 65e0414..a2a2e78 100644
--- a/libminifi/include/core/StreamManager.h
+++ b/libminifi/include/core/StreamManager.h
@@ -41,7 +41,7 @@
 
   }
 
-  virtual std::string getStoragePath() = 0;
+  virtual std::string getStoragePath() const = 0;
 
   /**
    * Create a write stream using the streamId as a reference.
diff --git a/libminifi/include/utils/file/FileUtils.h b/libminifi/include/utils/file/FileUtils.h
index 91fc130..bedfee8 100644
--- a/libminifi/include/utils/file/FileUtils.h
+++ b/libminifi/include/utils/file/FileUtils.h
@@ -282,7 +282,15 @@
   }
 #endif
 
-  static int create_dir(const std::string &path, bool create = true) {
+  static int is_directory(const char * path) {
+      struct stat dir_stat;
+      if (stat(path, &dir_stat) < 0) {
+          return 0;
+      }
+      return S_ISDIR(dir_stat.st_mode);
+  }
+
+  static int create_dir(const std::string& path, bool recursive = true) {
 #ifdef BOOST_VERSION
     boost::filesystem::path dir(path);
     if(boost::filesystem::create_directory(dir))
@@ -301,12 +309,41 @@
       return 0;
     }
 #else
-    struct stat dir_stat;
-    if (stat(path.c_str(), &dir_stat)) {
-      if (mkdir(path.c_str(), 0700) != 0 && errno != EEXIST) {
+    if (!recursive) {
+        if (mkdir(path.c_str(), 0700) != 0 && errno != EEXIST) {
+            return -1;
+        }
+        return 0;
+    }
+
+    int ret = mkdir(path.c_str(), 0700);
+    if (ret == 0) {
+        return 0;
+    }
+
+    switch (errno) {
+    case ENOENT: {
+        size_t found = path.find_last_of(get_separator(0));
+
+        if (found == std::string::npos) {
+            return -1;
+        }
+
+        const std::string dir = path.substr(0, found);
+        int res = create_dir(dir);
+        if (res < 0) {
+            return -1;
+        }
+        return mkdir(path.c_str(), 0700);
+    }
+    case EEXIST: {
+        if (is_directory(path.c_str())) {
+            return 0;
+        }
         return -1;
-      }
-      return 0;
+    }
+    default:
+        return -1;
     }
 #endif
     return -1;
diff --git a/libminifi/src/core/repository/FileSystemRepository.cpp b/libminifi/src/core/repository/FileSystemRepository.cpp
index 4607d74..4f9b83d 100644
--- a/libminifi/src/core/repository/FileSystemRepository.cpp
+++ b/libminifi/src/core/repository/FileSystemRepository.cpp
@@ -34,7 +34,7 @@
   if (configuration->get(Configure::nifi_dbcontent_repository_directory_default, value)) {
     directory_ = value;
   } else {
-    directory_ = configuration->getHome() + "/contentrepository";
+    directory_ = configuration->getHome();
   }
   utils::file::FileUtils::create_dir(directory_);
   return true;
diff --git a/nanofi/CMakeLists.txt b/nanofi/CMakeLists.txt
index 9e4e1e8..7110519 100644
--- a/nanofi/CMakeLists.txt
+++ b/nanofi/CMakeLists.txt
@@ -33,9 +33,11 @@
 include_directories(../libminifi/opsys/posix)
 endif()
 
-file(GLOB NANOFI_SOURCES  "src/api/*.cpp" "src/core/*.c*" "src/cxx/*.cpp" "src/sitetosite/*.c*")
+file(GLOB NANOFI_SOURCES "src/api/*.c*" "src/core/*.c*" "src/cxx/*.cpp" "src/sitetosite/*.c*")
 
-file(GLOB NANOFI_EXAMPLES_SOURCES  "examples/*.c" )
+if(WIN32)
+list(REMOVE_ITEM NANOFI_SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/src/api/ecu.c ${CMAKE_CURRENT_SOURCE_DIR}/src/core/file_utils.c ${CMAKE_CURRENT_SOURCE_DIR}/src/core/flowfiles.c)
+endif()
 
 file(GLOB NANOFI_ECU_SOURCES "ecu/*.c")
 
diff --git a/nanofi/ecu/CMakeLists.txt b/nanofi/ecu/CMakeLists.txt
index b28af76..fccb443 100644
--- a/nanofi/ecu/CMakeLists.txt
+++ b/nanofi/ecu/CMakeLists.txt
@@ -19,33 +19,6 @@
 
 cmake_minimum_required(VERSION 2.6)
 
-IF(POLICY CMP0048)
-  CMAKE_POLICY(SET CMP0048 OLD)
-ENDIF(POLICY CMP0048)
-
-include_directories(/include)
-
-include(CheckCXXCompilerFlag)
-if (WIN32)
-  if ((MSVC_VERSION GREATER "1900") OR (MSVC_VERSION EQUAL "1900"))
-	    CHECK_CXX_COMPILER_FLAG("/std:c++14" _cpp_latest_flag_supported)
-	    if (_cpp_latest_flag_supported)
-	        add_compile_options("/std:c++14")
-	    endif()
-	endif()
-else()
-CHECK_CXX_COMPILER_FLAG("-std=c++11" COMPILER_SUPPORTS_CXX11)
-CHECK_CXX_COMPILER_FLAG("-std=c++0x" COMPILER_SUPPORTS_CXX0X)
-if(COMPILER_SUPPORTS_CXX11)
-    set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11")
-elseif(COMPILER_SUPPORTS_CXX0X)
-    set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++0x")
-else()
- message(STATUS "The compiler ${CMAKE_CXX_COMPILER} has no C++11 support. Please use a different C++ compiler.")
-endif()
-
-endif()
-
 if (APPLE)
     set(LINK_FLAGS "-Wl,-all_load")
     set(LINK_END_FLAGS "")
@@ -56,8 +29,16 @@
 
 if (NOT WIN32)
 
-add_executable(tail_file tail_file.c)
+add_executable(log_aggregator log_aggregator.c)
 
-target_link_libraries(tail_file nanofi ${CMAKE_THREAD_LIBS_INIT}  ${LINK_FLAGS} minifi-http-curl ${LINK_END_FLAGS})
+target_link_libraries(log_aggregator nanofi ${CMAKE_THREAD_LIBS_INIT}  ${LINK_FLAGS} minifi-http-curl ${LINK_END_FLAGS})
+
+add_executable(tailfile_chunk tailfile_chunk.c)
+
+target_link_libraries(tailfile_chunk nanofi ${CMAKE_THREAD_LIBS_INIT}  ${LINK_FLAGS} minifi-http-curl ${LINK_END_FLAGS})
+
+add_executable(tailfile_delimited tailfile_delimited.c)
+
+target_link_libraries(tailfile_delimited nanofi ${CMAKE_THREAD_LIBS_INIT}  ${LINK_FLAGS} minifi-http-curl ${LINK_END_FLAGS})
 
 endif()
\ No newline at end of file
diff --git a/nanofi/ecu/log_aggregator.c b/nanofi/ecu/log_aggregator.c
new file mode 100644
index 0000000..52d4f23
--- /dev/null
+++ b/nanofi/ecu/log_aggregator.c
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+    *
+    *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+#include "api/nanofi.h"
+#include "api/ecu.h"
+#include "core/string_utils.h"
+#include "core/cstructs.h"
+#include "core/file_utils.h"
+#include "core/flowfiles.h"
+
+#include <unistd.h>
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include <errno.h>
+#include <limits.h>
+#include <signal.h>
+#include <sys/stat.h>
+
+int main(int argc, char** argv) {
+
+    if (argc < 7) {
+        printf("Error: must run ./log_aggregator <file> <interval> <delimiter> <hostname> <tcp port number> <nifi port uuid>\n");
+        exit(1);
+    }
+
+    tailfile_input_params input_params = init_logaggregate_input(argv);
+
+    uint64_t intrvl = 0;
+    uint64_t port_num = 0;
+    if (validate_input_params(&input_params, &intrvl, &port_num) < 0) {
+        return 1;
+    }
+
+    setup_signal_action();
+    nifi_proc_params params = setup_nifi_processor(&input_params, "LogAggregator", on_trigger_logaggregator);
+
+    set_standalone_property(params.processor, "file_path", input_params.file);
+    set_standalone_property(params.processor, "delimiter", input_params.delimiter);
+
+    struct CRawSiteToSiteClient * client = createClient(input_params.instance, port_num, input_params.nifi_port_uuid);
+
+    char uuid_str[37];
+    get_proc_uuid_from_processor(params.processor, uuid_str);
+
+    while (!stopped) {
+        flow_file_record * new_ff = invoke(params.processor);
+        struct processor_params * pp = NULL;
+        HASH_FIND_STR(procparams, uuid_str, pp);
+        if (pp) {
+            transmit_payload(client, pp->ff_list, 0);
+            delete_all_flow_files_from_proc(uuid_str);
+        }
+        free_flowfile(new_ff);
+        sleep(intrvl);
+    }
+
+    printf("log aggregator processor stopped\n");
+    if (client) {
+        destroyClient(client);
+    }
+    clear_content_repo(params.instance);
+    delete_all_flow_files_from_proc(uuid_str);
+    free_standalone_processor(params.processor);
+    free_instance(params.instance);
+    free_proc_params(uuid_str);
+    return 0;
+}
diff --git a/nanofi/ecu/tail_file.c b/nanofi/ecu/tail_file.c
deleted file mode 100644
index c428be1..0000000
--- a/nanofi/ecu/tail_file.c
+++ /dev/null
@@ -1,206 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
-    *
-    *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-#include "api/nanofi.h"
-#include "core/string_utils.h"
-#include "core/cstructs.h"
-#include "core/file_utils.h"
-#include <unistd.h>
-#include <stdlib.h>
-#include <stdio.h>
-#include <string.h>
-#include <errno.h>
-#include <limits.h>
-#include <signal.h>
-#include <sys/stat.h>
-
-struct flow_file_records * flowfiles = NULL;
-nifi_instance * instance = NULL;
-standalone_processor * proc = NULL;
-int file_offset = 0;
-int stopped = 0;
-flow_file_list ff_list;
-token_list tks;
-
-void signal_handler(int signum) {
-    if (signum == SIGINT || signum == SIGTERM) {
-        stopped = 1;
-    }
-}
-
-void transmit_flow_files(nifi_instance * instance) {
-    flow_file_list_node * head = ff_list.head;
-    while (head) {
-        transmit_flowfile(head->ff_record, instance);
-        head = head->next;
-    }
-}
-
-void set_offset(int offset) {
-    file_offset = offset;
-}
-
-int get_offset() {
-    return file_offset;
-}
-
-void on_trigger_callback(processor_session * ps, processor_context * ctx) {
-
-    char file_path[4096];
-    char delimiter[3];
-
-    if (get_property(ctx, "file_path", file_path, sizeof(file_path)) != 0) {
-        return;
-    }
-
-    if (get_property(ctx, "delimiter", delimiter, sizeof(delimiter)) != 0) {
-        return;
-    }
-
-    if (strlen(delimiter) == 0) {
-        printf("Delimiter not specified or it is empty\n");
-        return;
-    }
-    char delim = delimiter[0];
-
-    if (delim == '\\') {
-          if (strlen(delimiter) > 1) {
-            switch (delimiter[1]) {
-              case 'r':
-                delim = '\r';
-                break;
-              case 't':
-                delim = '\t';
-                break;
-              case 'n':
-                delim = '\n';
-                break;
-              case '\\':
-                delim = '\\';
-                break;
-              default:
-                break;
-            }
-        }
-    }
-
-    tks = tail_file(file_path, delim, get_offset());
-
-    if (!validate_list(&tks)) return;
-
-    set_offset(get_offset() + tks.total_bytes);
-
-    token_node * head;
-    for (head = tks.head; head && head->data; head = head->next) {
-        flow_file_record * ffr = generate_flow_file(instance, proc);
-        const char * flow_file_path = ffr->contentLocation;
-        FILE * ffp = fopen(flow_file_path, "wb");
-        if (!ffp) {
-            printf("Cannot open flow file at path %s to write content to.\n", flow_file_path);
-            break;
-        }
-
-        int count = strlen(head->data);
-        int ret = fwrite(head->data, 1, count, ffp);
-        if (ret < count) {
-            fclose(ffp);
-            break;
-        }
-        fseek(ffp, 0, SEEK_END);
-        ffr->size = ftell(ffp);
-        fclose(ffp);
-        add_flow_file_record(&ff_list, ffr);
-    }
-    free_all_tokens(&tks);
-}
-
-int main(int argc, char** argv) {
-
-    if (argc < 6) {
-        printf("Error: must run ./tail_file <file> <interval> <delimiter> <nifi instance url> <remote port>\n");
-        exit(1);
-    }
-
-    char * file = argv[1];
-    char * interval = argv[2];
-    char * delimiter = argv[3];
-    char * instance_str = argv[4];
-    char * port_str = argv[5];
-
-    if (access(file, F_OK) == -1) {
-        printf("Error: %s doesn't exist!\n", file);
-        exit(1);
-    }
-
-    struct stat stats;
-    errno = 0;
-    int ret = stat(file, &stats);
-
-    if (ret == -1) {
-        printf("Error occurred while getting file status {file: %s, error: %s}\n", file, strerror(errno));
-        exit(1);
-    }
-    // Check for file existence
-    if (S_ISDIR(stats.st_mode)){
-        printf("Error: %s is a directory!\n", file);
-        exit(1);
-    }
-
-    errno = 0;
-    unsigned long intrvl = strtol(interval, NULL, 10);
-
-    if (errno == ERANGE || intrvl == LONG_MAX || intrvl == LONG_MIN) {
-        printf("Invalid interval value specified\n");
-        return 0;
-    }
-
-    struct sigaction action;
-    memset(&action, 0, sizeof(sigaction));
-    action.sa_handler = signal_handler;
-    sigaction(SIGTERM, &action, NULL);
-    sigaction(SIGINT, &action, NULL);
-
-    nifi_port port;
-
-    port.port_id = port_str;
-
-    instance = create_instance(instance_str, &port);
-
-    const char * processor_name = "TailFile";
-
-    add_custom_processor(processor_name, on_trigger_callback);
-
-    proc = create_processor(processor_name);
-
-    set_standalone_property(proc, "file_path", file);
-    set_standalone_property(proc, "delimiter", delimiter);
-
-    set_offset(0);
-    while (!stopped) {
-        flow_file_record * new_ff = invoke(proc);
-        transmit_flow_files(instance);
-        free_flow_file_list(&ff_list);
-        free_flowfile(new_ff);
-        sleep(intrvl);
-    }
-
-    printf("tail file processor stopped\n");
-    free_standalone_processor(proc);
-    free(instance);
-
-    return 0;
-}
diff --git a/nanofi/ecu/tailfile_chunk.c b/nanofi/ecu/tailfile_chunk.c
new file mode 100644
index 0000000..26b9966
--- /dev/null
+++ b/nanofi/ecu/tailfile_chunk.c
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+    *
+    *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+#include "api/ecu.h"
+#include "core/flowfiles.h"
+#include <unistd.h>
+#include <errno.h>
+#include <limits.h>
+#include <signal.h>
+#include <sys/stat.h>
+#include <stdlib.h>
+#include <string.h>
+#include <stdio.h>
+
+int main(int argc, char** argv) {
+
+    if (argc < 7) {
+        printf("Error: must run ./tailfile_chunk <file> <interval> <chunksize> <hostname> <tcp port number> <nifi port uuid>\n");
+        exit(1);
+    }
+
+    tailfile_input_params input_params = init_tailfile_chunk_input(argv);
+
+    uint64_t intrvl = 0;
+    uint64_t port_num = 0;
+    if (validate_input_params(&input_params, &intrvl, &port_num) < 0) {
+        return 1;
+    }
+
+    setup_signal_action();
+    nifi_proc_params params = setup_nifi_processor(&input_params, "TailFileChunk", on_trigger_tailfilechunk);
+
+    set_standalone_property(params.processor, "file_path", input_params.file);
+    set_standalone_property(params.processor, "chunk_size", input_params.chunk_size);
+
+    struct CRawSiteToSiteClient * client = createClient(input_params.instance, port_num, input_params.nifi_port_uuid);
+
+    char uuid_str[37];
+    get_proc_uuid_from_processor(params.processor, uuid_str);
+
+    while (!stopped) {
+        flow_file_record * new_ff = invoke(params.processor);
+        struct processor_params * pp = NULL;
+        HASH_FIND_STR(procparams, uuid_str, pp);
+        if (pp) {
+            transmit_payload(client, pp->ff_list, 0);
+            delete_all_flow_files_from_proc(uuid_str);
+        }
+        free_flowfile(new_ff);
+        sleep(intrvl);
+    }
+
+    printf("processor stopped\n");
+    if (client) {
+        destroyClient(client);
+    }
+    clear_content_repo(params.instance);
+    delete_all_flow_files_from_proc(uuid_str);
+    free_standalone_processor(params.processor);
+    free_instance(params.instance);
+    free_proc_params(uuid_str);
+    return 0;
+}
diff --git a/nanofi/ecu/tailfile_delimited.c b/nanofi/ecu/tailfile_delimited.c
new file mode 100644
index 0000000..3495231
--- /dev/null
+++ b/nanofi/ecu/tailfile_delimited.c
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+    *
+    *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+#include "api/ecu.h"
+#include "core/flowfiles.h"
+#include <unistd.h>
+#include <errno.h>
+#include <limits.h>
+#include <signal.h>
+#include <sys/stat.h>
+#include <stdlib.h>
+#include <string.h>
+#include <stdio.h>
+
+int main(int argc, char** argv) {
+
+    if (argc < 7) {
+        printf("Error: must run ./tailfile_delimited <file> <interval> <delimiter> <hostname> <tcp port number> <nifi port uuid>\n");
+        exit(1);
+    }
+
+    tailfile_input_params input_params = init_logaggregate_input(argv);
+
+    uint64_t intrvl = 0;
+    uint64_t port_num = 0;
+    if (validate_input_params(&input_params, &intrvl, &port_num) < 0) {
+        return 1;
+    }
+
+    setup_signal_action();
+    nifi_proc_params params = setup_nifi_processor(&input_params, "TailFileDelimited", on_trigger_tailfiledelimited);
+
+    set_standalone_property(params.processor, "file_path", input_params.file);
+    set_standalone_property(params.processor, "delimiter", input_params.delimiter);
+
+    struct CRawSiteToSiteClient * client = createClient(input_params.instance, port_num, input_params.nifi_port_uuid);
+
+    char uuid_str[37];
+    get_proc_uuid_from_processor(params.processor, uuid_str);
+
+    while (!stopped) {
+        flow_file_record * new_ff = invoke(params.processor);
+        struct processor_params * pp = NULL;
+        HASH_FIND_STR(procparams, uuid_str, pp);
+        if (pp) {
+            transmit_payload(client, pp->ff_list, 1);
+            delete_completed_flow_files_from_proc(uuid_str);
+        }
+        free_flowfile(new_ff);
+        sleep(intrvl);
+    }
+
+    printf("tailfile delimited processor stopped\n");
+    if (client) {
+        destroyClient(client);
+    }
+    clear_content_repo(params.instance);
+    delete_all_flow_files_from_proc(uuid_str);
+    free_standalone_processor(params.processor);
+    free_instance(params.instance);
+    free_proc_params(uuid_str);
+    return 0;
+}
diff --git a/nanofi/examples/CMakeLists.txt b/nanofi/examples/CMakeLists.txt
index b9480ed..6a9779c 100644
--- a/nanofi/examples/CMakeLists.txt
+++ b/nanofi/examples/CMakeLists.txt
@@ -83,8 +83,4 @@
 
 target_link_libraries(monitor_directory nanofi ${CMAKE_THREAD_LIBS_INIT}  ${LINK_FLAGS} minifi-http-curl ${LINK_END_FLAGS})
 
-#add_executable(tail_file tail_file.c)
-
-#target_link_libraries(tail_file nanofi ${CMAKE_THREAD_LIBS_INIT}  ${LINK_FLAGS} minifi-http-curl ${LINK_END_FLAGS})
-
 endif()
diff --git a/nanofi/examples/tail_file.c b/nanofi/examples/tail_file.c
deleted file mode 100644
index 2200df4..0000000
--- a/nanofi/examples/tail_file.c
+++ /dev/null
@@ -1,257 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
-    *
-    *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-#include "api/nanofi.h"
-#include "core/string_utils.h"
-#include <unistd.h>
-#include <stdlib.h>
-#include <stdio.h>
-#include <string.h>
-#include <errno.h>
-#include <limits.h>
-#include <signal.h>
-#include <sys/stat.h>
-
-typedef struct flow_file_records {
-    flow_file_record ** records;
-    uint64_t len;
-} flow_file_records;
-
-struct flow_file_records * flowfiles = NULL;
-nifi_instance * instance = NULL;
-standalone_processor * proc = NULL;
-int file_offset = 0;
-int stopped = 0;
-
-void signal_handler(int signum) {
-    if (signum == SIGINT || signum == SIGTERM) {
-        stopped = 1;
-    }
-}
-
-void transmit_flow_files(nifi_instance * instance) {
-    NULL_CHECK( ,flowfiles);
-    int i;
-    for (i = 0; i < flowfiles->len; ++i) {
-        NULL_CHECK( ,flowfiles->records[i]);
-        transmit_flowfile(flowfiles->records[i], instance);
-    }
-}
-
-void free_flow_file_records() {
-    NULL_CHECK( ,flowfiles);
-    int i;
-    for (i = 0; i < flowfiles->len; ++i) {
-        free_flowfile(flowfiles->records[i]);
-    }
-    free(flowfiles);
-    flowfiles = NULL;
-}
-
-void set_offset(int offset) {
-    file_offset = offset;
-}
-
-int get_offset() {
-    return file_offset;
-}
-
-void free_all_strings(char ** strings, int num_strings) {
-    int i;
-    for (i = 0; i < num_strings; ++i) {
-        free(strings[i]);
-    }
-}
-
-void on_trigger_callback(processor_session * ps, processor_context * ctx) {
-
-    char file_path[4096];
-    char delimiter[2];
-
-    if (get_property(ctx, "file_path", file_path, 50) != 0) {
-        return;
-    }
-
-    if (get_property(ctx, "delimiter", delimiter, 2) != 0) {
-        return;
-    }
-
-    if (strlen(delimiter) == 0) {
-        printf("Delimiter not specified or it is empty\n");
-        return;
-    }
-    char delim = '\0';
-    if (strlen(delimiter) > 0) {
-        delim = delimiter[0];
-    }
-
-    if (delim == '\0') {
-        printf("Invalid delimiter \n");
-        return;
-    }
-
-    if (delim == '\\') {
-          if (strlen(delimiter) > 1) {
-            switch (delimiter[1]) {
-              case 'r':
-                delim = '\r';
-                break;
-              case 't':
-                delim = '\t';
-                break;
-              case 'n':
-                delim = '\n';
-                break;
-              case '\\':
-                delim = '\\';
-                break;
-              default:
-                break;
-            }
-        }
-    }
-
-    int curr_offset = get_offset();
-    int max_bytes_read = 4096;
-    char buff[max_bytes_read + 1];
-    memset(buff,'\0', max_bytes_read);
-    FILE * fp = fopen(file_path, "rb");
-    if (!fp) return;
-    fseek(fp, curr_offset, SEEK_SET);
-
-    int bytes_read = 0;
-    while ((bytes_read = fread(buff, 1, max_bytes_read, fp)) > 0) {
-        buff[bytes_read] = '\0';
-        tokenizer_mode_t mode = TAILFILE_MODE;
-        struct tokens tks = tokenize_string(buff, delim, mode);
-
-        if (tks.num_strings == 0) return;
-
-        set_offset(get_offset() + tks.total_bytes);
-
-        flowfiles = (flow_file_records *)malloc(sizeof(flow_file_records));
-        flowfiles->records = malloc(sizeof(flow_file_record *) * tks.num_strings);
-        flowfiles->len = tks.num_strings;
-
-        int i;
-        for (i = 0; i < tks.num_strings; ++i) {
-            flowfiles->records[i] = NULL;
-        }
-
-        for (i = 0; i < tks.num_strings; ++i) {
-            if (tks.str_list[i] && strlen(tks.str_list[i]) > 0) {
-                flow_file_record * ffr = generate_flow_file(instance, proc);
-                const char * flow_file_path = ffr->contentLocation;
-                FILE * ffp = fopen(flow_file_path, "wb");
-                if (!ffp) {
-                    printf("Cannot open flow file at path %s to write content to.\n", flow_file_path);
-                    fclose(fp);
-                    free_tokens(&tks);
-                    return;
-                }
-                int count = strlen(tks.str_list[i]);
-                int ret = fwrite(tks.str_list[i], 1, count, ffp);
-                if (ret < count) {
-                    fclose(ffp);
-                    return;
-                }
-                fseek(ffp, 0, SEEK_END);
-                ffr->size = ftell(ffp);
-                fclose(ffp);
-                flowfiles->records[i] = ffr;
-            }
-        }
-        free_tokens(&tks);
-    }
-    fclose(fp);
-}
-
-int main(int argc, char** argv) {
-
-    if (argc < 6) {
-        printf("Error: must run ./tail_file <file> <interval> <delimiter> <nifi instance url> <remote port>\n");
-        exit(1);
-    }
-
-    char * file = argv[1];
-    char * interval = argv[2];
-    char * delimiter = argv[3];
-    char * instance_str = argv[4];
-    char * port_str = argv[5];
-
-    if (access(file, F_OK) == -1) {
-        printf("Error: %s doesn't exist!\n", file);
-        exit(1);
-    }
-
-    struct stat stats;
-    int ret = stat(file, &stats);
-
-    errno = 0;
-    if (ret == -1) {
-        printf("Error occurred while getting file status {file: %s, error: %s}\n", file, strerror(errno));
-        exit(1);
-    }
-    // Check for file existence
-    if (S_ISDIR(stats.st_mode)){
-        printf("Error: %s is a directory!\n", file);
-        exit(1);
-    }
-
-    errno = 0;
-    unsigned long intrvl = strtol(interval, NULL, 10);
-
-    if (errno == ERANGE || intrvl == LONG_MAX || intrvl == LONG_MIN) {
-        printf("Invalid interval value specified\n");
-        return 0;
-    }
-
-    struct sigaction action;
-    memset(&action, 0, sizeof(sigaction));
-    action.sa_handler = signal_handler;
-    sigaction(SIGTERM, &action, NULL);
-    sigaction(SIGINT, &action, NULL);
-
-    nifi_port port;
-
-    port.port_id = port_str;
-
-    instance = create_instance(instance_str, &port);
-
-    const char * processor_name = "TailFile";
-
-    add_custom_processor(processor_name, on_trigger_callback);
-
-    proc = create_processor(processor_name);
-
-    set_standalone_property(proc, "file_path", file);
-    set_standalone_property(proc, "delimiter", delimiter);
-
-    set_offset(0);
-    while (!stopped) {
-        flow_file_record * new_ff = invoke(proc);
-        transmit_flow_files(instance);
-        free_flow_file_records();
-        free_flowfile(new_ff);
-        sleep(intrvl);
-    }
-
-    free_standalone_processor(proc);
-    free(instance);
-
-    return 0;
-}
diff --git a/nanofi/include/api/ecu.h b/nanofi/include/api/ecu.h
new file mode 100644
index 0000000..9549491
--- /dev/null
+++ b/nanofi/include/api/ecu.h
@@ -0,0 +1,95 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+    *
+    *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+#ifndef NANOFI_INCLUDE_API_ECU_H_
+#define NANOFI_INCLUDE_API_ECU_H_
+
+#include <signal.h>
+#include "api/nanofi.h"
+#include "uthash.h"
+#include "utlist.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+typedef struct proc_properties {
+    char * file_path;
+    char delimiter;
+    uint64_t chunk_size;
+} proc_properties;
+
+typedef struct processor_params {
+    char uuid_str[37]; //key
+    struct flow_file_list * ff_list;
+    uint64_t curr_offset;
+    struct proc_properties * properties;
+    UT_hash_handle hh;
+} processor_params;
+
+extern processor_params * procparams;
+extern volatile sig_atomic_t stopped;
+
+typedef struct tailfile_input_params {
+    char * file;
+    char * interval;
+    char * delimiter;
+    char * instance;
+    char * tcp_port;
+    char * nifi_port_uuid;
+    char * chunk_size;
+} tailfile_input_params;
+
+typedef struct nifi_proc_params {
+    nifi_instance * instance;
+    standalone_processor * processor;
+} nifi_proc_params;
+
+/**
+ * Tails a delimited file starting from an offset up to the end of file
+ * @param file the path to the file to tail
+ * @param delim the delimiter character
+ * @param ctx the process context
+ * For eg. To tail from beginning of the file curr_offset = 0
+ * @return a list of flow file info containing list of flow file records
+ * and the current offset in the file
+ */
+flow_file_info log_aggregate(const char * file_path, char delim, processor_context * ctx);
+void on_trigger_tailfilechunk(processor_session * ps, processor_context * ctx);
+void on_trigger_logaggregator(processor_session * ps, processor_context * ctx);
+void on_trigger_tailfiledelimited(processor_session * ps, processor_context * ctx);
+void signal_handler(int signum);
+void delete_all_flow_files_from_proc(const char * uuid);
+void delete_completed_flow_files_from_proc(const char * uuid);
+void update_proc_params(const char * uuid, uint64_t value, flow_file_list * ff);
+processor_params * get_proc_params(const char * uuid);
+
+void init_common_input(tailfile_input_params * input_params, char ** args);
+tailfile_input_params init_logaggregate_input(char ** args);
+tailfile_input_params init_tailfile_chunk_input(char ** args);
+
+int validate_input_params(tailfile_input_params * params, uint64_t * intrvl, uint64_t * port_num);
+void setup_signal_action();
+nifi_proc_params setup_nifi_processor(tailfile_input_params * input_params, const char * processor_name, void(*callback)(processor_session *, processor_context *));
+void free_proc_params(const char * uuid);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* NANOFI_INCLUDE_API_ECU_H_ */
diff --git a/nanofi/include/api/nanofi.h b/nanofi/include/api/nanofi.h
index 074ae70..f401073 100644
--- a/nanofi/include/api/nanofi.h
+++ b/nanofi/include/api/nanofi.h
@@ -150,7 +150,7 @@
  * @param name the name of the processor to instanciate
  * @return pointer to the new processor or nullptr in case it cannot be instantiated (wrong name?)
  **/
-standalone_processor *create_processor(const char * name);
+standalone_processor *create_processor(const char * name, nifi_instance * instance);
 
 /**
  * Free a standalone processor
@@ -318,6 +318,13 @@
 flow_file_record* generate_flow_file(nifi_instance * instance, standalone_processor * proc);
 
 /**
+ * Adds content to the flow file record.
+ * @param ctx the processor context
+ * @return a flow file record
+ */
+flow_file_record * generate_flow(processor_context * ctx);
+
+/**
  * Get incoming flow file. To be used in processor logic callbacks.
  * @param session current processor session
  * @param context current processor context
@@ -441,6 +448,35 @@
  **/
 int transfer_to_relationship(flow_file_record * ffr, processor_session * ps, const char * relationship);
 
+/**
+ * Write content to a flow file and return a pointer to flow file record
+ * @param buff, the buffer to read content from
+ * @param count the number of bytes to read
+ * @param ctx the processor context
+ */
+flow_file_record * write_to_flow(const char * buff, size_t count, processor_context * ctx);
+
+/**
+ * Initialize content repository
+ * @param ctx the processor context
+ */
+void initialize_content_repo(processor_context * ctx, const char * uuid);
+
+/**
+ * Clear content repository contents
+ */
+void clear_content_repo(const nifi_instance * instance);
+
+/**
+ * Get the processor uuid from processor context
+ */
+void get_proc_uuid_from_context(const processor_context * ctx, char * uuid_target);
+
+/**
+ * Get the processor uuid from processor
+ */
+void get_proc_uuid_from_processor(standalone_processor * proc, char * uuid_target);
+
 /****
  * ##################################################################
  *  Persistence Operations
diff --git a/nanofi/include/core/cstructs.h b/nanofi/include/core/cstructs.h
index 3be3ce3..846be5b 100644
--- a/nanofi/include/core/cstructs.h
+++ b/nanofi/include/core/cstructs.h
@@ -117,9 +117,9 @@
 
   char * contentLocation; /**< Filesystem location of this object */
 
-  void *attributes; /**< Hash map of attributes */
+  void * attributes; /**< Hash map of attributes */
 
-  void *ffp;
+  void * ffp;
 
   uint8_t keepContent;
 
@@ -175,16 +175,15 @@
  * ##################################################################
  */
 
-typedef struct flow_file_list_node {
-    flow_file_record * ff_record;
-    struct flow_file_list_node * next;
-} flow_file_list_node;
-
 typedef struct flow_file_list {
-    flow_file_list_node * head;
-    flow_file_list_node * tail;
-    int len;
-    int offset;
+    flow_file_record * ff_record;
+    int complete;
+    struct flow_file_list * next;
 } flow_file_list;
 
+typedef struct flow_file_info {
+    struct flow_file_list * ff_list;
+    uint64_t total_bytes;
+} flow_file_info;
+
 #endif /* LIBMINIFI_SRC_CAPI_CSTRUCTS_H_ */
diff --git a/nanofi/include/core/file_utils.h b/nanofi/include/core/file_utils.h
index 25c3029..15806e4 100644
--- a/nanofi/include/core/file_utils.h
+++ b/nanofi/include/core/file_utils.h
@@ -19,6 +19,7 @@
 #ifndef NANOFI_INCLUDE_CORE_FILE_UTILS_H_
 #define NANOFI_INCLUDE_CORE_FILE_UTILS_H_
 
+#include "utlist.h"
 #include "flowfiles.h"
 
 #ifdef __cplusplus
@@ -26,14 +27,49 @@
 #endif
 
 /**
- * Tails a delimited file starting from an offset up to the end of file
- * @param file the path to the file to tail
- * @param delim the delimiter character
- * @param curr_offset the offset in the file to tail from.
- * For eg. To tail from beginning of the file curr_offset = 0
- * @return a list of tokens
+ * Recursively deletes a directory tree
+ * @param path, the path to the directory
  */
-token_list tail_file(const char * file, char delim, int curr_offset);
+void remove_directory(const char * path);
+
+/**
+ * Determine if the provided directory/file path is a directory
+ * @path the absolute path to the file/directory
+ * @return 1 if path is directory else 0
+ */
+int is_directory(const char * path);
+
+/*
+ * Get the platform-specific path separator.
+ * @param force_posix returns the posix path separator ('/'), even when not on posix. Useful when dealing with remote posix paths.
+ * @return the path separator character
+ */
+const char * get_separator(int force_posix);
+
+/**
+ * Joins parent path with child path
+ * @param parent the parent path
+ * @param child the child path
+ * @return concatenated path
+ * @attention this function allocates memory for the returned concatenated path
+ * and it is left for the caller to free the memory
+ */
+char * concat_path(const char * parent, const char * child);
+
+/**
+ * Make a directory tree specified by path
+ * @param path the path to the directory
+ * @return 1 if successful else 0
+ */
+int make_dir(const char * path);
+
+/**
+ * Return the current working directory
+ * @return the current working directory
+ * @attention this function allocates memory on heap
+ * it is left to the caller to free it
+ */
+char * get_current_working_directory();
 
 #ifdef __cplusplus
 }
diff --git a/nanofi/include/core/flowfiles.h b/nanofi/include/core/flowfiles.h
index 60ac02e..72307ec 100644
--- a/nanofi/include/core/flowfiles.h
+++ b/nanofi/include/core/flowfiles.h
@@ -19,10 +19,33 @@
 #ifndef NANOFI_INCLUDE_CORE_FLOWFILES_H_
 #define NANOFI_INCLUDE_CORE_FLOWFILES_H_
 
+#ifdef __cplusplus
+extern "C" {
+#endif
+
 #include "cstructs.h"
+#include "api/ecu.h"
+#include "sitetosite/CPeer.h"
+#include "sitetosite/CRawSocketProtocol.h"
 
-void add_flow_file_record(flow_file_list * ff_list, flow_file_record * record);
+flow_file_list * add_flow_file_record(flow_file_list ** ff_list, flow_file_record * record);
 
-void free_flow_file_list(flow_file_list * ff_list);
+void free_flow_file_list(flow_file_list ** ff_list);
+
+void add_attributes(flow_file_record * ffr, const char * file_path, uint64_t curr_offset);
+
+void update_attributes(flow_file_record * ffr, const char * file_path, uint64_t curr_offset);
+
+void transmit_flow_files(nifi_instance * instance, flow_file_list * ff_list, int complete);
+
+void transmit_payload(struct CRawSiteToSiteClient * client, struct flow_file_list * ff_list, int complete);
+
+uint64_t flow_files_size(flow_file_list * ff_list);
+
+void read_payload_and_transmit(struct flow_file_list * ffl, struct CRawSiteToSiteClient * client);
+
+#ifdef __cplusplus
+}
+#endif
 
 #endif /* NANOFI_INCLUDE_CORE_FLOWFILES_H_ */
diff --git a/nanofi/include/cxx/Instance.h b/nanofi/include/cxx/Instance.h
index b10b95f..0325081 100644
--- a/nanofi/include/cxx/Instance.h
+++ b/nanofi/include/cxx/Instance.h
@@ -27,6 +27,7 @@
 #include "RemoteProcessorGroupPort.h"
 #include "core/ContentRepository.h"
 #include "core/repository/VolatileContentRepository.h"
+#include "core/repository/FileSystemRepository.h"
 #include "core/Repository.h"
 
 #include "C2CallbackAgent.h"
@@ -40,6 +41,8 @@
 #include "ReflexiveSession.h"
 #include "utils/ThreadPool.h"
 #include "core/state/UpdateController.h"
+#include "core/file_utils.h"
+
 namespace org {
 namespace apache {
 namespace nifi {
@@ -63,14 +66,24 @@
 class Instance {
  public:
 
-  explicit Instance(const std::string &url, const std::string &port)
+  explicit Instance(const std::string &url, const std::string &port, const std::string &repo_class_name = "")
       : configure_(std::make_shared<Configure>()),
         url_(url),
         agent_(nullptr),
         rpgInitialized_(false),
         listener_thread_pool_(1),
-        content_repo_(std::make_shared<minifi::core::repository::VolatileContentRepository>()),
         no_op_repo_(std::make_shared<minifi::core::Repository>()) {
+
+    if (repo_class_name == "filesystemrepository") {
+        content_repo_ = std::make_shared<minifi::core::repository::FileSystemRepository>();
+    } else {
+        content_repo_ = std::make_shared<minifi::core::repository::VolatileContentRepository>();
+    }
+    char * cwd = get_current_working_directory();
+    if (cwd) {
+        configure_->setHome(std::string(cwd));
+        free(cwd);
+    }
     running_ = false;
     stream_factory_ = minifi::io::StreamFactory::getInstance(configure_);
     utils::Identifier uuid;
@@ -118,7 +131,7 @@
     return no_op_repo_;
   }
 
-  std::shared_ptr<minifi::core::ContentRepository> getContentRepository() {
+  std::shared_ptr<minifi::core::ContentRepository> getContentRepository() const {
     return content_repo_;
   }
 
diff --git a/nanofi/src/api/ecu.c b/nanofi/src/api/ecu.c
new file mode 100644
index 0000000..709c681
--- /dev/null
+++ b/nanofi/src/api/ecu.c
@@ -0,0 +1,530 @@
+#include "api/ecu.h"
+#include "api/nanofi.h"
+#include "core/string_utils.h"
+#include "core/cstructs.h"
+#include "core/file_utils.h"
+#include "core/flowfiles.h"
+
+#include <unistd.h>
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include <errno.h>
+#include <limits.h>
+#include <signal.h>
+#include <sys/stat.h>
+
+processor_params * procparams = NULL;
+volatile sig_atomic_t stopped = 0;
+
+void free_proc_params(const char * uuid) {
+
+    struct processor_params * pp = NULL;
+    HASH_FIND_STR(procparams, uuid, pp);
+    if (pp) {
+        free_flow_file_list(&pp->ff_list);
+        free(pp->properties->file_path);
+        free(pp->properties);
+        HASH_DEL(procparams, pp);
+        free(pp);
+    }
+}
+
+void signal_handler(int signum) {
+    if (signum == SIGINT || signum == SIGTERM) {
+        stopped = 1;
+    }
+}
+
+void init_common_input(tailfile_input_params * input_params, char ** args) {
+    if (args && *args) {
+        input_params->file = args[1];
+        input_params->interval = args[2];
+        input_params->instance = args[4];
+        input_params->tcp_port = args[5];
+        input_params->nifi_port_uuid = args[6];
+    }
+}
+
+tailfile_input_params init_logaggregate_input(char ** args) {
+    tailfile_input_params input_params;
+    memset(&input_params, 0, sizeof(input_params));
+    init_common_input(&input_params, args);
+    input_params.delimiter = args[3];
+    return input_params;
+}
+
+tailfile_input_params init_tailfile_chunk_input(char ** args) {
+    tailfile_input_params input_params;
+    memset(&input_params, 0, sizeof(input_params));
+    init_common_input(&input_params, args);
+    input_params.chunk_size = args[3];
+    return input_params;
+}
+
+int validate_input_params(tailfile_input_params * params, uint64_t * intrvl, uint64_t * port_num) {
+    if (access(params->file, F_OK) == -1) {
+        printf("Error: %s doesn't exist!\n", params->file);
+        return -1;
+    }
+
+    struct stat stats;
+    int ret = stat(params->file, &stats);
+
+    if (ret == -1) {
+        printf("Error occurred while getting file status {file: %s, error: %s}\n", params->file, strerror(errno));
+        return -1;
+    }
+    // Check for file existence
+    if (S_ISDIR(stats.st_mode)){
+        printf("Error: %s is a directory!\n", params->file);
+        return -1;
+    }
+
+    errno = 0;
+    *intrvl = (uint64_t)(strtoul(params->interval, NULL, 10));
+
+    if (errno != 0) {
+        printf("Invalid interval value specified\n");
+        return -1;
+    }
+
+    errno = 0;
+    *port_num = (uint64_t)(strtoul(params->tcp_port, NULL, 10));
+    if (errno != 0) {
+        printf("Cannot convert tcp port to numeric value\n");
+        return -1;
+    }
+    return 0;
+}
+
+void setup_signal_action() {
+    struct sigaction action;
+    memset(&action, 0, sizeof(sigaction));
+    action.sa_handler = signal_handler;
+    sigaction(SIGTERM, &action, NULL);
+    sigaction(SIGINT, &action, NULL);
+}
+
+nifi_proc_params setup_nifi_processor(tailfile_input_params * input_params, const char * processor_name, void(*callback)(processor_session *, processor_context *)) {
+    nifi_proc_params params;
+    nifi_port port;
+    port.port_id = input_params->nifi_port_uuid;
+
+    nifi_instance * instance = create_instance(input_params->instance, &port);
+    add_custom_processor(processor_name, callback);
+    standalone_processor * proc = create_processor(processor_name, instance);
+    params.instance = instance;
+    params.processor = proc;
+    return params;
+}
+
+void add_to_hash_table(flow_file_record * ffr, uint64_t offset, const char * uuid) {
+    struct processor_params * pp = NULL;
+    HASH_FIND_STR(procparams, uuid, pp);
+    if (pp == NULL) {
+        pp = (struct processor_params*)malloc(sizeof(struct processor_params));
+        memset(pp, 0, sizeof(struct processor_params));
+        strcpy(pp->uuid_str, uuid);
+        HASH_ADD_STR(procparams, uuid_str, pp);
+    }
+
+    add_flow_file_record(&pp->ff_list, ffr);
+    pp->curr_offset = offset;
+}
+
+void delete_all_flow_files_from_proc(const char * uuid) {
+    struct processor_params * pp = NULL;
+    HASH_FIND_STR(procparams, uuid, pp);
+    if (pp) {
+        struct flow_file_list * head = pp->ff_list;
+        while (head) {
+            struct flow_file_list * tmp = head;
+            free_flowfile(tmp->ff_record);
+            head = head->next;
+            free(tmp);
+        }
+        pp->ff_list = head;
+    }
+}
+
+void delete_completed_flow_files_from_proc(const char * uuid) {
+    struct processor_params * pp = NULL;
+    HASH_FIND_STR(procparams, uuid, pp);
+    if (pp) {
+        struct flow_file_list * head = pp->ff_list;
+        while (head) {
+            struct flow_file_list * tmp = head;
+            if (tmp->complete) {
+                free_flowfile(tmp->ff_record);
+                head = head->next;
+                free(tmp);
+            }
+            else {
+                break;
+            }
+        }
+        pp->ff_list = head;
+    }
+}
+
+uint64_t get_current_offset(const char * uuid) {
+    struct processor_params * pp = NULL;
+    HASH_FIND_STR(procparams, uuid, pp);
+    if (pp) {
+        return pp->curr_offset;
+    }
+    return 0;
+}
+
+processor_params * get_proc_params(const char * uuid) {
+    struct processor_params * pp = NULL;
+    HASH_FIND_STR(procparams, uuid, pp);
+    return pp;
+}
+
+void update_proc_params(const char * uuid, uint64_t value, flow_file_list * ffl) {
+    struct processor_params * pp = get_proc_params(uuid);
+    if (!pp) {
+        pp = (struct processor_params *)malloc(sizeof(struct processor_params));
+        memset(pp, 0, sizeof(struct processor_params));
+        pp->ff_list = ffl;
+        pp->curr_offset = value;
+        strcpy(pp->uuid_str, uuid);
+        HASH_ADD_STR(procparams, uuid_str, pp);
+        return;
+    }
+    delete_all_flow_files_from_proc(uuid);
+    pp->curr_offset += value;
+    pp->ff_list = ffl;
+}
+
+uint64_t update_curr_offset(const char * uuid, uint64_t value) {
+    struct processor_params * pp = get_proc_params(uuid);
+    if (pp) {
+        pp->curr_offset += value;
+        return pp->curr_offset;
+    }
+
+    pp = (struct processor_params *)malloc(sizeof(struct processor_params));
+    memset(pp, 0, sizeof(struct processor_params));
+    strcpy(pp->uuid_str, uuid);
+    pp->curr_offset = value;
+    HASH_ADD_STR(procparams, uuid_str, pp);
+    return pp->curr_offset;
+}
+
+struct proc_properties * get_processor_properties(const char * uuid) {
+    if (!uuid) {
+        return NULL;
+    }
+    struct processor_params * pp = NULL;
+    HASH_FIND_STR(procparams, uuid, pp);
+    if (!pp) {
+        return NULL;
+    }
+    return pp->properties;
+}
+
+void add_processor_properties(const char * uuid, struct proc_properties * const props) {
+    struct processor_params * pp = get_proc_params(uuid);
+    if (pp) {
+        pp->properties = props;
+        return;
+    }
+
+    pp = (struct processor_params *)malloc(sizeof(struct processor_params));
+    memset(pp, 0, sizeof(struct processor_params));
+    strcpy(pp->uuid_str, uuid);
+    pp->properties = props;
+    HASH_ADD_STR(procparams, uuid_str, pp);
+}
+
+void on_trigger_tailfilechunk(processor_session * ps, processor_context * ctx) {
+
+    char uuid_str[37];
+    get_proc_uuid_from_context(ctx, uuid_str);
+
+    initialize_content_repo(ctx, uuid_str);
+
+    struct proc_properties * props = get_processor_properties(uuid_str);
+    if (!props) {
+        char file_path[4096];
+        char chunk_size[50];
+        if (get_property(ctx, "file_path", file_path, sizeof(file_path)) != 0) {
+            return;
+        }
+
+        if (get_property(ctx, "chunk_size", chunk_size, sizeof(chunk_size)) != 0) {
+            return;
+        }
+
+        errno = 0;
+        uint64_t chunk_size_value = strtoul(chunk_size, NULL, 10);
+
+        if (errno != 0 || chunk_size_value == 0) {
+            printf("Invalid chunk size specified\n");
+            return;
+        }
+
+        props = (struct proc_properties *)malloc(sizeof(struct proc_properties));
+        memset(props, 0, sizeof(struct proc_properties));
+        int len = strlen(file_path);
+        props->file_path = (char *)malloc((len + 1) * sizeof(char));
+        strncpy(props->file_path, file_path, len);
+        props->file_path[len] = '\0';
+        props->chunk_size = chunk_size_value;
+        add_processor_properties(uuid_str, props);
+    }
+
+    FILE * fp = fopen(props->file_path, "rb");
+
+    if (!fp) {
+        printf("Unable to open file. {file: %s, reason: %s}\n", props->file_path, strerror(errno));
+        return;
+    }
+
+    char * buff = (char *)malloc((props->chunk_size +1 ) * sizeof(char));
+    size_t bytes_read = 0;
+
+    uint64_t curr_offset = get_current_offset(uuid_str);
+    fseek(fp, curr_offset, SEEK_SET);
+    while ((bytes_read = fread(buff, 1, props->chunk_size, fp)) > 0) {
+        if (bytes_read < props->chunk_size) {
+            break;
+        }
+        buff[props->chunk_size] = '\0';
+        flow_file_record * ffr = write_to_flow(buff, strlen(buff), ctx);
+        curr_offset = ftell(fp);
+        add_attributes(ffr, props->file_path, curr_offset);
+        add_to_hash_table(ffr, curr_offset, uuid_str);
+    }
+    free(buff);
+    fclose(fp);
+}
+
+flow_file_info log_aggregate(const char * file_path, char delim, processor_context * ctx) {
+    flow_file_info ff_info;
+    memset(&ff_info, 0, sizeof(ff_info));
+
+    if (!file_path) {
+        return ff_info;
+    }
+
+    char uuid_str[37];
+    get_proc_uuid_from_context(ctx, uuid_str);
+
+    char buff[MAX_BYTES_READ + 1];
+    errno = 0;
+    FILE * fp = fopen(file_path, "rb");
+    if (!fp) {
+        printf("Cannot open file: {file: %s, reason: %s}\n", file_path, strerror(errno));
+        return ff_info;
+    }
+
+    uint64_t curr_offset = get_current_offset(uuid_str);
+
+    fseek(fp, curr_offset, SEEK_SET);
+
+    flow_file_list * ffl = NULL;
+    size_t bytes_read = 0;
+    while ((bytes_read = fread(buff, 1, MAX_BYTES_READ, fp)) > 0) {
+        buff[bytes_read] = '\0';
+        struct token_list tokens = tokenize_string_tailfile(buff, delim);
+        if (tokens.total_bytes > 0) {
+            ff_info.total_bytes += tokens.total_bytes;
+            curr_offset += tokens.total_bytes;
+            fseek(fp, curr_offset, SEEK_SET);
+        }
+
+        token_node * head;
+        for (head = tokens.head; head && head->data; head = head->next) {
+            flow_file_record * ffr = write_to_flow(head->data, strlen(head->data), ctx);
+            add_attributes(ffr, file_path, curr_offset);
+            add_flow_file_record(&ffl, ffr);
+        }
+        free_all_tokens(&tokens);
+    }
+    fclose(fp);
+    ff_info.ff_list = ffl;
+    return ff_info;
+}
+
+struct proc_properties * get_properties(const char * uuid, processor_context * ctx) {
+    struct proc_properties * props = get_processor_properties(uuid);
+    if (props) {
+        return props;
+    }
+
+    char file_path[4096];
+    char delimiter[3];
+
+    if (get_property(ctx, "file_path", file_path, sizeof(file_path)) != 0) {
+        return props;
+    }
+
+    if (get_property(ctx, "delimiter", delimiter, sizeof(delimiter)) != 0) {
+        printf("No delimiter found\n");
+        return props;
+    }
+
+    if (strlen(delimiter) == 0) {
+        printf("Delimiter not specified or it is empty\n");
+        return props;
+    }
+
+    props = (struct proc_properties *)malloc(sizeof(struct proc_properties));
+    memset(props, 0, sizeof(struct proc_properties));
+
+    char delim = delimiter[0];
+
+    if (delim == '\\') {
+          if (strlen(delimiter) > 1) {
+            switch (delimiter[1]) {
+              case 'r':
+                delim = '\r';
+                break;
+              case 't':
+                delim = '\t';
+                break;
+              case 'n':
+                delim = '\n';
+                break;
+              case '\\':
+                delim = '\\';
+                break;
+              default:
+                break;
+            }
+        }
+    }
+
+    int len = strlen(file_path);
+    props->file_path = (char *)malloc((len + 1) * sizeof(char));
+    strncpy(props->file_path, file_path, len);
+    props->file_path[len] = '\0';
+    props->delimiter = delim;
+
+    add_processor_properties(uuid, props);
+    return props;
+}
+
+void on_trigger_logaggregator(processor_session * ps, processor_context * ctx) {
+    char uuid_str[37];
+    get_proc_uuid_from_context(ctx, uuid_str);
+
+    struct proc_properties * props = get_properties(uuid_str, ctx);
+
+    if (!props || !props->file_path) return;
+
+    char delim = props->delimiter;
+
+    initialize_content_repo(ctx, uuid_str);
+    flow_file_info ff_info = log_aggregate(props->file_path, delim, ctx);
+
+    update_proc_params(uuid_str, ff_info.total_bytes, ff_info.ff_list);
+}
+
+void write_flow_file(flow_file_record * ffr, const char * buff, size_t count) {
+    FILE * ffp = fopen(ffr->contentLocation, "ab");
+    if (!ffp) return;
+    if (fwrite(buff, 1, count, ffp) < count) {
+        fclose(ffp);
+        free_flowfile(ffr);
+        return;
+    }
+    fclose(ffp);
+}
+
+flow_file_list * get_last_flow_file(const char * uuid) {
+    struct processor_params * pp = NULL;
+    HASH_FIND_STR(procparams, uuid, pp);
+    if (!pp) {
+        return NULL;
+    }
+
+    flow_file_list * ff_list = pp->ff_list;
+    flow_file_list * el = NULL;
+    LL_FOREACH(ff_list, el) {
+        if (el && !el->next) {
+            return el;
+        }
+    }
+    return NULL;
+}
+
+flow_file_list * add_flow_file_to_proc_params(const char * uuid, flow_file_record * ffr) {
+    struct processor_params * pp = NULL;
+    HASH_FIND_STR(procparams, uuid, pp);
+    if (!pp) {
+        pp = (struct processor_params *)malloc(sizeof(struct processor_params));
+        memset(pp, 0, sizeof(struct processor_params));
+        strcpy(pp->uuid_str, uuid);
+        HASH_ADD_STR(procparams, uuid_str, pp);
+    }
+    flow_file_list * ffl_node = add_flow_file_record(&pp->ff_list, ffr);
+    ffl_node->complete = 0;
+    return ffl_node;
+}
+
+void on_trigger_tailfiledelimited(processor_session * ps, processor_context * ctx) {
+    char uuid_str[37];
+    get_proc_uuid_from_context(ctx, uuid_str);
+
+    initialize_content_repo(ctx, uuid_str);
+    struct proc_properties * props = get_properties(uuid_str, ctx);
+
+    if (!props || !props->file_path) return;
+
+    char delim = props->delimiter;
+
+    FILE * fp = fopen(props->file_path, "rb");
+
+    if (!fp) {
+        printf("Unable to open file. {file: %s, reason: %s}\n", props->file_path, strerror(errno));
+        return;
+    }
+
+    char buff[MAX_BYTES_READ + 1];
+    size_t bytes_read = 0;
+
+    uint64_t curr_offset = get_current_offset(uuid_str);
+    fseek(fp, curr_offset, SEEK_SET);
+
+    flow_file_list * ffl_node = get_last_flow_file(uuid_str);
+    while ((bytes_read = fread(buff, 1, MAX_BYTES_READ, fp)) > 0) {
+        buff[bytes_read] = '\0';
+        const char * begin = buff;
+        const char * end = NULL;
+
+        while ((end = strchr(begin, delim))) {
+            uint64_t len = end - begin;
+            if (len > 0) {
+                if (!ffl_node || ffl_node->complete) {
+                    ffl_node = add_flow_file_to_proc_params(uuid_str, generate_flow(ctx));
+                }
+                write_flow_file(ffl_node->ff_record, begin, len);
+                update_curr_offset(uuid_str, (len + 1));
+            }
+            else {
+                update_curr_offset(uuid_str, 1);
+            }
+            if (ffl_node) {
+                ffl_node->complete = 1;
+                update_attributes(ffl_node->ff_record, props->file_path, get_current_offset(uuid_str));
+            }
+            begin = (end + 1);
+        }
+
+        if (!end && *begin != '\0') {
+            if (!ffl_node || ffl_node->complete) {
+                ffl_node = add_flow_file_to_proc_params(uuid_str, generate_flow(ctx));
+            }
+            size_t count = strlen(begin);
+            write_flow_file(ffl_node->ff_record, begin, count);
+            update_curr_offset(uuid_str, count);
+            update_attributes(ffl_node->ff_record, props->file_path, get_current_offset(uuid_str));
+        }
+    }
+    fclose(fp);
+}
diff --git a/nanofi/src/api/nanofi.cpp b/nanofi/src/api/nanofi.cpp
index d37fc6b..e8ea25a 100644
--- a/nanofi/src/api/nanofi.cpp
+++ b/nanofi/src/api/nanofi.cpp
@@ -113,7 +113,7 @@
    * This API will gradually move away from C++, hence malloc is used for nifi_instance
    * Since minifi::Instance is currently being used, then we need to use new in that case.
    */
-  instance->instance_ptr = new minifi::Instance(url, port->port_id);
+  instance->instance_ptr = new minifi::Instance(url, port->port_id, "filesystemrepository");
 
   NULL_CHECK(nullptr, instance->instance_ptr);
 
@@ -124,25 +124,53 @@
   return instance;
 }
 
-standalone_processor *create_processor(const char *name) {
+standalone_processor * create_processor(const char *name, nifi_instance * instance) {
   NULL_CHECK(nullptr, name);
   auto ptr = ExecutionPlan::createProcessor(name, name);
   if (!ptr) {
     return nullptr;
   }
-  if (standalone_instance == nullptr) {
+  if (instance == NULL) {
     nifi_port port;
     char portnum[] = "98765";
     port.port_id = portnum;
-    standalone_instance = create_instance("internal_standalone", &port);
+    instance = create_instance("internal_standalone", &port);
   }
-  auto flow = create_new_flow(standalone_instance);
+  auto flow = create_new_flow(instance);
   std::shared_ptr<ExecutionPlan> plan(flow);
   plan->addProcessor(ptr, name);
   ExecutionPlan::addProcessorWithPlan(ptr->getUUIDStr(), plan);
   return static_cast<standalone_processor*>(ptr.get());
 }
 
+void initialize_content_repo(processor_context * ctx, const char * uuid) {
+    if (ctx->isInitialized()) {
+        return;
+    }
+    char * cwd = get_current_working_directory();
+    if (cwd) {
+        const char * sep = get_separator(0);
+        const std::string repo_path = std::string(cwd) + sep + "contentrepository" + sep + uuid;
+        ctx->initializeContentRepository(repo_path);
+        free(cwd);
+    }
+}
+
+void clear_content_repo(const nifi_instance * instance) {
+    const auto content_repo = static_cast<minifi::Instance*>(instance->instance_ptr)->getContentRepository();
+    const auto storage_path = content_repo->getStoragePath();
+    remove_directory(storage_path.c_str());
+}
+
+void get_proc_uuid_from_processor(standalone_processor * proc, char * uuid_target) {
+    strcpy(uuid_target, proc->getUUIDStr().c_str());
+}
+
+void get_proc_uuid_from_context(const processor_context * ctx, char * uuid_target) {
+    standalone_processor * proc = static_cast<standalone_processor*>(ctx->getProcessorNode()->getProcessor().get());
+    get_proc_uuid_from_processor(proc, uuid_target);
+}
+
 void free_standalone_processor(standalone_processor* proc) {
   NULL_CHECK(, proc);
   ExecutionPlan::removeProcWithPlan(proc->getUUIDStr());
@@ -245,28 +273,58 @@
   return new_ff;
 }
 
-flow_file_record * generate_flow_file(nifi_instance * instance, standalone_processor * proc) {
-    if (!instance || !proc) {
-        return nullptr;
-    }
+flow_file_record * generate_flow(processor_context * ctx) {
     flow_file_record * ffr = create_ff_object_nc();
 
-    auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr);
-    auto content_repo = minifi_instance_ref->getContentRepository();
+    if (ffr->crp) {
+    	delete static_cast<std::shared_ptr<minifi::core::ContentRepository>*>(ffr->crp);
+    }
+    ffr->crp = static_cast<void*>(new std::shared_ptr<minifi::core::ContentRepository>(ctx->getContentRepository()));
 
-    ffr->crp = static_cast<void*>(new std::shared_ptr<minifi::core::ContentRepository>(content_repo));
-    auto plan = ExecutionPlan::getPlan(proc->getUUIDStr());
+    auto plan = ExecutionPlan::getPlan(ctx->getProcessorNode()->getProcessor()->getUUIDStr());
+
     if (!plan) {
         return nullptr;
     }
-    ffr->ffp = static_cast<void*>(new std::shared_ptr<core::FlowFile>(plan->getCurrentFlowFile()));
-    ffr->keepContent = 1;
+    ffr->ffp = NULL;
+    ffr->keepContent = 0;
     auto ff_content_repo_ptr = (static_cast<std::shared_ptr<minifi::core::ContentRepository>*>(ffr->crp));
     auto claim = std::make_shared<minifi::ResourceClaim>(*ff_content_repo_ptr);
-    const char * full_path = claim->getContentFullPath().c_str();
-    int len = strlen(full_path);
-    ffr->contentLocation = (char *) malloc(sizeof(char) * (len + 1));
-    snprintf(ffr->contentLocation, len + 1, "%s", full_path);
+
+    size_t len = strlen(claim->getContentFullPath().c_str());
+    ffr->contentLocation = (char *) malloc((len + 1) * sizeof(char));
+    snprintf(ffr->contentLocation, len+1, "%s", claim->getContentFullPath().c_str());
+    return ffr;
+}
+
+flow_file_record * write_to_flow(const char * buff, size_t count, processor_context * ctx) {
+    if (!ctx) {
+        return NULL;
+    }
+
+    flow_file_record * ffr = generate_flow(ctx);
+
+    if (ffr == NULL) {
+        printf("Could not generate flow file\n");
+        return NULL;
+    }
+
+    FILE * ffp = fopen(ffr->contentLocation, "wb");
+    if (!ffp) {
+        printf("Cannot open flow file at path %s to write content to.\n", ffr->contentLocation);
+        free_flowfile(ffr);
+        return NULL;
+    }
+
+    int ret = fwrite(buff, 1, count, ffp);
+    if (ret < count) {
+        fclose(ffp);
+        free_flowfile(ffr);
+        return NULL;
+    }
+    fseek(ffp, 0, SEEK_END);
+    ffr->size = ftell(ffp);
+    fclose(ffp);
     return ffr;
 }
 
diff --git a/nanofi/src/core/file_utils.c b/nanofi/src/core/file_utils.c
index 3f7b79e..1eeedc6 100644
--- a/nanofi/src/core/file_utils.c
+++ b/nanofi/src/core/file_utils.c
@@ -20,44 +20,122 @@
 #include <stdio.h>
 #include <string.h>
 #include <errno.h>
+#include <sys/stat.h>
+#include <dirent.h>
+#include <unistd.h>
+#include <limits.h>
 
-#include "api/nanofi.h"
 #include "core/string_utils.h"
 #include "core/file_utils.h"
 
-token_list tail_file(const char * file_path, char delim, int curr_offset) {
-    token_list tkn_list;
-    memset(&tkn_list, 0, sizeof(struct token_list));
+#ifdef _MSC_VER
+#ifndef PATH_MAX
+#define PATH_MAX 260
+#endif
+#endif
 
-    if (!file_path) {
-        return tkn_list;
+int is_directory(const char * path) {
+    struct stat dir_stat;
+    if (stat(path, &dir_stat) < 0) {
+        return 0;
+    }
+    return S_ISDIR(dir_stat.st_mode);
+}
+
+const char * get_separator(int force_posix)
+{
+#ifdef WIN32
+    if (!force_posix) {
+        return "\\";
+    }
+#endif
+    return "/";
+}
+
+char * concat_path(const char * parent, const char * child) {
+    char * path = (char *)malloc((strlen(parent) + strlen(child) + 2) * sizeof(char));
+    strcpy(path, parent);
+    const char * sep = get_separator(0);
+    strcat(path, sep);
+    strcat(path, child);
+    return path;
+}
+
+void remove_directory(const char * dir_path) {
+
+    if (!is_directory(dir_path)) {
+        if (unlink(dir_path) == -1) {
+            printf("Could not remove file %s\n", dir_path);
+        }
+        return;
     }
 
-    char buff[MAX_BYTES_READ + 1];
-    memset(buff, 0, MAX_BYTES_READ+1);
+    uint64_t path_len = strlen(dir_path);
+    struct dirent * dir;
+    DIR * d = opendir(dir_path);
+
+    while ((dir = readdir(d)) != NULL) {
+        char * entry_name = dir->d_name;
+        if (!strcmp(entry_name, ".") || !strcmp(entry_name, "..")) {
+            continue;
+        }
+        char * path = concat_path(dir_path, entry_name);
+        remove_directory(path);
+        free(path);
+    }
+
+    rmdir(dir_path);
+    closedir(d);
+}
+
+int make_dir(const char * path) {
+    if (!path) return -1;
+
     errno = 0;
-    FILE * fp = fopen(file_path, "rb");
-    if (!fp) {
-        printf("Cannot open file: {file: %s, reason: %s}\n", file_path, strerror(errno));
-        return tkn_list;
+    int ret = mkdir(path, S_IRUSR|S_IWUSR|S_IXUSR|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH);
+    if (ret == 0) {
+        return 0;
     }
-    fseek(fp, curr_offset, SEEK_SET);
 
-    int bytes_read = 0;
-    int i = 0;
-    while ((bytes_read = fread(buff, 1, MAX_BYTES_READ, fp)) > 0) {
-        buff[bytes_read] = '\0';
-        struct token_list tokens = tokenize_string_tailfile(buff, delim);
-        if (tokens.size > 0) {
-            attach_lists(&tkn_list, &tokens);
+    switch (errno) {
+    case ENOENT: {
+        char * found = strrchr(path, '/');
+        if (!found) {
+            return -1;
         }
-        tkn_list.total_bytes += tokens.total_bytes;
-        if (tokens.total_bytes > 0) {
-            curr_offset += tokens.total_bytes;
-            fseek(fp, curr_offset, SEEK_SET);
+        int len = found - path;
+        char * dir = calloc(len + 1, sizeof(char));
+        strncpy(dir, path, len);
+        dir[len] = '\0';
+        int res = make_dir(dir);
+        free(dir);
+        if (res < 0) {
+            return -1;
         }
-        memset(buff, 0, MAX_BYTES_READ);
+        return mkdir(path, S_IRUSR|S_IWUSR|S_IXUSR|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH);
     }
-    fclose(fp);
-    return tkn_list;
+    case EEXIST: {
+        if (is_directory(path)) {
+            return 0;
+        }
+        return -1;
+    }
+    default:
+        return -1;
+    }
+}
+
+char * get_current_working_directory() {
+    char * cwd = (char *)malloc(PATH_MAX * sizeof(char));
+    memset(cwd, 0, PATH_MAX);
+    #ifdef WIN32
+    if (_getcwd(cwd, PATH_MAX) != NULL)
+        return cwd;
+    #else
+    if (getcwd(cwd, PATH_MAX) != NULL) {
+        return cwd;
+    }
+    #endif
+    free(cwd);
+    return NULL;
 }
diff --git a/nanofi/src/core/flowfiles.c b/nanofi/src/core/flowfiles.c
index edf2c5b..6c86e98 100644
--- a/nanofi/src/core/flowfiles.c
+++ b/nanofi/src/core/flowfiles.c
@@ -16,38 +16,149 @@
  * limitations under the License.
  */
 
+#include "api/nanofi.h"
+#include "api/ecu.h"
 #include "core/flowfiles.h"
+
+#include "utlist.h"
 #include <string.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <sys/stat.h>
 
-void add_flow_file_record(flow_file_list * ff_list, flow_file_record * record) {
-    if (!ff_list || !record) return;
-
-    struct flow_file_list_node * new_node = (struct flow_file_list_node *)malloc(sizeof(struct flow_file_list_node));
-    new_node->ff_record = record;
-    new_node->next = NULL;
-
-    if (!ff_list->head || !ff_list->tail) {
-        ff_list->head = ff_list->tail = new_node;
-        ff_list->len = 1;
-        return;
+flow_file_list * add_flow_file_record(flow_file_list ** ff_list, flow_file_record * record) {
+    if (!record) {
+        return *ff_list;
     }
 
-    ff_list->tail->next = new_node;
-    ff_list->tail = new_node;
-    ff_list->len++;
+    struct flow_file_list * new_node = (struct flow_file_list *)malloc(sizeof(struct flow_file_list));
+    new_node->ff_record = record;
+    LL_APPEND(*ff_list, new_node);
+    return new_node;
 }
 
-void free_flow_file_list(flow_file_list * ff_list) {
-    if (!ff_list || !ff_list->head) {
+void free_flow_file_list(flow_file_list ** ff_list) {
+    if (!*ff_list) {
+        return;
+    }
+    flow_file_list * head = *ff_list;
+    while (head) {
+       flow_file_list * tmp = head;
+       free_flowfile(tmp->ff_record);
+       head = head->next;
+       free(tmp);
+    }
+}
+
+void add_attributes(flow_file_record * ffr, const char * file_path, uint64_t curr_offset) {
+    char offset_str[21];
+    snprintf(offset_str, sizeof(offset_str), "%llu", curr_offset);
+    add_attribute(ffr, "current offset", offset_str, strlen(offset_str));
+    char content_location[strlen(ffr->contentLocation) + 1];
+    snprintf(content_location, sizeof(content_location), "%s", ffr->contentLocation);
+    add_attribute(ffr, "content location", content_location, strlen(content_location));
+    add_attribute(ffr, "tailfile path", (char*)file_path, strlen(file_path));
+}
+
+void update_attributes(flow_file_record * ffr, const char * file_path, uint64_t curr_offset) {
+    char offset_str[21];
+    snprintf(offset_str, sizeof(offset_str), "%llu", curr_offset);
+    update_attribute(ffr, "current offset", offset_str, strlen(offset_str));
+    char content_location[strlen(ffr->contentLocation) + 1];
+    snprintf(content_location, sizeof(content_location), "%s", ffr->contentLocation);
+    update_attribute(ffr, "content location", content_location, strlen(content_location));
+    update_attribute(ffr, "tailfile path", (char*)file_path, strlen(file_path));
+}
+
+void transmit_flow_files(nifi_instance * instance, flow_file_list * ff_list, int complete) {
+    if (!instance || !ff_list) {
+        return;
+    }
+    flow_file_list * el = NULL;
+    LL_FOREACH(ff_list, el) {
+        if (!complete || el->complete) {
+            transmit_flowfile(el->ff_record, instance);
+        }
+    }
+}
+
+void read_payload_and_transmit(struct flow_file_list * ffl, struct CRawSiteToSiteClient * client) {
+    if (!ffl || !client) {
         return;
     }
 
-    flow_file_list_node * head = ff_list->head;
-    while (head) {
-        free_flowfile(head->ff_record);
-        flow_file_list_node * tmp = head;
-        head = head->next;
-        free(tmp);
+    char * file = ffl->ff_record->contentLocation;
+    FILE * fp = fopen(file, "rb");
+    if (!fp) {
+        return;
     }
-    memset(ff_list, 0, sizeof(struct flow_file_list));
+
+    struct stat statfs;
+    if (stat(file, &statfs) < 0) {
+        return;
+    }
+    size_t file_size = statfs.st_size;
+
+    attribute attr;
+    attr.key = "current offset";
+    if (get_attribute(ffl->ff_record, &attr) < 0) {
+        printf("Error looking up flow file attribute %s\n", attr.key);
+        return;
+    }
+
+    errno = 0;
+    uint64_t offset = strtoull((const char *)attr.value, NULL, 10);
+    if (errno != 0) {
+        printf("Error converting flow file offset value\n");
+        return;
+    }
+    uint64_t begin_offset =  offset - file_size;
+    char * buff = (char *)malloc(sizeof(char) * 4097);
+    size_t count = 0;
+    while ((count = fread(buff, 1, 4096, fp)) > 0) {
+        buff[count] = '\0';
+        begin_offset += count;
+        char offset_str[21];
+        snprintf(offset_str, sizeof(offset_str), "%llu", begin_offset);
+        update_attribute(ffl->ff_record, "current offset", offset_str, strlen(offset_str));
+
+        attribute_set as;
+        uint64_t num_attrs = get_attribute_quantity(ffl->ff_record);
+        as.size = num_attrs;
+        as.attributes = (attribute *)malloc(num_attrs * sizeof(attribute));
+        get_all_attributes(ffl->ff_record, &as);
+
+        if (transmitPayload(client, buff, &as) == 0) {
+            printf("payload of %zu bytes from %s sent successfully\n", count, ffl->ff_record->contentLocation);
+        }
+        else {
+            printf("Failed to send payload, flow file %s\n", ffl->ff_record->contentLocation);
+        }
+        free(as.attributes);
+    }
+    free(buff);
+    fclose(fp);
+}
+
+void transmit_payload(struct CRawSiteToSiteClient * client, struct flow_file_list * ff_list, int complete) {
+    if (!client || !ff_list) {
+        return;
+    }
+    flow_file_list * el = NULL;
+    LL_FOREACH(ff_list, el) {
+        if (!complete || el->complete) {
+            read_payload_and_transmit(el, client);
+        }
+    }
+}
+
+uint64_t flow_files_size(flow_file_list * ff_list) {
+    if (!ff_list) {
+        return 0;
+    }
+
+    uint64_t counter = 0;
+    flow_file_list * el = NULL;
+    LL_COUNT(ff_list, el, counter);
+    return counter;
 }
diff --git a/nanofi/tests/CAPITests.cpp b/nanofi/tests/CAPITests.cpp
index 9a63c4a..769b1dc 100644
--- a/nanofi/tests/CAPITests.cpp
+++ b/nanofi/tests/CAPITests.cpp
@@ -318,7 +318,7 @@
 
   create_testfile_for_getfile(sourcedir.c_str());
 
-  standalone_processor* getfile_proc = create_processor("GetFile");
+  standalone_processor* getfile_proc = create_processor("GetFile", NULL);
   REQUIRE(set_standalone_property(getfile_proc, "Input Directory", sourcedir.c_str()) == 0);
 
   flow_file_record* ffr = invoke(getfile_proc);
@@ -326,7 +326,7 @@
   REQUIRE(ffr != nullptr);
   REQUIRE(get_attribute_quantity(ffr) > 0);
 
-  standalone_processor* extract_test = create_processor("ExtractText");
+  standalone_processor* extract_test = create_processor("ExtractText", NULL);
   REQUIRE(extract_test != nullptr);
   REQUIRE(set_standalone_property(extract_test, "Attribute", "TestAttr") == 0);
 
@@ -379,7 +379,7 @@
   flow_file_record *record = get_next_flow_file(instance, test_flow);
   REQUIRE(record != nullptr);
 
-  standalone_processor* putfile_proc = create_processor("PutFile");
+  standalone_processor* putfile_proc = create_processor("PutFile", NULL);
   REQUIRE(set_standalone_property(putfile_proc, "Directory", putfiledir.c_str()) == 0);
 
   flow_file_record* put_record = invoke_ff(putfile_proc, record);
@@ -409,7 +409,7 @@
   auto sourcedir = testController.createTempDirectory(src_format);
   std::string path = create_testfile_for_getfile(sourcedir.c_str());
 
-  standalone_processor* extract_test = create_processor("ExtractText");
+  standalone_processor* extract_test = create_processor("ExtractText", NULL);
   REQUIRE(extract_test != nullptr);
   REQUIRE(set_standalone_property(extract_test, "Attribute", "TestAttr") == 0);
 
@@ -465,9 +465,9 @@
   free_standalone_processor(nullptr);
   free_instance(nullptr);
 
-  REQUIRE(create_processor(nullptr) == nullptr);
+  REQUIRE(create_processor(nullptr, nullptr) == nullptr);
 
-  standalone_processor *standalone_proc = create_processor("GetFile");
+  standalone_processor *standalone_proc = create_processor("GetFile", NULL);
   REQUIRE(standalone_proc != nullptr);
 
   REQUIRE(set_property(nullptr, "prop_name", "prop_value") == -1);
diff --git a/nanofi/tests/CTailFileTests.cpp b/nanofi/tests/CLogAggregatorTests.cpp
similarity index 69%
rename from nanofi/tests/CTailFileTests.cpp
rename to nanofi/tests/CLogAggregatorTests.cpp
index 491e8c0..edf62c9 100644
--- a/nanofi/tests/CTailFileTests.cpp
+++ b/nanofi/tests/CLogAggregatorTests.cpp
@@ -16,20 +16,20 @@
  * limitations under the License.
  */
 
+#ifndef _WIN32
 #include "catch.hpp"
 
 #include <vector>
 #include <string>
-#include <fstream>
 #include <numeric>
 #include <algorithm>
-#include <assert.h>
 #include <unistd.h>
 #include <string.h>
 #include <sys/stat.h>
 #include "core/string_utils.h"
 #include "core/file_utils.h"
 
+#include "CTestsBase.h"
 
 void test_lists_equal(token_list * tknlist, const std::vector<std::string>& sv) {
     REQUIRE(tknlist != NULL);
@@ -174,131 +174,101 @@
  * ##################################################################
  */
 
-class FileManager {
-public:
-    FileManager(const std::string& filePath) {
-        assert(!filePath.empty() && "filePath provided cannot be empty!");
-        filePath_ = filePath;
-        outputStream_.open(filePath_, std::ios::binary);
-    }
+TEST_CASE("Simple log aggregator test", "[testLogAggregator]") {
 
-    ~FileManager() {
-        std::ifstream ifs(filePath_);
-        if (ifs.good()) {
-            remove(filePath_.c_str());
-        }
-    }
-
-    void Write(const std::string& str) {
-        outputStream_ << str;
-    }
-
-    std::string WriteNChars(uint64_t n, char c) {
-        std::string s(n, c);
-        outputStream_ << s;
-        return s;
-    }
-
-    std::string getFilePath() const {
-        return filePath_;
-    }
-
-    void CloseStream() {
-        outputStream_.flush();
-        outputStream_.close();
-    }
-
-    uint64_t GetFileSize() {
-        CloseStream();
-        struct stat buff;
-        if (stat(filePath_.c_str(), &buff) == 0) {
-            return buff.st_size;
-        }
-        return 0;
-    }
-
-private:
-    std::string filePath_;
-    std::ofstream outputStream_;
-};
-
-TEST_CASE("Simple tail file test", "[testTailFile]") {
-
+    const char * content = "hello world";
     FileManager fm("test.txt");
-    fm.Write("hello world");
+    fm.Write(content);
     fm.CloseStream();
 
-    const char * file = fm.getFilePath().c_str();
-    struct token_list tkn_list = tail_file(file, ';', 0);
-    REQUIRE(tkn_list.size == 0);
-    REQUIRE(tkn_list.head == NULL);
-    REQUIRE(tkn_list.total_bytes == 0);
+    TailFileTestResourceManager mgr("LogAggregator", on_trigger_logaggregator);
+    struct processor_params * pp = invoke_processor(mgr, fm.getFilePath().c_str());
+
+    REQUIRE(pp != NULL);
+    REQUIRE(pp->curr_offset == 0);
+    REQUIRE(flow_files_size(pp->ff_list) == 0);
 }
 
-TEST_CASE("Empty file tail test", "[testEmptyFileTail]") {
+TEST_CASE("Empty file log aggregator test", "[testEmptyFileLogAggregator]") {
     FileManager fm("test.txt");
     fm.CloseStream();
 
-    const char * file = fm.getFilePath().c_str();
-    struct token_list tkn_list = tail_file(file, ';', 0);
-    REQUIRE(tkn_list.size == 0);
-    REQUIRE(tkn_list.head == NULL);
-    REQUIRE(tkn_list.total_bytes == 0);
+    TailFileTestResourceManager mgr("LogAggregator", on_trigger_logaggregator);
+    struct processor_params * pp = invoke_processor(mgr, fm.getFilePath().c_str());
+
+    REQUIRE(pp != NULL);
+    REQUIRE(flow_files_size(pp->ff_list) == 0);
+    REQUIRE(pp->ff_list == NULL);
+    REQUIRE(pp->curr_offset == 0);
 }
 
-TEST_CASE("File containing only delimiters tail test", "[testDelimiterOnlyFileTail]") {
+TEST_CASE("File containing only delimiters test", "[testDelimiterOnlyLogAggregator]") {
     FileManager fm("test.txt");
-    fm.Write("----");
+    fm.Write(";;;;");
     fm.CloseStream();
 
-    const char * file = fm.getFilePath().c_str();
-    struct token_list tkn_list = tail_file(file, '-', 0);
-    REQUIRE(tkn_list.size == 0);
-    REQUIRE(tkn_list.head == NULL);
-    REQUIRE(tkn_list.total_bytes == 4);
+    TailFileTestResourceManager mgr("LogAggregator", on_trigger_logaggregator);
+    struct processor_params * pp = invoke_processor(mgr, fm.getFilePath().c_str());
+
+    REQUIRE(pp != NULL);
+    REQUIRE(flow_files_size(pp->ff_list) == 0);
+    REQUIRE(pp->ff_list == NULL);
+    REQUIRE(pp->curr_offset == 4);
 }
 
-TEST_CASE("File tail test string starting with delimiter", "[testDelimiterOnlyFileTail]") {
+TEST_CASE("File containing string starting with delimiter", "[testDelimiterStartingStrings]") {
     FileManager fm("test.txt");
-    fm.Write("----hello");
+    fm.Write(";;;;hello");
     fm.CloseStream();
 
-    const char * file = fm.getFilePath().c_str();
-    struct token_list tkn_list = tail_file(file, '-', 0);
-    REQUIRE(tkn_list.size == 0);
-    REQUIRE(tkn_list.head == NULL);
-    REQUIRE(tkn_list.total_bytes == 4);
+    TailFileTestResourceManager mgr("LogAggregator", on_trigger_logaggregator);
+    auto pp = invoke_processor(mgr, fm.getFilePath().c_str());
+
+    REQUIRE(flow_files_size(pp->ff_list) == 0);
+    REQUIRE(pp->ff_list == NULL);
+    REQUIRE(pp->curr_offset == 4);
 }
 
-TEST_CASE("Test tail file with less than 4096 delimited chars", "[testTailFileDelimitedString]") {
+TEST_CASE("Test tail file with less than 4096 delimited chars", "[testLogAggregateFileLessThan4KB]") {
+    const std::string token1("token1");
+    const std::string token2("token2");
+    const std::string token3("token3");
+    std::vector<std::string> tokens = {token1, token2, token3};
 
-    const std::string delimitedString = "token1--token2--token3";
+    const std::string delimitedString = join_strings(tokens, ";;");
     FileManager fm("test.txt");
     fm.Write(delimitedString);
     const std::string filePath = fm.getFilePath();
     fm.CloseStream();
 
-    struct token_list tokens = tail_file(filePath.c_str(), '-', 0);
-    test_lists_equal(&tokens, std::vector<std::string>{"token1", "token2"});
+    TailFileTestResourceManager mgr("LogAggregator", on_trigger_logaggregator);
+    auto pp = invoke_processor(mgr, filePath.c_str());
+
+    REQUIRE(pp->curr_offset == (token1.size() + token2.size() + (2 * std::string("--").size())));
+    REQUIRE(pp->ff_list != NULL);
+    REQUIRE(flow_files_size(pp->ff_list) == 2);
 }
 
 // Although there is no delimiter within the string that is at least 4096 bytes long,
 // tail_file still creates a flow file for the first 4096 bytes.
-TEST_CASE("Test tail file having 4096 bytes without delimiter", "[testTailFile4096Chars]") {
+TEST_CASE("Test tail file having 4096 bytes without delimiter", "[testLogAggregateFile4096Chars]") {
 
     FileManager fm("test.txt");
     const std::string s = std::move(fm.WriteNChars(4096, 'a'));
     const std::string filePath = fm.getFilePath();
     fm.CloseStream();
 
-    struct token_list tokens = tail_file(filePath.c_str(), '-', 0);
-    test_lists_equal(&tokens, std::vector<std::string>{std::move(s)});
+    TailFileTestResourceManager mgr("LogAggregator", on_trigger_logaggregator);
+    auto pp = invoke_processor(mgr, filePath.c_str());
+    REQUIRE(pp->curr_offset == 4096);
+    REQUIRE(pp->ff_list != NULL);
+    REQUIRE(flow_files_size(pp->ff_list) == 1);
 }
 
 // Although there is no delimiter within the string that is equal to 4096 bytes or longer
 // tail_file creates a flow file for each subsequent 4096 byte chunk. It leaves the last chunk
 // if it is smaller than 4096 bytes and not delimited
-TEST_CASE("Test tail file having more than 4096 bytes without delimiter", "[testTailFileMoreThan4096Chars]") {
+TEST_CASE("Test tail file having more than 4096 bytes without delimiter", "[testLogAggregarteFileMoreThan4096Chars]") {
 
     FileManager fm("test.txt");
     const std::string s1 = std::move(fm.WriteNChars(4096, 'a'));
@@ -307,63 +277,89 @@
     fm.Write(s3);
     fm.CloseStream();
 
+    TailFileTestResourceManager mgr("LogAggregator", on_trigger_logaggregator);
     const uint64_t totalStringsSize = s1.size() + s2.size() + s3.size();
     const std::string filePath = fm.getFilePath();
     const uint64_t bytesWrittenToStream = fm.GetFileSize();
     REQUIRE(bytesWrittenToStream == totalStringsSize);
 
-    struct token_list tokens = tail_file(filePath.c_str(), '-', 0);
-    test_lists_equal(&tokens, std::vector<std::string>{std::move(s1), std::move(s2)});
+    auto pp = invoke_processor(mgr, filePath.c_str());
+    REQUIRE(flow_files_size(pp->ff_list) == 2);
+    flow_file_list * el = NULL;
+    LL_FOREACH(pp->ff_list, el) {
+        REQUIRE(el->ff_record->size == 4096);
+    }
+    REQUIRE(pp->curr_offset == (s1.size() + s2.size()));
 }
 
-TEST_CASE("Test tail file having more than 4096 bytes with delimiter", "[testTailFileWithDelimitedString]") {
+TEST_CASE("Test tail file having more than 4096 bytes with delimiter", "[testLogAggregateWithDelimitedString]") {
 
     FileManager fm("test.txt");
     const std::string s1 = std::move(fm.WriteNChars(4096, 'a'));
-    const std::string d1 = std::move(fm.WriteNChars(2, '-'));
+    const std::string d1 = std::move(fm.WriteNChars(2, ';'));
     const std::string s2 = std::move(fm.WriteNChars(4096, 'b'));
     fm.CloseStream();
 
+    TailFileTestResourceManager mgr("LogAggregator", on_trigger_logaggregator);
     const uint64_t totalStringsSize = s1.size() + s2.size() + d1.size();
     const std::string filePath = fm.getFilePath();
     const uint64_t bytesWrittenToStream = fm.GetFileSize();
     REQUIRE(bytesWrittenToStream == totalStringsSize);
 
-    struct token_list tokens = tail_file(filePath.c_str(), '-', 0);
-    test_lists_equal(&tokens, std::vector<std::string>{std::move(s1), std::move(s2)});
+    auto pp = invoke_processor(mgr, filePath.c_str());
+    REQUIRE(flow_files_size(pp->ff_list) == 2);
+    REQUIRE(pp->curr_offset == totalStringsSize);
+    flow_file_list * el = NULL;
+    LL_FOREACH(pp->ff_list, el) {
+        REQUIRE(el->ff_record->size == 4096);
+    }
 }
 
-TEST_CASE("Test tail file having more than 4096 bytes with delimiter and second chunk less than 4096", "[testTailFileWithDelimitedString]") {
+TEST_CASE("Test tail file having more than 4096 bytes with delimiter and second chunk less than 4096", "[testLogAggregateDelimited]") {
 
     FileManager fm("test.txt");
     const std::string s1 = std::move(fm.WriteNChars(4096, 'a'));
-    const std::string d1 = std::move(fm.WriteNChars(2, '-'));
+    const std::string d1 = std::move(fm.WriteNChars(2, ';'));
     const std::string s2 = std::move(fm.WriteNChars(4000, 'b'));
     fm.CloseStream();
 
+    TailFileTestResourceManager mgr("LogAggregator", on_trigger_logaggregator);
     const uint64_t totalStringsSize = s1.size() + s2.size() + d1.size();
     const std::string filePath = fm.getFilePath();
     const uint64_t bytesWrittenToStream = fm.GetFileSize();
     REQUIRE(bytesWrittenToStream == totalStringsSize);
 
-    struct token_list tokens = tail_file(filePath.c_str(), '-', 0);
-    test_lists_equal(&tokens, std::vector<std::string>{std::move(s1)});
+    auto pp = invoke_processor(mgr, filePath.c_str());
+    REQUIRE(pp->curr_offset == (s1.size() + d1.size()));
+    REQUIRE(flow_files_size(pp->ff_list) == 1);
+
+    flow_file_list * el = NULL;
+    LL_FOREACH(pp->ff_list, el) {
+        REQUIRE(el->ff_record->size == 4096);
+    }
 }
 
-TEST_CASE("Test tail file having more than 4096 bytes with delimiter and second chunk more than 4096", "[testTailFileWithDelimitedString]") {
+TEST_CASE("Test tail file having more than 4096 bytes with delimiter and second chunk more than 4096", "[testLogAggregateDelimited]") {
 
     FileManager fm("test.txt");
     const std::string s1 = std::move(fm.WriteNChars(4096, 'a'));
-    const std::string d1 = std::move(fm.WriteNChars(2, '-'));
+    const std::string d1 = std::move(fm.WriteNChars(2, ';'));
     const std::string s2 = std::move(fm.WriteNChars(4098, 'b'));
     fm.CloseStream();
 
+    TailFileTestResourceManager mgr("LogAggregator", on_trigger_logaggregator);
     const uint64_t totalStringsSize = s1.size() + s2.size() + d1.size();
     const std::string filePath = fm.getFilePath();
     const uint64_t bytesWrittenToStream = fm.GetFileSize();
     REQUIRE(bytesWrittenToStream == totalStringsSize);
 
-    const std::string s3 = std::string(s2.data(), s2.data()+4096);
-    struct token_list tokens = tail_file(filePath.c_str(), '-', 0);
-    test_lists_equal(&tokens, std::vector<std::string>{std::move(s1), std::move(s3)});
+    auto pp = invoke_processor(mgr, filePath.c_str());
+    REQUIRE(pp->curr_offset == 8194);
+    REQUIRE(flow_files_size(pp->ff_list) == 2);
+
+    flow_file_list * el = NULL;
+    LL_FOREACH(pp->ff_list, el) {
+        REQUIRE(el->ff_record->size == 4096);
+    }
 }
+#endif
diff --git a/nanofi/tests/CTailFileChunkTests.cpp b/nanofi/tests/CTailFileChunkTests.cpp
new file mode 100644
index 0000000..ff933c8
--- /dev/null
+++ b/nanofi/tests/CTailFileChunkTests.cpp
@@ -0,0 +1,135 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _WIN32
+#include "catch.hpp"
+
+#include <dirent.h>
+#include <stdio.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+
+#include "core/string_utils.h"
+#include "core/file_utils.h"
+
+#include "CTestsBase.h"
+
+/****
+ * ##################################################################
+ *  CTAILFILE CHUNK TESTS
+ * ##################################################################
+ */
+
+TEST_CASE("Test tailfile chunk size 4096, file size 8KB", "[tailfileChunk8KBFileSize]") {
+
+    TailFileTestResourceManager mgr("TailFileChunk", on_trigger_tailfilechunk);
+    const char * file = "./e.txt";
+    const char * chunksize = "4096";
+
+    //Write 8192 bytes to the file
+    FileManager fm(file);
+    fm.WriteNChars(4096, 'a');
+    fm.WriteNChars(4096, 'b');
+    fm.CloseStream();
+
+    standalone_processor * proc = mgr.getProcessor();
+    set_standalone_property(proc, "file_path", file);
+    set_standalone_property(proc, "chunk_size", chunksize);
+
+    flow_file_record * new_ff = invoke(proc);
+
+    char uuid_str[37];
+    get_proc_uuid_from_processor(proc, uuid_str);
+    struct processor_params * pp = get_proc_params(uuid_str);
+
+    //Test that two flow file records were created
+    REQUIRE(pp != NULL);
+    REQUIRE(pp->ff_list != NULL);
+    REQUIRE(pp->ff_list->ff_record != NULL);
+    REQUIRE(flow_files_size(pp->ff_list) == 2);
+
+    //Test that the current offset in the file is 8192 bytes
+    REQUIRE(pp->curr_offset == 8192);
+}
+
+TEST_CASE("Test tailfile chunk size 4096, file size less than 8KB", "[tailfileChunkFileSizeLessThan8KB]") {
+
+    TailFileTestResourceManager mgr("TailFileChunk", on_trigger_tailfilechunk);
+    const char * file = "./e.txt";
+    const char * chunksize = "4096";
+
+    //Write 4505 bytes to the file
+    FileManager fm(file);
+    fm.WriteNChars(4096, 'a');
+    fm.WriteNChars(409, 'b');
+    fm.CloseStream();
+
+    standalone_processor * proc = mgr.getProcessor();
+    set_standalone_property(proc, "file_path", file);
+    set_standalone_property(proc, "chunk_size", chunksize);
+
+    flow_file_record * new_ff = invoke(proc);
+
+    char uuid_str[37];
+    get_proc_uuid_from_processor(proc, uuid_str);
+    struct processor_params * pp = get_proc_params(uuid_str);
+    //Test that one flow file record was created
+    REQUIRE(pp != NULL);
+    REQUIRE(pp->ff_list != NULL);
+    REQUIRE(pp->ff_list->ff_record != NULL);
+    REQUIRE(flow_files_size(pp->ff_list) == 1);
+
+    //Test that the current offset in the file is 4096 bytes
+    REQUIRE(pp->curr_offset == 4096);
+    REQUIRE(pp->ff_list->ff_record->size == 4096);
+
+    struct stat fstat;
+    REQUIRE(stat(pp->ff_list->ff_record->contentLocation, &fstat) == 0);
+    REQUIRE(fstat.st_size == 4096);
+}
+
+TEST_CASE("Test tailfile chunk size 512, file size equal to 4608B", "[tailfileChunkFileSize8KB]") {
+
+    TailFileTestResourceManager mgr("TailFileChunk", on_trigger_tailfilechunk);
+    const char * file = "./e.txt";
+    const char * chunksize = "512";
+
+    //Write 4608 bytes to the file
+    FileManager fm(file);
+    fm.WriteNChars(4608, 'a');
+    fm.CloseStream();
+
+    standalone_processor * proc = mgr.getProcessor();
+    set_standalone_property(proc, "file_path", file);
+    set_standalone_property(proc, "chunk_size", chunksize);
+
+    flow_file_record * new_ff = invoke(proc);
+
+    char uuid_str[37];
+    get_proc_uuid_from_processor(proc, uuid_str);
+    struct processor_params * pp = get_proc_params(uuid_str);
+
+    //Test that one flow file record was created
+    REQUIRE(pp != NULL);
+    REQUIRE(pp->ff_list != NULL);
+    REQUIRE(pp->ff_list->ff_record != NULL);
+    REQUIRE(flow_files_size(pp->ff_list) == 9);
+
+    //Test that the current offset in the file is 4608 bytes
+    REQUIRE(pp->curr_offset == 4608);
+}
+#endif
diff --git a/nanofi/tests/CTailFileDelimitedTests.cpp b/nanofi/tests/CTailFileDelimitedTests.cpp
new file mode 100644
index 0000000..ed079d1
--- /dev/null
+++ b/nanofi/tests/CTailFileDelimitedTests.cpp
@@ -0,0 +1,256 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "catch.hpp"
+
+#include "CTestsBase.h"
+
+/****
+ * ##################################################################
+ *  CTAILFILE DELIMITED TESTS
+ * ##################################################################
+ */
+
+TEST_CASE("Test tailfile delimited. Empty file", "[tailfileDelimitedEmptyFileTest]") {
+
+    TailFileTestResourceManager mgr("TailFileDelimited", on_trigger_tailfiledelimited);
+    const char * file = "./e.txt";
+    const char * delimiter = ";";
+
+    //Create empty file
+    FileManager fm(file);
+
+    auto pp = invoke_processor(mgr, file);
+
+    //Test that no flowfiles were created
+    REQUIRE(pp != NULL);
+    REQUIRE(pp->ff_list == NULL);
+}
+
+TEST_CASE("Test tailfile delimited. File has less than 4096 chars", "[tailfileDelimitedLessThan4096Chars]") {
+
+    TailFileTestResourceManager mgr("TailFileDelimited", on_trigger_tailfiledelimited);
+    const char * file = "./e.txt";
+    const char * delimiter = ";";
+
+    FileManager fm(file);
+    fm.WriteNChars(34, 'a');
+    fm.CloseStream();
+
+    auto pp = invoke_processor(mgr, file);
+
+    //No flow files will be created
+    REQUIRE(pp != NULL);
+    REQUIRE(pp->ff_list != NULL);
+    REQUIRE(flow_files_size(pp->ff_list) == 1);
+    REQUIRE(pp->ff_list->complete == 0);
+
+    //Test that the current offset in the file is 34
+    REQUIRE(pp->curr_offset == 34);
+}
+
+TEST_CASE("Test tailfile delimited. Simple test", "[tailfileDelimitedSimpleTest]") {
+
+    TailFileTestResourceManager mgr("TailFileDelimited", on_trigger_tailfiledelimited);
+    const char * file = "./e.txt";
+    const char * delimiter = ";";
+
+    //Write 8192 bytes to the file
+    FileManager fm(file);
+    fm.WriteNChars(34, 'a');
+    fm.WriteNChars(1, ';');
+    fm.WriteNChars(6, 'b');
+    fm.WriteNChars(1, ';');
+    fm.CloseStream();
+
+    auto pp = invoke_processor(mgr, file);
+
+    //Test that two flow file records were created
+    REQUIRE(pp != NULL);
+    REQUIRE(pp->ff_list != NULL);
+    REQUIRE(pp->ff_list->ff_record != NULL);
+    REQUIRE(flow_files_size(pp->ff_list) == 2);
+
+    //Test that the current offset in the file is 42 bytes
+    REQUIRE(pp->curr_offset == 42);
+
+    //Test the flow file sizes
+    const char * flowfile1_path = pp->ff_list->ff_record->contentLocation;
+    const char * flowfile2_path = pp->ff_list->next->ff_record->contentLocation;
+
+    struct stat fstat;
+    stat(flowfile1_path, &fstat);
+    REQUIRE(fstat.st_size == 34);
+
+    stat(flowfile2_path, &fstat);
+    REQUIRE(fstat.st_size == 6);
+
+    REQUIRE(pp->ff_list->complete == 1);
+    REQUIRE(pp->ff_list->next->complete == 1);
+}
+
+TEST_CASE("Test tailfile delimited. trailing non delimited string", "[tailfileNonDelimitedTest]") {
+
+    TailFileTestResourceManager mgr("TailFileDelimited", on_trigger_tailfiledelimited);
+    const char * file = "./e.txt";
+    const char * delimiter = ";";
+
+    //Write 8192 bytes to the file
+    FileManager fm(file);
+    fm.WriteNChars(34, 'a');
+    fm.WriteNChars(1, ';');
+    fm.WriteNChars(32, 'b');
+    fm.CloseStream();
+
+    auto pp = invoke_processor(mgr, file);
+
+    //Test that two flow file records were created
+    REQUIRE(pp != NULL);
+    REQUIRE(pp->ff_list != NULL);
+    REQUIRE(pp->ff_list->ff_record != NULL);
+    REQUIRE(flow_files_size(pp->ff_list) == 2);
+
+    //Test that the current offset in the file is 35 bytes
+    REQUIRE(pp->curr_offset == 67);
+    REQUIRE(pp->ff_list->complete == 1);
+    REQUIRE(pp->ff_list->next->complete == 0);
+    struct stat fstat;
+    stat(pp->ff_list->ff_record->contentLocation, &fstat);
+    REQUIRE(fstat.st_size == 34);
+
+    //Append a delimiter at the end of the file
+    fm.OpenStream();
+    fm.WriteNChars(1, ';');
+    fm.CloseStream();
+
+    pp = invoke_processor(mgr, file);
+    REQUIRE(pp != NULL);
+    REQUIRE(flow_files_size(pp->ff_list) == 2);
+
+    stat(pp->ff_list->next->ff_record->contentLocation, &fstat);
+    REQUIRE(fstat.st_size == 32);
+    REQUIRE(pp->ff_list->next->complete == 1);
+}
+
+TEST_CASE("Test tailfile delimited 4096 chars non delimited", "[tailfileDelimitedSimpleTest]") {
+
+    TailFileTestResourceManager mgr("TailFileDelimited", on_trigger_tailfiledelimited);
+    const char * file = "./e.txt";
+    const char * delimiter = ";";
+
+    //Write 4096 bytes to the file
+    FileManager fm(file);
+    fm.WriteNChars(4096, 'a');
+    fm.CloseStream();
+
+    auto pp = invoke_processor(mgr, file);
+
+    REQUIRE(pp !=  NULL);
+    REQUIRE(pp->ff_list != NULL);
+    REQUIRE(flow_files_size(pp->ff_list) == 1);
+    REQUIRE(pp->ff_list->complete == 0);
+    //Test that the current offset in the file is 4096 bytes
+    REQUIRE(pp->curr_offset == 4096);
+
+    //Write another 2048 characters
+    fm.OpenStream();
+    fm.WriteNChars(2048, 'b');
+    fm.CloseStream();
+
+    pp = invoke_processor(mgr, file);
+
+    REQUIRE(pp->ff_list != NULL);
+    REQUIRE(flow_files_size(pp->ff_list) == 1);
+    REQUIRE(pp->ff_list->complete == 0);
+
+    //Test that the current offset in the file is (4096 + 2048)
+    REQUIRE(pp->curr_offset == 6144);
+
+    //Write another 2048 characters
+    fm.OpenStream();
+    fm.WriteNChars(2048, 'c');
+    fm.CloseStream();
+
+    pp = invoke_processor(mgr, file);
+
+    REQUIRE(pp->ff_list != NULL);
+    REQUIRE(flow_files_size(pp->ff_list) == 1);
+
+    //Test that the current offset in the file is 8192 bytes only
+    REQUIRE(pp->curr_offset == 8192);
+
+    //Write a delimiter at the end and expect a flow file size of 8192 bytes
+    fm.OpenStream();
+    fm.WriteNChars(1, ';');
+    fm.CloseStream();
+
+    pp = invoke_processor(mgr, file);
+
+    REQUIRE(pp->ff_list != NULL);
+    REQUIRE(pp->ff_list->ff_record != NULL);
+    REQUIRE(flow_files_size(pp->ff_list) == 1);
+    REQUIRE(pp->ff_list->complete == 1);
+    const char * flowfile_path = pp->ff_list->ff_record->contentLocation;
+    struct stat fstat;
+    stat(flowfile_path, &fstat);
+    REQUIRE(fstat.st_size == 8192);
+}
+
+TEST_CASE("Test tailfile delimited. string starting with delimiter", "[tailfileDelimiterStartStringTest]") {
+
+    TailFileTestResourceManager mgr("TailFileDelimited", on_trigger_tailfiledelimited);
+    const char * file = "./e.txt";
+    const char * delimiter = ";";
+
+    //Write 8192 bytes to the file
+    FileManager fm(file);
+    fm.WriteNChars(5, ';');
+    fm.WriteNChars(34, 'a');
+    fm.WriteNChars(4, ';');
+    fm.WriteNChars(32, 'b');
+    fm.CloseStream();
+
+    auto pp = invoke_processor(mgr, file);
+
+    //Test that two flow file records were created
+    REQUIRE(pp != NULL);
+    REQUIRE(pp->ff_list != NULL);
+    REQUIRE(pp->ff_list->ff_record != NULL);
+    REQUIRE(flow_files_size(pp->ff_list) == 2);
+
+    //Test that the current offset in the file is 35 bytes
+    REQUIRE(pp->curr_offset == 75);
+    REQUIRE(pp->ff_list->complete == 1);
+    REQUIRE(pp->ff_list->next->complete == 0);
+    struct stat fstat;
+    stat(pp->ff_list->ff_record->contentLocation, &fstat);
+    REQUIRE(fstat.st_size == 34);
+
+    //Append a delimiter at the end of the file
+    fm.OpenStream();
+    fm.WriteNChars(1, ';');
+    fm.CloseStream();
+
+    pp = invoke_processor(mgr, file);
+    REQUIRE(pp != NULL);
+    REQUIRE(flow_files_size(pp->ff_list) == 2);
+
+    stat(pp->ff_list->next->ff_record->contentLocation, &fstat);
+    REQUIRE(fstat.st_size == 32);
+    REQUIRE(pp->ff_list->next->complete == 1);
+}
diff --git a/nanofi/tests/CTestsBase.h b/nanofi/tests/CTestsBase.h
new file mode 100644
index 0000000..bcde416
--- /dev/null
+++ b/nanofi/tests/CTestsBase.h
@@ -0,0 +1,141 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _WIN32
+#ifndef NANOFI_TESTS_CTESTSBASE_H_
+#define NANOFI_TESTS_CTESTSBASE_H_
+
+#include <vector>
+#include <string>
+#include <fstream>
+#include <assert.h>
+#include <sys/stat.h>
+
+#include "core/file_utils.h"
+#include "api/ecu.h"
+#include "api/nanofi.h"
+
+class FileManager {
+public:
+    FileManager(const std::string& filePath) {
+        assert(!filePath.empty() && "filePath provided cannot be empty!");
+        struct stat statbuff;
+        assert(!is_directory(filePath.c_str()) && "Provided file is not a filepath");
+        filePath_ = filePath;
+        remove(filePath_.c_str());
+        outputStream_.open(filePath_, std::ios::binary);
+    }
+
+    ~FileManager() {
+        std::ifstream ifs(filePath_);
+        if (ifs.good()) {
+            remove(filePath_.c_str());
+        }
+    }
+
+    void Write(const std::string& str) {
+        outputStream_ << str;
+    }
+
+    std::string WriteNChars(uint64_t n, char c) {
+        std::string s(n, c);
+        outputStream_ << s;
+        return s;
+    }
+
+    std::string getFilePath() const {
+        return filePath_;
+    }
+
+    void OpenStream() {
+        outputStream_.open(filePath_, std::ios::binary|std::ios::app);
+    }
+
+    void CloseStream() {
+        outputStream_.flush();
+        outputStream_.close();
+    }
+
+    uint64_t GetFileSize() {
+        CloseStream();
+        struct stat buff;
+        if (stat(filePath_.c_str(), &buff) == 0) {
+            return buff.st_size;
+        }
+        return 0;
+    }
+
+private:
+    std::string filePath_;
+    std::ofstream outputStream_;
+};
+
+class TailFileTestResourceManager {
+public:
+    TailFileTestResourceManager(const std::string& processor_name, void(*callback)(processor_session * ps, processor_context * ctx)) {
+        const char * port_str = "uuid";
+        nifi_port port;
+        port.port_id = (char *)port_str;
+        const char * instance_str = "nifi";
+        instance_ = create_instance(instance_str, &port);
+        add_custom_processor(processor_name.c_str(), callback);
+        processor_ = create_processor(processor_name.c_str(), instance_);
+    }
+
+    ~TailFileTestResourceManager() {
+        remove_directory("./contentrepository");
+        char uuid_str[37];
+        get_proc_uuid_from_processor(processor_, uuid_str);
+        delete_all_flow_files_from_proc(uuid_str);
+        struct processor_params * tmp, * pp = NULL;
+        HASH_ITER(hh, procparams, pp, tmp) {
+            HASH_DEL(procparams, pp);
+            free(pp);
+        }
+        free_standalone_processor(processor_);
+        free_instance(instance_);
+    }
+
+    standalone_processor * getProcessor() const {
+        return processor_;
+    }
+
+    nifi_instance * getInstance() const {
+        return instance_;
+    }
+
+private:
+    nifi_instance * instance_;
+    standalone_processor * processor_;
+};
+
+struct processor_params * invoke_processor(TailFileTestResourceManager& mgr, const char * filePath) {
+    standalone_processor * proc = mgr.getProcessor();
+    set_standalone_property(proc, "file_path", filePath);
+    set_standalone_property(proc, "delimiter", ";");
+
+    flow_file_record * new_ff = invoke(proc);
+
+    char uuid_str[37];
+    get_proc_uuid_from_processor(proc, uuid_str);
+    struct processor_params * pp = get_proc_params(uuid_str);
+    return pp;
+}
+
+#endif /* NANOFI_TESTS_CTESTSBASE_H_ */
+#endif