Merge remote-tracking branch 'apache/master'
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 0906199..9f946f2 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -301,6 +301,7 @@
     SOURCE_DIR "${CMAKE_CURRENT_BINARY_DIR}/thirdparty/curl-src"
     CMAKE_ARGS ${PASSTHROUGH_CMAKE_ARGS}
                "-DCMAKE_INSTALL_PREFIX=${CMAKE_CURRENT_BINARY_DIR}/thirdparty/curl-install"
+                -DCMAKE_POSITION_INDEPENDENT_CODE=ON
                 -DBUILD_CURL_EXE=OFF
                 -DBUILD_TESTING=OFF
                 -DCMAKE_USE_OPENSSL=ON
@@ -425,6 +426,7 @@
 option(DISABLE_CIVET "Disables CivetWeb components." OFF)
 if (NOT DISABLE_CIVET)
 	createExtension(CIVETWEB CIVETWEB "This enables ListenHTTP" "extensions/civetweb" "extensions/civetweb/tests")
+	add_dependencies(minifi-civet-extensions libressl-portable)
 endif()
 
 ## Add the rocks DB extension
diff --git a/conf/minifi-log.properties b/conf/minifi-log.properties
index dc25406..55d07c3 100644
--- a/conf/minifi-log.properties
+++ b/conf/minifi-log.properties
@@ -16,6 +16,8 @@
 #More verbose pattern by default
 #Format details at https://github.com/gabime/spdlog/wiki/3.-Custom-formatting
 spdlog.pattern=[%Y-%m-%d %H:%M:%S.%e] [%n] [%l] %v
+# uncomment to prune package names
+#spdlog.shorten_names=true
 
 #Old format
 #spdlog.pattern=[%Y-%m-%d %H:%M:%S.%e] [minifi log] [%l] %v
diff --git a/docker/ContainerBuild.sh b/docker/ContainerBuild.sh
index 9164ba5..36f9096 100755
--- a/docker/ContainerBuild.sh
+++ b/docker/ContainerBuild.sh
@@ -18,6 +18,9 @@
 
 #!/bin/bash
 
+# Fail on errors
+set -e
+
 # Set env vars.
 UID_ARG=$1
 GID_ARG=$2
@@ -34,6 +37,7 @@
 echo "MiNiFi Package: $MINIFI_SOURCE_CODE"
 
 # Copy the MiNiFi source tree to the Docker working directory before building
+rm -rf $CMAKE_SOURCE_DIR/docker/${DISTRO_NAME}/minificppsource/
 mkdir -p $CMAKE_SOURCE_DIR/docker/${DISTRO_NAME}/minificppsource
 rsync -avr \
       --exclude '/*build*' \
diff --git a/docker/centos/Dockerfile b/docker/centos/Dockerfile
index e54631a..dca6677 100644
--- a/docker/centos/Dockerfile
+++ b/docker/centos/Dockerfile
@@ -35,7 +35,7 @@
 RUN mkdir -p $MINIFI_BASE_DIR 
 USER $USER
 
-RUN yum -y install java-1.8.0-openjdk java-1.8.0-openjdk-devel gcc g++ sudo git which maven
+RUN yum -y install java-1.8.0-openjdk java-1.8.0-openjdk-devel gcc g++ sudo git which maven make
 
 
 ADD $MINIFI_SOURCE_CODE $MINIFI_BASE_DIR
diff --git a/docker/test/integration/minifi/__init__.py b/docker/test/integration/minifi/__init__.py
index d8b280b..48f25b3 100644
--- a/docker/test/integration/minifi/__init__.py
+++ b/docker/test/integration/minifi/__init__.py
@@ -211,7 +211,7 @@
 
             with tarfile.open(mode='w', fileobj=docker_context_buffer) as docker_context:
                 dockerfile_info = tarfile.TarInfo('Dockerfile')
-                dockerfile_info.size = len(conf_dockerfile_buffer.getvalue())
+                dockerfile_info.size = conf_dockerfile_buffer.getbuffer().nbytes
                 docker_context.addfile(dockerfile_info,
                                        fileobj=conf_dockerfile_buffer)
 
@@ -286,6 +286,8 @@
         self.connections = {}
         self.out_proc = self
 
+        self.drop_empty_flowfiles = False
+
     def connect(self, connections):
         for rel in connections:
 
@@ -330,6 +332,15 @@
 
         return connected
 
+    def __invert__(self):
+        """
+        Invert operation to set empty file filtering on incoming connections
+        GetFile('/input') >> ~LogAttribute()
+        """
+        self.drop_empty_flowfiles = True
+
+        return self
+
 
 class Processor(Connectable):
     def __init__(self,
@@ -549,7 +560,7 @@
                 'id': str(group.uuid),
                 'url': group.url,
                 'timeout': '30 sec',
-                'yield period': '10 sec',
+                'yield period': '3 sec',
                 'Input Ports': []
             }
 
@@ -597,7 +608,8 @@
                     'name': str(uuid.uuid4()),
                     'source id': str(connectable.uuid),
                     'source relationship name': conn_name,
-                    'destination id': str(proc.uuid)
+                    'destination id': str(proc.uuid),
+                    'drop empty': ("true" if proc.drop_empty_flowfiles else "false")
                 })
                 if proc not in visited:
                     minifi_flow_yaml(proc, res, visited)
diff --git a/docker/test/integration/minifi/test/__init__.py b/docker/test/integration/minifi/test/__init__.py
index c93f168..67affa9 100644
--- a/docker/test/integration/minifi/test/__init__.py
+++ b/docker/test/integration/minifi/test/__init__.py
@@ -55,7 +55,7 @@
 
         # Point output validator to ephemeral output dir
         self.output_validator = output_validator
-        if isinstance(output_validator, SingleFileOutputValidator):
+        if isinstance(output_validator, FileOutputValidator):
             output_validator.set_output_dir(self.tmp_test_output_dir)
 
         # Start observing output dir
@@ -196,9 +196,15 @@
         """
         Return True if output is valid; False otherwise.
         """
+        raise NotImplementedError("validate function needs to be implemented for validators")
 
 
-class SingleFileOutputValidator(OutputValidator):
+
+class FileOutputValidator(OutputValidator):
+    def set_output_dir(self, output_dir):
+        self.output_dir = output_dir
+
+class SingleFileOutputValidator(FileOutputValidator):
     """
     Validates the content of a single file in the given directory.
     """
@@ -207,9 +213,6 @@
         self.valid = False
         self.expected_content = expected_content
 
-    def set_output_dir(self, output_dir):
-        self.output_dir = output_dir
-
     def validate(self):
 
         if self.valid:
@@ -217,7 +220,7 @@
 
         listing = listdir(self.output_dir)
 
-        if len(listing) > 0:
+        if listing:
             out_file_name = listing[0]
 
             with open(join(self.output_dir, out_file_name), 'r') as out_file:
@@ -225,9 +228,43 @@
 
                 if contents == self.expected_content:
                     self.valid = True
-                    return True
 
-        return False
+        return self.valid
+
+class EmptyFilesOutPutValidator(FileOutputValidator):
+    """
+    Validates if all the files in the target directory are empty and at least one exists
+    """
+    def __init__(self):
+        self.valid = False
+
+    def validate(self):
+
+        if self.valid:
+            return True
+
+        listing = listdir(self.output_dir)
+        if listing:
+            self.valid = all(os.path.getsize(os.path.join(self.output_dir,x)) == 0 for x in listing)
+
+        return self.valid
+
+class NoFileOutPutValidator(FileOutputValidator):
+    """
+    Validates if no flowfiles were transferred
+    """
+    def __init__(self):
+        self.valid = False
+
+    def validate(self):
+
+        if self.valid:
+            return True
+
+        self.valid = not bool(listdir(self.output_dir))
+
+        return self.valid
+
 
 class SegfaultValidator(OutputValidator):
     """
diff --git a/docker/test/integration/test_filter_zero_file.py b/docker/test/integration/test_filter_zero_file.py
new file mode 100644
index 0000000..3080f8e
--- /dev/null
+++ b/docker/test/integration/test_filter_zero_file.py
@@ -0,0 +1,37 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the \"License\"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an \"AS IS\" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from minifi import *
+from minifi.test import *
+
+def test_filter_zero_file():
+    """
+    Verify sending data from a MiNiFi - C++ to NiFi using S2S protocol.
+    """
+
+    port = InputPort('from-minifi', RemoteProcessGroup('http://nifi:8080/nifi'))
+
+    recv_flow = (port
+                 >> LogAttribute()
+                 >> PutFile('/tmp/output'))
+
+    send_flow = (GenerateFlowFile('0B')
+                 >> LogAttribute()
+                 >> ~port)
+
+    with DockerTestCluster(NoFileOutPutValidator()) as cluster:
+        cluster.deploy_flow(recv_flow, name='nifi', engine='nifi')
+        cluster.deploy_flow(send_flow)
+        assert cluster.check_output(60)
diff --git a/docker/test/integration/test_zero_file.py b/docker/test/integration/test_zero_file.py
index 23a0e99..f13c18e 100644
--- a/docker/test/integration/test_zero_file.py
+++ b/docker/test/integration/test_zero_file.py
@@ -16,8 +16,7 @@
 from minifi import *
 from minifi.test import *
 
-
-def test_minifi_to_nifi():
+def test_zero_file():
     """
     Verify sending data from a MiNiFi - C++ to NiFi using S2S protocol.
     """
@@ -32,7 +31,7 @@
                  >> LogAttribute()
                  >> port)
 
-    with DockerTestCluster(SegfaultValidator()) as cluster:
+    with DockerTestCluster(EmptyFilesOutPutValidator()) as cluster:
         cluster.deploy_flow(recv_flow, name='nifi', engine='nifi')
         cluster.deploy_flow(send_flow)
         assert cluster.check_output(60)
diff --git a/extensions/civetweb/CMakeLists.txt b/extensions/civetweb/CMakeLists.txt
index 755cbf8..52f0f49 100644
--- a/extensions/civetweb/CMakeLists.txt
+++ b/extensions/civetweb/CMakeLists.txt
@@ -33,6 +33,7 @@
 add_subdirectory(${CIVET_THIRDPARTY_ROOT}
                  ${CIVET_BINARY_ROOT}
                  EXCLUDE_FROM_ALL)
+add_dependencies(c-library libressl-portable)
 
 file(GLOB SOURCES  "processors/*.cpp")
 
diff --git a/extensions/civetweb/processors/ListenHTTP.cpp b/extensions/civetweb/processors/ListenHTTP.cpp
index b065230..e9ea1ed 100644
--- a/extensions/civetweb/processors/ListenHTTP.cpp
+++ b/extensions/civetweb/processors/ListenHTTP.cpp
@@ -53,10 +53,10 @@
 
 core::Property ListenHTTP::SSLMinimumVersion(
     core::PropertyBuilder::createProperty("SSL Minimum Version")
-        -> withDescription("Minimum TLS/SSL version allowed (SSL2, SSL3, TLS1.0, TLS1.1, TLS1.2)")
+        -> withDescription("Minimum TLS/SSL version allowed (TLS1.2)")
         ->isRequired(false)
-        ->withAllowableValues<std::string>({"SSL2", "SSL3", "TLS1.0", "TLS1.1", "TLS1.2"})
-        ->withDefaultValue("SSL2")->build());
+        ->withAllowableValues<std::string>({"TLS1.2"})
+        ->withDefaultValue("TLS1.2")->build());
 
 core::Property ListenHTTP::HeadersAsAttributesRegex("HTTP Headers to receive as Attributes (Regex)", "Specifies the Regular Expression that determines the names of HTTP Headers that"
                                                     " should be passed along as FlowFile attributes",
@@ -182,21 +182,11 @@
       options.emplace_back("yes");
     }
 
-    if (sslMinVer == "SSL2") {
-      options.emplace_back("ssl_protocol_version");
-      options.emplace_back(std::to_string(0));
-    } else if (sslMinVer == "SSL3") {
-      options.emplace_back("ssl_protocol_version");
-      options.emplace_back(std::to_string(1));
-    } else if (sslMinVer == "TLS1.0") {
-      options.emplace_back("ssl_protocol_version");
-      options.emplace_back(std::to_string(2));
-    } else if (sslMinVer == "TLS1.1") {
-      options.emplace_back("ssl_protocol_version");
-      options.emplace_back(std::to_string(3));
-    } else {
+    if (sslMinVer == "TLS1.2") {
       options.emplace_back("ssl_protocol_version");
       options.emplace_back(std::to_string(4));
+    } else {
+      throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "Invalid SSL Minimum Version specified!");
     }
   }
 
diff --git a/extensions/civetweb/tests/ListenHTTPTests.cpp b/extensions/civetweb/tests/ListenHTTPTests.cpp
index 1db2fea..76e9342 100644
--- a/extensions/civetweb/tests/ListenHTTPTests.cpp
+++ b/extensions/civetweb/tests/ListenHTTPTests.cpp
@@ -507,7 +507,6 @@
 TEST_CASE_METHOD(ListenHTTPTestsFixture, "HTTPS minimum SSL version", "[https]") {
   plan->setProperty(listen_http, "SSL Certificate", utils::file::FileUtils::concat_path(utils::file::FileUtils::get_executable_dir(), "resources/server.pem"));
   plan->setProperty(listen_http, "SSL Certificate Authority", utils::file::FileUtils::concat_path(utils::file::FileUtils::get_executable_dir(), "resources/goodCA.crt"));
-  plan->setProperty(listen_http, "SSL Minimum Version", "TLS1.1");
 
   SECTION("GET") {
     method = "GET";
@@ -530,7 +529,7 @@
   if (method == "POST") {
     client->setPostFields(payload);
   }
-  REQUIRE(client->setSpecificSSLVersion(utils::SSLVersion::TLSv1_0));
+  REQUIRE(client->setSpecificSSLVersion(utils::SSLVersion::TLSv1_1));
 
   test_connect(false /*should_succeed*/);
 }
diff --git a/extensions/http-curl/HTTPCurlLoader.h b/extensions/http-curl/HTTPCurlLoader.h
index 3d24cd4..9fab59d 100644
--- a/extensions/http-curl/HTTPCurlLoader.h
+++ b/extensions/http-curl/HTTPCurlLoader.h
@@ -91,7 +91,7 @@
     }
   }
 
-  virtual std::unique_ptr<core::ObjectFactoryInitializer> getInitializer() {
+  virtual std::unique_ptr<core::ObjectFactoryInitializer> getInitializer() override{
     return std::unique_ptr<core::ObjectFactoryInitializer>(new HttpCurlObjectFactoryInitializer());
   }
 
diff --git a/extensions/http-curl/client/HTTPClient.cpp b/extensions/http-curl/client/HTTPClient.cpp
index edfc59e..562376d 100644
--- a/extensions/http-curl/client/HTTPClient.cpp
+++ b/extensions/http-curl/client/HTTPClient.cpp
@@ -184,6 +184,7 @@
 #endif
 }
 
+/* If not set, the default will be TLS 1.0, see https://curl.haxx.se/libcurl/c/CURLOPT_SSLVERSION.html */
 bool HTTPClient::setMinimumSSLVersion(SSLVersion minimum_version) {
   CURLcode ret = CURLE_UNKNOWN_OPTION;
   switch (minimum_version) {
diff --git a/extensions/http-curl/protocols/RESTReceiver.cpp b/extensions/http-curl/protocols/RESTReceiver.cpp
index babc983..ae1ebf6 100644
--- a/extensions/http-curl/protocols/RESTReceiver.cpp
+++ b/extensions/http-curl/protocols/RESTReceiver.cpp
@@ -80,7 +80,7 @@
   std::string my_port = port;
   my_port += "s";
   callback.log_message = log_message;
-  const char *options[] = { "listening_ports", port.c_str(), "ssl_certificate", ca_cert.c_str(), "ssl_protocol_version", "0", "ssl_cipher_list", "ALL",
+  const char *options[] = { "listening_ports", port.c_str(), "ssl_certificate", ca_cert.c_str(), "ssl_protocol_version", "4", "ssl_cipher_list", "ALL",
       "ssl_verify_peer", "no", "num_threads", "1", 0 };
 
   std::vector<std::string> cpp_options;
diff --git a/extensions/http-curl/sitetosite/HTTPProtocol.cpp b/extensions/http-curl/sitetosite/HTTPProtocol.cpp
index 2641e17..abf8bdd 100644
--- a/extensions/http-curl/sitetosite/HTTPProtocol.cpp
+++ b/extensions/http-curl/sitetosite/HTTPProtocol.cpp
@@ -132,10 +132,10 @@
           logger_->log_debug("confirm read for %s, but not finished ", transaction->getUUIDStr());
           if (stream->waitForDataAvailable()) {
             code = CONTINUE_TRANSACTION;
+            return 1;
           }
         }
 
-        closeTransaction(transaction->getUUIDStr());
         code = CONFIRM_TRANSACTION;
       } else {
         auto stream = dynamic_cast<io::HttpStream*>(peer_->getStream());
@@ -157,9 +157,8 @@
     } else if (transaction->getState() == TRANSACTION_CONFIRMED) {
       closeTransaction(transaction->getUUIDStr());
       code = CONFIRM_TRANSACTION;
-    } else {
-
     }
+
     return 1;
   } else if (transaction->getState() == TRANSACTION_CONFIRMED) {
     closeTransaction(transaction->getUUIDStr());
@@ -242,27 +241,32 @@
 
   if (it == known_transactions_.end()) {
     return;
-  } else {
-    transaction = it->second;
-    if (transaction->closed_) {
-      return;
-    }
+  }
+
+  transaction = it->second;
+  if (transaction->closed_) {
+    return;
   }
 
   std::string append_str;
-  logger_->log_info("Site to Site closed transaction %s", transaction->getUUIDStr());
+  logger_->log_trace("Site to Site closing transaction %s", transaction->getUUIDStr());
+
+
+  bool data_received = transaction->getDirection() == RECEIVE && (current_code == CONFIRM_TRANSACTION || current_code == TRANSACTION_FINISHED);
 
   int code = UNRECOGNIZED_RESPONSE_CODE;
-  if (transaction->getState() == TRANSACTION_CONFIRMED) {
+  // In case transaction was used to actually transmit data (conditions are a bit different for send and receive to detect this),
+  // it has to be confirmed before closing.
+  // In case no data was transmitted, there is nothing to confirm, so the transaction can be cancelled without confirming it.
+  // Confirm means matching CRC checksum of data at both sides.
+  if (transaction->getState() == TRANSACTION_CONFIRMED || data_received) {
     code = CONFIRM_TRANSACTION;
-  } else if (transaction->getDirection() == RECEIVE && current_code == CONFIRM_TRANSACTION) {
-    if (transaction->_bytes > 0)
-      code = CONFIRM_TRANSACTION;
-    else
-      code = CANCEL_TRANSACTION;
-
   } else if (transaction->current_transfers_ == 0 && !transaction->isDataAvailable()) {
     code = CANCEL_TRANSACTION;
+  } else {
+    std::string directon = transaction->getDirection() == RECEIVE ? "Receive" : "Send";
+    logger_->log_error("Transaction %s to be closed is in unexpected state. Direction: %s, tranfers: %d, bytes: %llu, state: %d",
+        transactionID, directon, transaction->total_transfers_, transaction->_bytes, transaction->getState());
   }
 
   std::stringstream uri;
@@ -270,9 +274,10 @@
 
   uri << getBaseURI() << "data-transfer/" << dir_str << "/" << getPortId() << "/transactions/" << transactionID << "?responseCode=" << code;
 
-  if (transaction->getDirection() == RECEIVE && current_code == CONFIRM_TRANSACTION && transaction->_bytes > 0) {
+  if (code == CONFIRM_TRANSACTION && data_received) {
     uri << "&checksum=" << transaction->getCRC();
   }
+
   auto client = create_http_client(uri.str(), "DELETE");
 
   client->appendHeader(PROTOCOL_VERSION_HEADER, "1");
@@ -286,6 +291,9 @@
   logger_->log_debug("Received %d response code from delete", client->getResponseCode());
 
   if (client->getResponseCode() == 400) {
+    std::string error(client->getResponseBody().data(), client->getResponseBody().size());
+
+    logging::LOG_WARN(logger_) << "400 received: " << error;
     std::stringstream message;
     message << "Received " << client->getResponseCode() << " from " << uri.str();
     throw Exception(SITE2SITE_EXCEPTION, message.str());
@@ -297,23 +305,9 @@
 }
 
 void HttpSiteToSiteClient::deleteTransaction(std::string transactionID) {
-  std::shared_ptr<Transaction> transaction = NULL;
-
-  std::map<std::string, std::shared_ptr<Transaction> >::iterator it = this->known_transactions_.find(transactionID);
-
-  if (it == known_transactions_.end()) {
-    return;
-  } else {
-    transaction = it->second;
-  }
-
-  std::string append_str;
-  logger_->log_debug("Site2Site delete transaction %s", transaction->getUUIDStr());
-
   closeTransaction(transactionID);
 
-  known_transactions_.erase(transactionID);
-
+  SiteToSiteClient::deleteTransaction(transactionID);
 }
 
 } /* namespace sitetosite */
diff --git a/extensions/http-curl/tests/TestServer.h b/extensions/http-curl/tests/TestServer.h
index 0e9b260..2b667fc 100644
--- a/extensions/http-curl/tests/TestServer.h
+++ b/extensions/http-curl/tests/TestServer.h
@@ -39,7 +39,7 @@
 
 CivetServer * start_webserver(std::string &port, std::string &rooturi, CivetHandler *handler, struct mg_callbacks *callbacks, std::string &cert, std::string &ca_cert) {
   const char *options[] = { "document_root", ".", "listening_ports", port.c_str(), "error_log_file",
-      "error.log", "ssl_certificate", ca_cert.c_str(), "ssl_protocol_version", "0", "ssl_cipher_list",
+      "error.log", "ssl_certificate", ca_cert.c_str(), "ssl_protocol_version", "4", "ssl_cipher_list",
       "ALL", "request_timeout_ms", "10000", "enable_auth_domain_check", "no", "ssl_verify_peer", "no", 0 };
 // ECDH+AESGCM+AES256:!aNULL:!MD5:!DSS
   std::vector<std::string> cpp_options;
diff --git a/extensions/librdkafka/CMakeLists.txt b/extensions/librdkafka/CMakeLists.txt
index 4fc2eff..6b8fbb2 100644
--- a/extensions/librdkafka/CMakeLists.txt
+++ b/extensions/librdkafka/CMakeLists.txt
@@ -53,6 +53,7 @@
                "-DRDKAFKA_BUILD_EXAMPLES=OFF"
                "-DRDKAFKA_BUILD_TESTS=OFF"
                "-DENABLE_LZ4_EXT=OFF"
+               "-DWITH_ZSTD=OFF"
                "-DCMAKE_MODULE_PATH=${CMAKE_MODULE_PATH_PASSTHROUGH_LIST}"
                "-DCMAKE_C_FLAGS=${CURL_C_FLAGS}"
                "-DCMAKE_INSTALL_LIBDIR=lib"
diff --git a/extensions/opencv/FrameIO.h b/extensions/opencv/FrameIO.h
new file mode 100644
index 0000000..8009afa
--- /dev/null
+++ b/extensions/opencv/FrameIO.h
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef NIFI_MINIFI_CPP_FRAMEIO_H
+#define NIFI_MINIFI_CPP_FRAMEIO_H
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace opencv {
+
+class FrameWriteCallback : public OutputStreamCallback {
+  public:
+    explicit FrameWriteCallback(cv::Mat image_mat, std::string image_encoding_)
+    // TODO - Nghia: Check std::move(img_mat).
+        : image_mat_(std::move(image_mat)), image_encoding_(image_encoding_) {
+    }
+    ~FrameWriteCallback() override = default;
+
+    int64_t process(std::shared_ptr<io::BaseStream> stream) override {
+      int64_t ret = 0;
+      imencode(image_encoding_, image_mat_, image_buf_);
+      ret = stream->write(image_buf_.data(), image_buf_.size());
+      return ret;
+    }
+
+  private:
+    std::vector<uchar> image_buf_;
+    cv::Mat image_mat_;
+    std::string image_encoding_;
+};
+
+class FrameReadCallback : public InputStreamCallback {
+  public:
+    explicit FrameReadCallback(cv::Mat &image_mat)
+        : image_mat_(image_mat) {
+    }
+    ~FrameReadCallback() override = default;
+
+    int64_t process(std::shared_ptr<io::BaseStream> stream) override {
+      int64_t ret = 0;
+      image_buf_.resize(stream->getSize());
+      ret = stream->read(image_buf_.data(), static_cast<int>(stream->getSize()));
+      if (ret != stream->getSize()) {
+        throw std::runtime_error("ImageReadCallback failed to fully read flow file input stream");
+      }
+      image_mat_ = cv::imdecode(image_buf_, -1);
+      return ret;
+    }
+
+  private:
+    std::vector<uchar> image_buf_;
+    cv::Mat &image_mat_;
+};
+
+} /* namespace opencv */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif //NIFI_MINIFI_CPP_FRAMEIO_H
diff --git a/extensions/opencv/MotionDetector.cpp b/extensions/opencv/MotionDetector.cpp
new file mode 100644
index 0000000..5234a35
--- /dev/null
+++ b/extensions/opencv/MotionDetector.cpp
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "MotionDetector.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+core::Property MotionDetector::ImageEncoding(
+    core::PropertyBuilder::createProperty("Image Encoding")
+        ->withDescription("The encoding that should be applied to the output")
+        ->isRequired(true)
+        ->withAllowableValues<std::string>({".jpg", ".png"})
+        ->withDefaultValue(".jpg")->build());
+core::Property MotionDetector::MinInterestArea(
+    core::PropertyBuilder::createProperty("Minimum Area")
+        ->withDescription("We only consider the movement regions with area greater than this.")
+        ->isRequired(true)
+        ->withDefaultValue<uint32_t>(100)->build());
+core::Property MotionDetector::Threshold(
+    core::PropertyBuilder::createProperty("Threshold for segmentation")
+        ->withDescription("Pixel greater than this will be white, otherwise black.")
+        ->isRequired(true)
+        ->withDefaultValue<uint32_t>(42)->build());
+core::Property MotionDetector::BackgroundFrame(
+    core::PropertyBuilder::createProperty("Path to background frame")
+        ->withDescription("If not provided then the processor will take the first input frame as background")
+        ->isRequired(true)
+        ->build());
+core::Property MotionDetector::DilateIter(
+    core::PropertyBuilder::createProperty("Dilate iteration")
+        ->withDescription("For image processing, if an object is detected as 2 separate objects, increase this value")
+        ->isRequired(true)
+        ->withDefaultValue<uint32_t>(10)->build());
+
+core::Relationship MotionDetector::Success("success", "Successful to detect motion");
+core::Relationship MotionDetector::Failure("failure", "Failure to detect motion");
+
+void MotionDetector::initialize() {
+  std::set<core::Property> properties;
+  properties.insert(ImageEncoding);
+  properties.insert(MinInterestArea);
+  properties.insert(Threshold);
+  properties.insert(BackgroundFrame);
+  properties.insert(DilateIter);
+  setSupportedProperties(std::move(properties));
+
+  setSupportedRelationships({Success, Failure});
+}
+
+void MotionDetector::onSchedule(const std::shared_ptr<core::ProcessContext> &context,
+                                  const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
+  std::string value;
+
+  if (context->getProperty(ImageEncoding.getName(), value)) {
+    image_encoding_ = value;
+  }
+
+  if (context->getProperty(MinInterestArea.getName(), value)) {
+    core::Property::StringToInt(value, min_area_);
+  }
+
+  if (context->getProperty(Threshold.getName(), value)) {
+    core::Property::StringToInt(value, threshold_);
+  }
+
+  if (context->getProperty(DilateIter.getName(), value)) {
+    core::Property::StringToInt(value, dil_iter_);
+  }
+
+  if (context->getProperty(BackgroundFrame.getName(), value) && !value.empty()) {
+    bg_img_ = cv::imread(value, cv::IMREAD_GRAYSCALE);
+    double scale = IMG_WIDTH / bg_img_.size().width;
+    cv::resize(bg_img_, bg_img_, cv::Size(0, 0), scale, scale);
+    cv::GaussianBlur(bg_img_, bg_img_, cv::Size(21, 21), 0, 0);
+    bg_img_.convertTo(background_, CV_32F);
+  }
+
+  logger_->log_trace("MotionDetector processor scheduled");
+}
+
+bool MotionDetector::detectAndDraw(cv::Mat &frame) {
+  cv::Mat gray;
+  cv::Mat img_diff, thresh;
+  std::vector<cv::Mat> contours;
+
+  logger_->log_trace("Detect and Draw");
+
+  cv::cvtColor(frame, gray, cv::COLOR_BGR2GRAY);
+  cv::GaussianBlur(gray, gray, cv::Size(21, 21), 0, 0);
+
+  // Get difference between current frame and background
+  logger_->log_trace("Get difference [%d x %d] [%d x %d]", bg_img_.rows, bg_img_.cols, gray.rows, gray.cols);
+  cv::absdiff(gray, bg_img_, img_diff);
+  logger_->log_trace("Apply threshold");
+  cv::threshold(img_diff, thresh, threshold_, 255, cv::THRESH_BINARY);
+  // Image processing.
+  logger_->log_trace("Dilation");
+  cv::dilate(thresh, thresh, cv::Mat(), cv::Point(-1, -1), dil_iter_);
+  cv::findContours(thresh, contours, cv::RETR_EXTERNAL, cv::CHAIN_APPROX_SIMPLE);
+
+  // Finish process
+  logger_->log_debug("Draw contours");
+  bool moved = false;
+  for (const auto &contour : contours) {
+    auto area = cv::contourArea(contour);
+    if (area < min_area_) {
+      continue;
+    }
+    moved = true;
+    cv::Rect bbox = cv::boundingRect(contour);
+    cv::rectangle(frame, bbox.tl(), bbox.br(), cv::Scalar(0, 255, 0), 2, 8, 0);
+  }
+  logger_->log_trace("Updating background");
+  if (!moved) {
+    logger_->log_debug("Not moved");
+    // Adaptive background, update background so that the illumnation does not affect that much.
+    cv::accumulateWeighted(gray, background_, 0.5);
+    cv::convertScaleAbs(background_, bg_img_);
+  }
+  logger_->log_trace("Finish Detect and Draw");
+  return moved;
+}
+
+void MotionDetector::onTrigger(const std::shared_ptr<core::ProcessContext> &context,
+                                 const std::shared_ptr<core::ProcessSession> &session) {
+  std::unique_lock<std::mutex> lock(mutex_, std::try_to_lock);
+  if (!lock.owns_lock()) {
+    logger_->log_info("Cannot process due to an unfinished onTrigger");
+    context->yield();
+    return;
+  }
+
+  auto flow_file = session->get();
+  if (flow_file->getSize() == 0) {
+    logger_->log_info("Empty flow file");
+    return;
+  }
+  cv::Mat frame;
+
+  opencv::FrameReadCallback cb(frame);
+  session->read(flow_file, &cb);
+
+  if (frame.empty()) {
+    logger_->log_error("Empty frame.");
+    session->transfer(flow_file, Failure);
+  }
+
+  double scale = IMG_WIDTH / frame.size().width;
+  cv::resize(frame, frame, cv::Size(0, 0), scale, scale);
+
+  if (background_.empty()) {
+    logger_->log_info("Background is missing, update and yield.");
+    cv::cvtColor(frame, bg_img_, cv::COLOR_BGR2GRAY);
+    cv::GaussianBlur(bg_img_, bg_img_, cv::Size(21, 21), 0, 0);
+    bg_img_.convertTo(background_, CV_32F);
+    return;
+  }
+  logger_->log_trace("Start motion detecting");
+
+  auto t = std::time(nullptr);
+  auto tm = *std::localtime(&t);
+  std::ostringstream oss;
+  oss << std::put_time(&tm, "%Y-%m-%d %H-%M-%S");
+  auto filename = oss.str();
+  filename.append(image_encoding_);
+
+  detectAndDraw(frame);
+
+  opencv::FrameWriteCallback write_cb(frame, image_encoding_);
+
+  session->putAttribute(flow_file, "filename", filename);
+
+  session->write(flow_file, &write_cb);
+  session->transfer(flow_file, Success);
+  logger_->log_trace("Finish motion detecting");
+}
+
+void MotionDetector::notifyStop() {
+}
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
diff --git a/extensions/opencv/MotionDetector.h b/extensions/opencv/MotionDetector.h
new file mode 100644
index 0000000..4acafb8
--- /dev/null
+++ b/extensions/opencv/MotionDetector.h
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef NIFI_MINIFI_CPP_MOTIONDETECTOR_H
+#define NIFI_MINIFI_CPP_MOTIONDETECTOR_H
+
+#include <atomic>
+
+#include <core/Resource.h>
+#include <core/Processor.h>
+#include <opencv2/opencv.hpp>
+#include <opencv2/objdetect.hpp>
+#include <opencv2/imgproc.hpp>
+#include "FrameIO.h"
+
+#include <iomanip>
+#include <ctime>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+class MotionDetector : public core::Processor {
+
+ public:
+
+  explicit MotionDetector(const std::string &name, utils::Identifier uuid = utils::Identifier())
+      : Processor(name, uuid),
+        logger_(logging::LoggerFactory<MotionDetector>::getLogger()) {
+  }
+
+  static core::Property ImageEncoding;
+  static core::Property MinInterestArea;
+  static core::Property Threshold;
+  static core::Property DilateIter;
+  static core::Property BackgroundFrame;
+
+  static core::Relationship Success;
+  static core::Relationship Failure;
+
+  virtual void initialize(void) override;
+  virtual void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
+  virtual void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override;
+
+  void notifyStop() override;
+
+ private:
+
+  bool detectAndDraw(cv::Mat &frame);
+
+  std::shared_ptr<logging::Logger> logger_;
+  std::mutex mutex_;
+  cv::Mat background_;
+  cv::Mat bg_img_;
+  std::string image_encoding_;
+  int min_area_;
+  int threshold_;
+  int dil_iter_;
+
+  // hardcoded width to 500
+  const double IMG_WIDTH = 500.0;
+};
+
+REGISTER_RESOURCE(MotionDetector, "Detect motion from captured images."); // NOLINT
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif //NIFI_MINIFI_CPP_MOTIONDETECTOR_H
diff --git a/extensions/opencv/tests/CaptureRTSPFrameTest.cpp b/extensions/opencv/tests/CaptureRTSPFrameTest.cpp
index fac070b..9993da3 100644
--- a/extensions/opencv/tests/CaptureRTSPFrameTest.cpp
+++ b/extensions/opencv/tests/CaptureRTSPFrameTest.cpp
@@ -16,13 +16,15 @@
  * limitations under the License.
  */
 
+#include "CaptureRTSPFrame.h"
+
+#include <uuid/uuid.h>
 #include <map>
 #include <memory>
 #include <fstream>
 #include <utility>
 #include <string>
 #include <set>
-#include <uuid/uuid.h>
 #include <iostream>
 
 #include "FlowFile.h"
@@ -30,7 +32,6 @@
 #include "../../../libminifi/test/TestBase.h"
 #include "FlowController.h"
 #include "core/Processor.h"
-#include "CaptureRTSPFrame.h"
 #include "core/ProcessorNode.h"
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
diff --git a/libminifi/include/Connection.h b/libminifi/include/Connection.h
index fc511c0..1f81fec 100644
--- a/libminifi/include/Connection.h
+++ b/libminifi/include/Connection.h
@@ -131,6 +131,15 @@
   uint64_t getFlowExpirationDuration() {
     return expired_duration_;
   }
+
+  void setDropEmptyFlowFiles(bool drop) {
+    drop_empty_ = drop;
+  }
+
+  bool getDropEmptyFlowFiles() const {
+    return drop_empty_;
+  }
+
   // Check whether the queue is empty
   bool isEmpty();
   // Check whether the queue is full to apply back pressure
@@ -192,6 +201,7 @@
   std::shared_ptr<core::ContentRepository> content_repo_;
 
  private:
+  bool drop_empty_;
   // Mutex for protection
   std::mutex mutex_;
   // Queued data size
diff --git a/libminifi/include/RemoteProcessorGroupPort.h b/libminifi/include/RemoteProcessorGroupPort.h
index 1b81f04..b28ee1f 100644
--- a/libminifi/include/RemoteProcessorGroupPort.h
+++ b/libminifi/include/RemoteProcessorGroupPort.h
@@ -146,7 +146,7 @@
     for (auto url : urls) {
       logger_->log_trace("Parsing %s", url);
       std::string host, protocol;
-      int port;
+      int port = -1;
       url = utils::StringUtils::trim(url);
       utils::parse_url(&url, &host, &port, &protocol);
       logger_->log_trace("Parsed -%s- %s %s, %d", url, protocol, host, port);
diff --git a/libminifi/include/core/logging/LoggerConfiguration.h b/libminifi/include/core/logging/LoggerConfiguration.h
index 7f9fdd2..ea714e9 100644
--- a/libminifi/include/core/logging/LoggerConfiguration.h
+++ b/libminifi/include/core/logging/LoggerConfiguration.h
@@ -57,7 +57,9 @@
 
 class LoggerProperties : public Properties {
  public:
-  LoggerProperties() : Properties("Logger properties") {}
+  LoggerProperties()
+      : Properties("Logger properties") {
+  }
   /**
    * Gets all keys that start with the given prefix and do not have a "." after the prefix and "." separator.
    *
@@ -92,13 +94,21 @@
     return logger_configuration;
   }
 
-  void disableLogging(){
+  static std::unique_ptr<LoggerConfiguration> newInstance() {
+    return std::unique_ptr<LoggerConfiguration>(new LoggerConfiguration());
+  }
+
+  void disableLogging() {
     controller_->setEnabled(false);
   }
 
-  void enableLogging(){
-      controller_->setEnabled(true);
-    }
+  void enableLogging() {
+    controller_->setEnabled(true);
+  }
+
+  bool shortenClassNames() const {
+    return shorten_names_;
+  }
   /**
    * (Re)initializes the logging configuation with the given logger properties.
    */
@@ -118,10 +128,11 @@
 
   class LoggerImpl : public Logger {
    public:
-    LoggerImpl(std::string name, std::shared_ptr<LoggerControl> controller, std::shared_ptr<spdlog::logger> delegate)
-        : Logger(delegate,controller),
+    explicit LoggerImpl(const std::string &name, const std::shared_ptr<LoggerControl> &controller, const std::shared_ptr<spdlog::logger> &delegate)
+        : Logger(delegate, controller),
           name(name) {
     }
+
     void set_delegate(std::shared_ptr<spdlog::logger> delegate) {
       std::lock_guard<std::mutex> lock(mutex_);
       delegate_ = delegate;
@@ -137,6 +148,8 @@
   std::mutex mutex;
   std::shared_ptr<LoggerImpl> logger_ = nullptr;
   std::shared_ptr<LoggerControl> controller_;
+  bool shorten_names_;
+
 };
 
 template<typename T>
@@ -151,9 +164,9 @@
   }
 
   static std::shared_ptr<Logger> getAliasedLogger(const std::string &alias) {
-      std::shared_ptr<Logger> logger = LoggerConfiguration::getConfiguration().getLogger(alias);
-      return logger;
-    }
+    std::shared_ptr<Logger> logger = LoggerConfiguration::getConfiguration().getLogger(alias);
+    return logger;
+  }
 };
 
 } /* namespace logging */
diff --git a/libminifi/include/utils/ClassUtils.h b/libminifi/include/utils/ClassUtils.h
new file mode 100644
index 0000000..3d82c43
--- /dev/null
+++ b/libminifi/include/utils/ClassUtils.h
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_UTILS_CLASSUTILS_H_
+#define LIBMINIFI_INCLUDE_UTILS_CLASSUTILS_H_
+
+#include <string>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+namespace ClassUtils {
+
+/**
+ * Shortens class names via the canonical representation ( package with name )
+ * @param class_name input class name
+ * @param out output class name that is shortened.
+ * @return true if out has been updated, false otherwise
+ */
+bool shortenClassName(const std::string &class_name, std::string &out);
+
+} /* namespace ClassUtils */
+} /* namespace utils */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_UTILS_CLASSUTILS_H_ */
diff --git a/libminifi/include/utils/HTTPClient.h b/libminifi/include/utils/HTTPClient.h
index d477f73..8cdc3ef 100644
--- a/libminifi/include/utils/HTTPClient.h
+++ b/libminifi/include/utils/HTTPClient.h
@@ -356,8 +356,8 @@
 
 extern std::string get_token(utils::BaseHTTPClient *client, std::string username, std::string password);
 
-extern void parse_url(std::string *url, std::string *host, int *port, std::string *protocol);
-extern void parse_url(std::string *url, std::string *host, int *port, std::string *protocol, std::string *path, std::string *query);
+extern void parse_url(const std::string *url, std::string *host, int *port, std::string *protocol);
+extern void parse_url(const std::string *url, std::string *host, int *port, std::string *protocol, std::string *path, std::string *query);
 } /* namespace utils */
 } /* namespace minifi */
 } /* namespace nifi */
diff --git a/libminifi/include/utils/StringUtils.h b/libminifi/include/utils/StringUtils.h
index 6ecbebd..b45f73d 100644
--- a/libminifi/include/utils/StringUtils.h
+++ b/libminifi/include/utils/StringUtils.h
@@ -374,8 +374,7 @@
   NANOSECOND
 };
 
-} /* namespace core */
-
+} /* namespace utils */
 } /* namespace minifi */
 } /* namespace nifi */
 } /* namespace apache */
diff --git a/libminifi/src/Connection.cpp b/libminifi/src/Connection.cpp
index f9a8fcb..be03180 100644
--- a/libminifi/src/Connection.cpp
+++ b/libminifi/src/Connection.cpp
@@ -49,6 +49,7 @@
   max_data_queue_size_ = 0;
   expired_duration_ = 0;
   queued_data_size_ = 0;
+  drop_empty_ = false;
 
   logger_->log_debug("Connection %s created", name_);
 }
@@ -64,6 +65,7 @@
   max_data_queue_size_ = 0;
   expired_duration_ = 0;
   queued_data_size_ = 0;
+  drop_empty_ = false;
 
   logger_->log_debug("Connection %s created", name_);
 }
@@ -83,6 +85,7 @@
   max_data_queue_size_ = 0;
   expired_duration_ = 0;
   queued_data_size_ = 0;
+  drop_empty_ = false;
 
   logger_->log_debug("Connection %s created", name_);
 }
@@ -103,6 +106,7 @@
   max_data_queue_size_ = 0;
   expired_duration_ = 0;
   queued_data_size_ = 0;
+  drop_empty_ = false;
 
   logger_->log_debug("Connection %s created", name_);
 }
@@ -130,6 +134,10 @@
 }
 
 void Connection::put(std::shared_ptr<core::FlowFile> flow) {
+  if (drop_empty_ && flow->getSize() == 0) {
+    logger_->log_info("Dropping empty flow file: %s", flow->getUUIDStr());
+    return;;
+  }
   {
     std::lock_guard<std::mutex> lock(mutex_);
 
diff --git a/libminifi/src/core/FlowConfiguration.cpp b/libminifi/src/core/FlowConfiguration.cpp
index 1842c21..dafa6c5 100644
--- a/libminifi/src/core/FlowConfiguration.cpp
+++ b/libminifi/src/core/FlowConfiguration.cpp
@@ -71,7 +71,7 @@
   auto payload = getRootFromPayload(yamlConfigPayload);
   if (!source.empty() && payload != nullptr) {
     std::string host, protocol, path, query, url = source;
-    int port;
+    int port = -1;
     utils::parse_url(&url, &host, &port, &protocol, &path, &query);
 
     std::string flow_id, bucket_id;
diff --git a/libminifi/src/core/logging/LoggerConfiguration.cpp b/libminifi/src/core/logging/LoggerConfiguration.cpp
index 862baaf..0997f09 100644
--- a/libminifi/src/core/logging/LoggerConfiguration.cpp
+++ b/libminifi/src/core/logging/LoggerConfiguration.cpp
@@ -29,6 +29,7 @@
 
 #include "core/Core.h"
 #include "utils/StringUtils.h"
+#include "utils/ClassUtils.h"
 
 #include "spdlog/spdlog.h"
 #include "spdlog/sinks/stdout_sinks.h"
@@ -63,6 +64,7 @@
 LoggerConfiguration::LoggerConfiguration()
     : root_namespace_(create_default_root()),
       loggers(std::vector<std::shared_ptr<LoggerImpl>>()),
+      shorten_names_(false),
       formatter_(std::make_shared<spdlog::pattern_formatter>(spdlog_default_pattern)) {
   controller_ = std::make_shared<LoggerControl>();
   logger_ = std::shared_ptr<LoggerImpl>(
@@ -77,6 +79,15 @@
   if (!logger_properties->get("spdlog.pattern", spdlog_pattern)) {
     spdlog_pattern = spdlog_default_pattern;
   }
+
+  /**
+   * There is no need to shorten names per spdlog sink as this is a per log instance.
+   */
+  std::string shorten_names_str;
+  if (logger_properties->get("spdlog.shorten_names", shorten_names_str)) {
+    utils::StringUtils::StringToBool(shorten_names_str, shorten_names_);
+  }
+
   formatter_ = std::make_shared<spdlog::pattern_formatter>(spdlog_pattern);
   std::map<std::string, std::shared_ptr<spdlog::logger>> spdloggers;
   for (auto const & logger_impl : loggers) {
@@ -100,6 +111,10 @@
   auto haz_clazz = name.find(clazz);
   if (haz_clazz == 0)
     adjusted_name = name.substr(clazz.length(), name.length() - clazz.length());
+  if (shorten_names_) {
+    utils::ClassUtils::shortenClassName(adjusted_name, adjusted_name);
+  }
+
   std::shared_ptr<LoggerImpl> result = std::make_shared<LoggerImpl>(adjusted_name, controller_, get_logger(logger_, root_namespace_, adjusted_name, formatter_));
   loggers.push_back(result);
   return result;
diff --git a/libminifi/src/core/yaml/YamlConfiguration.cpp b/libminifi/src/core/yaml/YamlConfiguration.cpp
index 0cd2851..250e110 100644
--- a/libminifi/src/core/yaml/YamlConfiguration.cpp
+++ b/libminifi/src/core/yaml/YamlConfiguration.cpp
@@ -666,6 +666,14 @@
           }
         }
 
+        if (connectionNode["drop empty"]) {
+          std::string strvalue = connectionNode["drop empty"].as<std::string>();
+          bool dropEmpty = false;
+          if (utils::StringUtils::StringToBool(strvalue, dropEmpty)) {
+            connection->setDropEmptyFlowFiles(dropEmpty);
+          }
+        }
+
         if (connection) {
           parent->addConnection(connection);
         }
diff --git a/libminifi/src/sitetosite/RawSocketProtocol.cpp b/libminifi/src/sitetosite/RawSocketProtocol.cpp
index 4a700c2..355fc42 100644
--- a/libminifi/src/sitetosite/RawSocketProtocol.cpp
+++ b/libminifi/src/sitetosite/RawSocketProtocol.cpp
@@ -569,15 +569,15 @@
 
     int16_t resp = send(transactionID, &packet, nullptr, session);
     if (resp == -1) {
-      throw Exception(SITE2SITE_EXCEPTION, "Send Failed");
+      throw Exception(SITE2SITE_EXCEPTION, "Send Failed in transaction " + transactionID);
     }
     logging::LOG_INFO(logger_) << "Site2Site transaction " << transactionID << " sent bytes length" << payload.length();
 
     if (!confirm(transactionID)) {
-      throw Exception(SITE2SITE_EXCEPTION, "Confirm Failed");
+      throw Exception(SITE2SITE_EXCEPTION, "Confirm Failed in transaction " + transactionID);
     }
     if (!complete(transactionID)) {
-      throw Exception(SITE2SITE_EXCEPTION, "Complete Failed");
+      throw Exception(SITE2SITE_EXCEPTION, "Complete Failed in transaction " + transactionID);
     }
     logging::LOG_INFO(logger_) << "Site2Site transaction " << transactionID << " successfully send flow record " << transaction->current_transfers_ << " content bytes " << transaction->_bytes;
   } catch (std::exception &exception) {
diff --git a/libminifi/src/sitetosite/SiteToSiteClient.cpp b/libminifi/src/sitetosite/SiteToSiteClient.cpp
index 81e8ac3..056fd9e 100644
--- a/libminifi/src/sitetosite/SiteToSiteClient.cpp
+++ b/libminifi/src/sitetosite/SiteToSiteClient.cpp
@@ -177,7 +177,7 @@
     if (!complete(transactionID)) {
       throw Exception(SITE2SITE_EXCEPTION, "Complete Failed for " + transactionID);
     }
-    logger_->log_debug("Site2Site transaction %s successfully send flow record %d, content bytes %llu", transactionID, transaction->total_transfers_, transaction->_bytes);
+    logger_->log_debug("Site2Site transaction %s successfully sent flow record %d, content bytes %llu", transactionID, transaction->total_transfers_, transaction->_bytes);
   } catch (std::exception &exception) {
     if (transaction)
       deleteTransaction(transactionID);
@@ -215,21 +215,24 @@
 
   if (it == known_transactions_.end()) {
     return false;
-  } else {
-    transaction = it->second;
   }
+  transaction = it->second;
 
-  if (transaction->getState() == TRANSACTION_STARTED && !transaction->isDataAvailable() && transaction->getDirection() == RECEIVE) {
+
+  if (transaction->getState() == TRANSACTION_STARTED && !transaction->isDataAvailable() &&
+      transaction->getDirection() == RECEIVE) {
     transaction->_state = TRANSACTION_CONFIRMED;
     return true;
   }
 
-  if (transaction->getState() != DATA_EXCHANGED)
+  if (transaction->getState() != DATA_EXCHANGED) {
     return false;
+  }
 
   if (transaction->getDirection() == RECEIVE) {
-    if (transaction->isDataAvailable())
+    if (transaction->isDataAvailable()) {
       return false;
+    }
     // we received a FINISH_TRANSACTION indicator. Send back a CONFIRM_TRANSACTION message
     // to peer so that we can verify that the connection is still open. This is a two-phase commit,
     // which helps to prevent the chances of data duplication. Without doing this, we may commit the
@@ -239,7 +242,7 @@
     // time window involved in the entire transaction, it is reduced to a simple round-trip conversation.
     uint64_t crcValue = transaction->getCRC();
     std::string crc = std::to_string(crcValue);
-    logger_->log_debug("Site2Site Send confirm with CRC %d to transaction %s", transaction->getCRC(), transactionID);
+    logger_->log_debug("Site2Site Receive confirm with CRC %llu to transaction %s", crcValue, transactionID);
     ret = writeResponse(transaction, CONFIRM_TRANSACTION, crc);
     if (ret <= 0)
       return false;
@@ -263,8 +266,9 @@
   } else {
     logger_->log_debug("Site2Site Send FINISH TRANSACTION for transaction %s", transactionID);
     ret = writeResponse(transaction, FINISH_TRANSACTION, "FINISH_TRANSACTION");
-    if (ret <= 0)
+    if (ret <= 0) {
       return false;
+    }
     RespondCode code;
     std::string message;
     readResponse(transaction, code, message);
@@ -372,7 +376,7 @@
       transaction->_state = TRANSACTION_COMPLETED;
       return true;
     } else {
-      logger_->log_debug("Site2Site transaction %s send finished", transactionID);
+      logger_->log_debug("Site2Site transaction %s receive finished", transactionID);
       ret = this->writeResponse(transaction, TRANSACTION_FINISHED, "Finished");
       if (ret <= 0) {
         return false;
@@ -404,13 +408,7 @@
 
 int16_t SiteToSiteClient::send(std::string transactionID, DataPacket *packet, const std::shared_ptr<FlowFileRecord> &flowFile, const std::shared_ptr<core::ProcessSession> &session) {
   int ret;
-  std::shared_ptr<Transaction> transaction = NULL;
 
-  if (flowFile && (flowFile->getResourceClaim() == nullptr || !flowFile->getResourceClaim()->exists())) {
-    auto path = flowFile->getResourceClaim() != nullptr ? flowFile->getResourceClaim()->getContentFullPath() : "nullclaim";
-    logger_->log_debug("Claim %s does not exist for FlowFile %s", path, flowFile->getUUIDStr());
-    return -2;
-  }
   if (peer_state_ != READY) {
     bootstrap();
   }
@@ -418,13 +416,13 @@
   if (peer_state_ != READY) {
     return -1;
   }
+
   std::map<std::string, std::shared_ptr<Transaction> >::iterator it = this->known_transactions_.find(transactionID);
 
   if (it == known_transactions_.end()) {
     return -1;
-  } else {
-    transaction = it->second;
   }
+  std::shared_ptr<Transaction> transaction = it->second;
 
   if (transaction->getState() != TRANSACTION_STARTED && transaction->getState() != DATA_EXCHANGED) {
     logger_->log_warn("Site2Site transaction %s is not at started or exchanged state", transactionID);
@@ -463,12 +461,20 @@
     logger_->log_debug("Site2Site transaction %s send attribute key %s value %s", transactionID, itAttribute->first, itAttribute->second);
   }
 
+  bool flowfile_has_content = (flowFile != nullptr);
+
+  if (flowFile && (flowFile->getResourceClaim() == nullptr || !flowFile->getResourceClaim()->exists())) {
+    auto path = flowFile->getResourceClaim() != nullptr ? flowFile->getResourceClaim()->getContentFullPath() : "nullclaim";
+    logger_->log_debug("Claim %s does not exist for FlowFile %s", path, flowFile->getUUIDStr());
+    flowfile_has_content = false;
+  }
+
   uint64_t len = 0;
-  if (flowFile) {
+  if (flowFile && flowfile_has_content) {
     len = flowFile->getSize();
     ret = transaction->getStream().write(len);
     if (ret != 8) {
-      logger_->log_debug("ret != 8");
+      logger_->log_debug("Failed to write content size!");
       return -1;
     }
     if (flowFile->getSize() > 0) {
@@ -495,10 +501,16 @@
 
     ret = transaction->getStream().writeData(reinterpret_cast<uint8_t *>(const_cast<char*>(packet->payload_.c_str())), len);
     if (ret != (int64_t)len) {
-      logger_->log_debug("ret != len");
+      logger_->log_debug("Failed to write payload size!");
       return -1;
     }
     packet->_size += len;
+  } else if (flowFile && !flowfile_has_content) {
+    ret = transaction->getStream().write(len);  // Indicate zero length
+    if (ret != 8) {
+      logger_->log_debug("Failed to write content size (0)!");
+      return -1;
+    }
   }
 
   transaction->current_transfers_++;
@@ -528,10 +540,10 @@
 
   if (it == known_transactions_.end()) {
     return false;
-  } else {
-    transaction = it->second;
   }
 
+  transaction = it->second;
+
   if (transaction->getState() != TRANSACTION_STARTED && transaction->getState() != DATA_EXCHANGED) {
     logger_->log_warn("Site2Site transaction %s is not at started or exchanged state", transactionID);
     return false;
@@ -608,11 +620,11 @@
   }
 
   packet->_size = len;
-  if (len > 0) {
+  if (len > 0 || numAttributes > 0) {
     transaction->current_transfers_++;
     transaction->total_transfers_++;
   } else {
-    logger_->log_debug("Site2Site transaction %s receives attribute ?", transactionID);
+    logger_->log_warn("Site2Site transaction %s empty flow file without attribute", transactionID);
     transaction->_dataAvailable = false;
     eof = true;
     return true;
@@ -661,7 +673,7 @@
       bool eof = false;
 
       if (!receive(transactionID, &packet, eof)) {
-        throw Exception(SITE2SITE_EXCEPTION, "Receive Failed");
+        throw Exception(SITE2SITE_EXCEPTION, "Receive Failed " + transactionID);
       }
       if (eof) {
         // transaction done
@@ -712,7 +724,6 @@
     }
     logging::LOG_INFO(logger_) << "Site to Site transaction " << transactionID << " received flow record " << transfers
                                << ", with content size " << bytes << " bytes";
-
     // we yield the receive if we did not get anything
     if (transfers == 0)
       context->yield();
@@ -733,7 +744,6 @@
   }
 
   deleteTransaction(transactionID);
-
   return true;
 }
 } /* namespace sitetosite */
diff --git a/libminifi/src/utils/ClassUtils.cpp b/libminifi/src/utils/ClassUtils.cpp
new file mode 100644
index 0000000..9ef2c0d
--- /dev/null
+++ b/libminifi/src/utils/ClassUtils.cpp
@@ -0,0 +1,58 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "utils/ClassUtils.h"
+#include "utils/StringUtils.h"
+#include <iostream>
+#include <string>
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+bool ClassUtils::shortenClassName(const std::string &class_name, std::string &out) {
+  std::string class_delim = "::";
+  auto class_split = utils::StringUtils::split(class_name, class_delim);
+  // support . and ::
+  if (class_split.size() <= 1) {
+    if (class_name.find(".") != std::string::npos) {
+      class_delim = ".";
+      class_split = utils::StringUtils::split(class_name, class_delim);
+    } else {
+      // if no update can be performed, return false to let the developer know
+      // this. Out will have no updates
+      return false;
+    }
+  }
+  for (auto &elem : class_split) {
+    if (&elem != &class_split.back() && elem.size() > 1) {
+      elem = elem.substr(0, 1);
+    }
+  }
+
+  out = utils::StringUtils::join(class_delim, class_split);
+  return true;
+}
+
+} /* namespace utils */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
diff --git a/libminifi/src/utils/HTTPClient.cpp b/libminifi/src/utils/HTTPClient.cpp
index 35d8f6b..3328aa5 100644
--- a/libminifi/src/utils/HTTPClient.cpp
+++ b/libminifi/src/utils/HTTPClient.cpp
@@ -48,9 +48,9 @@
   return token;
 }
 
-void parse_url(std::string *url, std::string *host, int *port, std::string *protocol) {
-  std::string http("http://");
-  std::string https("https://");
+void parse_url(const std::string *url, std::string *host, int *port, std::string *protocol) {
+  static std::string http("http://");
+  static std::string https("https://");
 
   if (url->compare(0, http.size(), http) == 0)
     *protocol = http;
@@ -76,53 +76,39 @@
       if (portStr.size() > 0) {
         *port = std::stoi(portStr);
       }
+    } else {
+      // In case the host contains no port, the first part is needed only
+      // For eg.: nifi.io/nifi
+      size_t ppos = host->find_first_of("/");
+      if (ppos != std::string::npos) {
+        *host = host->substr(0, ppos);
+      }
     }
   }
 }
 
-void parse_url(std::string *url, std::string *host, int *port, std::string *protocol, std::string *path, std::string *query) {
-  std::string http("http://");
-  std::string https("https://");
+void parse_url(const std::string *url, std::string *host, int *port, std::string *protocol, std::string *path, std::string *query) {
+  int temp_port = -1;
 
-  if (url->compare(0, http.size(), http) == 0)
-    *protocol = http;
+  parse_url(url, host, &temp_port, protocol);
 
-  if (url->compare(0, https.size(), https) == 0)
-    *protocol = https;
+  if (host->empty() || protocol->empty()) {
+    return;
+  }
 
-  if (!protocol->empty()) {
-    size_t pos = url->find_first_of(":", protocol->size());
+  size_t base_len = host->size() + protocol->size();
+  if (temp_port != -1) {
+    *port = temp_port;
+    base_len += std::to_string(temp_port).size() + 1;  // +1 for the :
+  }
 
-    if (pos == std::string::npos) {
-      pos = url->size();
-    }
-    size_t ppos = url->find_first_of("/", protocol->size());
-    if (pos == url->size() && ppos < url->size()) {
-      *host = url->substr(protocol->size(), ppos - protocol->size());
-    } else {
-      if (ppos < url->size())
-        *host = url->substr(protocol->size(), pos - protocol->size());
-      else
-        return;
-    }
-    if (pos < url->size() && (*url)[pos] == ':') {
-      if (ppos == std::string::npos) {
-        ppos = url->size();
-      }
-      std::string portStr(url->substr(pos + 1, ppos - pos - 1));
-      if (portStr.size() > 0) {
-        *port = std::stoi(portStr);
-      }
-    }
+  auto query_loc = url->find_first_of("?", base_len);
 
-    auto query_loc = url->find_first_of("?", ppos);
-
-    if (query_loc < url->size()) {
-      *path = url->substr(ppos + 1, query_loc - ppos - 1);
-      *query = url->substr(query_loc + 1, url->size() - query_loc - 1);
-    } else {
-      *path = url->substr(ppos + 1, url->size() - ppos - 1);
-    }
+  if (query_loc < url->size()) {
+    *path = url->substr(base_len + 1, query_loc - base_len - 1);
+    *query = url->substr(query_loc + 1, url->size() - query_loc - 1);
+  } else {
+    *path = url->substr(base_len + 1, url->size() - base_len - 1);
   }
 }
 
diff --git a/libminifi/test/TestBase.h b/libminifi/test/TestBase.h
index f7af0c1..4d7d363 100644
--- a/libminifi/test/TestBase.h
+++ b/libminifi/test/TestBase.h
@@ -46,14 +46,31 @@
 #include "core/reporting/SiteToSiteProvenanceReportingTask.h"
 #include "core/state/nodes/FlowInformation.h"
 #include "properties/Configure.h"
+#include "utils/ClassUtils.h"
 
 class LogTestController {
  public:
+  ~LogTestController() {
+  }
   static LogTestController& getInstance() {
     static LogTestController instance;
     return instance;
   }
 
+  static std::shared_ptr<LogTestController> getInstance(const std::shared_ptr<logging::LoggerProperties> &logger_properties) {
+    static std::map<std::shared_ptr<logging::LoggerProperties>, std::shared_ptr<LogTestController>> map;
+    auto fnd = map.find(logger_properties);
+    if (fnd != std::end(map)) {
+      return fnd->second;
+    } else {
+      // in practice I'd use a derivation here or another paradigm entirely but for the purposes of this test code
+      // having extra overhead is negligible. this is the most readable and least impactful way
+      auto instance = std::shared_ptr<LogTestController>(new LogTestController(logger_properties));
+      map.insert(std::make_pair(logger_properties, instance));
+      return map.find(logger_properties)->second;
+    }
+  }
+
   template<typename T>
   void setTrace() {
     setLevel<T>(spdlog::level::trace);
@@ -84,12 +101,35 @@
     setLevel<T>(spdlog::level::off);
   }
 
+  /**
+   * Most tests use the main logging framework. this addition allows us to have and control variants for the purposes
+   * of changeable test formats
+   */
+  template<typename T>
+  std::shared_ptr<logging::Logger> getLogger() {
+    std::string name = core::getClassName<T>();
+    return config ? config->getLogger(name) : logging::LoggerConfiguration::getConfiguration().getLogger(name);
+  }
+
   template<typename T>
   void setLevel(spdlog::level::level_enum level) {
     logging::LoggerFactory<T>::getLogger();
     std::string name = core::getClassName<T>();
-    modified_loggers.push_back(name);
+    if (config)
+      config->getLogger(name);
+    else
+      logging::LoggerConfiguration::getConfiguration().getLogger(name);
+    modified_loggers.insert(name);
     setLevel(name, level);
+    // also support shortened classnames
+    if (config && config->shortenClassNames()) {
+      std::string adjusted = name;
+      if (utils::ClassUtils::shortenClassName(name, adjusted)) {
+        modified_loggers.insert(name);
+        setLevel(name, level);
+      }
+    }
+
   }
 
   bool contains(const std::string &ending, std::chrono::seconds timeout = std::chrono::seconds(3)) {
@@ -121,7 +161,9 @@
     for (auto const & name : modified_loggers) {
       setLevel(name, spdlog::level::err);
     }
-    modified_loggers = std::vector<std::string>();
+    modified_loggers.clear();
+    if (config)
+      config = std::move(logging::LoggerConfiguration::newInstance());
     resetStream(log_output);
   }
 
@@ -133,7 +175,7 @@
   std::ostringstream log_output;
 
   std::shared_ptr<logging::Logger> logger_;
- private:
+ protected:
   class TestBootstrapLogger : public logging::Logger {
    public:
     TestBootstrapLogger(std::shared_ptr<spdlog::logger> logger)
@@ -141,22 +183,39 @@
     }
     ;
   };
-  LogTestController() {
-    std::shared_ptr<logging::LoggerProperties> logger_properties = std::make_shared<logging::LoggerProperties>();
-    logger_properties->set("logger.root", "ERROR,ostream");
-    logger_properties->set("logger." + core::getClassName<LogTestController>(), "INFO");
-    logger_properties->set("logger." + core::getClassName<logging::LoggerConfiguration>(), "DEBUG");
+  LogTestController()
+      : LogTestController(nullptr) {
+  }
+
+  explicit LogTestController(const std::shared_ptr<logging::LoggerProperties> &loggerProps) {
+    my_properties_ = loggerProps;
+    bool initMain = false;
+    if (nullptr == my_properties_) {
+      my_properties_ = std::make_shared<logging::LoggerProperties>();
+      initMain = true;
+    }
+    my_properties_->set("logger.root", "ERROR,ostream");
+    my_properties_->set("logger." + core::getClassName<LogTestController>(), "INFO");
+    my_properties_->set("logger." + core::getClassName<logging::LoggerConfiguration>(), "DEBUG");
     std::shared_ptr<spdlog::sinks::dist_sink_mt> dist_sink = std::make_shared<spdlog::sinks::dist_sink_mt>();
     dist_sink->add_sink(std::make_shared<spdlog::sinks::ostream_sink_mt>(log_output, true));
     dist_sink->add_sink(spdlog::sinks::stderr_sink_mt::instance());
-    logger_properties->add_sink("ostream", dist_sink);
-    logging::LoggerConfiguration::getConfiguration().initialize(logger_properties);
-    logger_ = logging::LoggerFactory<LogTestController>::getLogger();
+    my_properties_->add_sink("ostream", dist_sink);
+    if (initMain) {
+      logging::LoggerConfiguration::getConfiguration().initialize(my_properties_);
+      logger_ = logging::LoggerConfiguration::getConfiguration().getLogger(core::getClassName<LogTestController>());
+    } else {
+      config = std::move(logging::LoggerConfiguration::newInstance());
+      // create for test purposes. most tests use the main logging factory, but this exists to test the logging
+      // framework itself.
+      config->initialize(my_properties_);
+      logger_ = config->getLogger(core::getClassName<LogTestController>());
+    }
+
   }
   LogTestController(LogTestController const&);
   LogTestController& operator=(LogTestController const&);
-  ~LogTestController() {
-  }
+
   ;
 
   void setLevel(const std::string name, spdlog::level::level_enum level) {
@@ -166,9 +225,14 @@
     auto haz_clazz = name.find(clazz);
     if (haz_clazz == 0)
       adjusted_name = name.substr(clazz.length(), name.length() - clazz.length());
+    if (config && config->shortenClassNames()) {
+      utils::ClassUtils::shortenClassName(adjusted_name, adjusted_name);
+    }
     spdlog::get(adjusted_name)->set_level(level);
   }
-  std::vector<std::string> modified_loggers;
+  std::shared_ptr<logging::LoggerProperties> my_properties_;
+  std::unique_ptr<logging::LoggerConfiguration> config;
+  std::set<std::string> modified_loggers;
 };
 
 class TestPlan {
diff --git a/libminifi/test/unit/ClassUtilsTests.cpp b/libminifi/test/unit/ClassUtilsTests.cpp
new file mode 100644
index 0000000..61d888d
--- /dev/null
+++ b/libminifi/test/unit/ClassUtilsTests.cpp
@@ -0,0 +1,61 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <string>
+#include <vector>
+#include "utils/ClassUtils.h"
+#include "../TestBase.h"
+
+TEST_CASE("Test ShortNames", "[testcrc1]") {
+  std::string className, adjusted;
+  SECTION("EMPTY") {
+  className = "";
+  adjusted = "";
+  REQUIRE(!utils::ClassUtils::shortenClassName(className, adjusted));
+  REQUIRE(adjusted.empty());
+  }
+
+  SECTION("SINGLE") {
+  className = "Class";
+  adjusted = "";
+  // class name not shortened
+  REQUIRE(!utils::ClassUtils::shortenClassName(className, adjusted));
+  REQUIRE(adjusted.empty());
+  className = "org::Test";
+  adjusted = "";
+  REQUIRE(utils::ClassUtils::shortenClassName(className, adjusted));
+  REQUIRE("o::Test" == adjusted);
+  }
+
+
+
+  SECTION("MULTIPLE") {
+  className = "org::apache::Test";
+  adjusted = "";
+  REQUIRE(utils::ClassUtils::shortenClassName(className, adjusted));
+  REQUIRE("o::a::Test" == adjusted);
+  className = "org.apache.Test";
+  adjusted = "";
+  REQUIRE(utils::ClassUtils::shortenClassName(className, adjusted));
+  REQUIRE("o.a.Test" == adjusted);
+  className = adjusted;
+  adjusted = "";
+  REQUIRE(utils::ClassUtils::shortenClassName(className, adjusted));
+  REQUIRE("o.a.Test" == adjusted);
+  }
+}
diff --git a/libminifi/test/unit/HTTPUtilTests.cpp b/libminifi/test/unit/HTTPUtilTests.cpp
new file mode 100644
index 0000000..e3b7c45
--- /dev/null
+++ b/libminifi/test/unit/HTTPUtilTests.cpp
@@ -0,0 +1,67 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <string>
+#include <iostream>
+#include "../TestBase.h"
+#include "utils/HTTPClient.h"
+
+TEST_CASE("TestHTTPUtils::simple", "[test parse no port]") {
+  std::string protocol, host;
+  int port = -1;
+  std::string url = "http://nifi.io/nifi";
+  minifi::utils::parse_url(&url, &host, &port, &protocol);
+  REQUIRE(port == -1);
+  REQUIRE(host == "nifi.io");
+  REQUIRE(protocol == "http://");
+}
+
+TEST_CASE("TestHTTPUtils::urlwithport", "[test parse with port]") {
+  std::string protocol, host;
+  int port = -1;
+  std::string url = "https://nifi.somewhere.far.away:321/nifi";
+  minifi::utils::parse_url(&url, &host, &port, &protocol);
+  REQUIRE(port == 321);
+  REQUIRE(host == "nifi.somewhere.far.away");
+  REQUIRE(protocol == "https://");
+}
+
+TEST_CASE("TestHTTPUtils::query", "[test parse query without port]") {
+  std::string protocol, host, path, query;
+  int port = -1;
+  std::string url = "https://nifi.io/nifi/path?what";
+  minifi::utils::parse_url(&url, &host, &port, &protocol, &path, &query);
+  REQUIRE(port == -1);
+  REQUIRE(host == "nifi.io");
+  REQUIRE(protocol == "https://");
+  REQUIRE(path == "nifi/path");
+  REQUIRE(query == "what");
+}
+
+TEST_CASE("TestHTTPUtils::querywithport", "[test parse query with port]") {
+  std::string protocol, host, path, query;
+  int port = -1;
+  std::string url = "http://nifi.io:4321/nifi_path?what_is_love";
+  minifi::utils::parse_url(&url, &host, &port, &protocol, &path, &query);
+  REQUIRE(port == 4321);
+  REQUIRE(host == "nifi.io");
+  REQUIRE(protocol == "http://");
+  REQUIRE(path == "nifi_path");
+  REQUIRE(query == "what_is_love");
+}
+
diff --git a/libminifi/test/unit/LoggerTests.cpp b/libminifi/test/unit/LoggerTests.cpp
index 60a57c3..e9db9a7 100644
--- a/libminifi/test/unit/LoggerTests.cpp
+++ b/libminifi/test/unit/LoggerTests.cpp
@@ -21,7 +21,7 @@
 #include <vector>
 #include <ctime>
 #include "../TestBase.h"
-
+#include "core/logging/LoggerConfiguration.h"
 
 TEST_CASE("Test log Levels", "[ttl1]") {
   LogTestController::getInstance().setTrace<logging::Logger>();
@@ -73,5 +73,37 @@
   LogTestController::getInstance().reset();
 }
 
-TEST_CASE("Test Demangle template", "[ttl6]") {
+namespace single {
+class TestClass {
+};
+}
+
+class TestClass2 {
+};
+
+TEST_CASE("Test ShortenNames", "[ttl6]") {
+  std::shared_ptr<logging::LoggerProperties> props = std::make_shared<logging::LoggerProperties>();
+
+  props->set("spdlog.shorten_names", "true");
+
+  std::shared_ptr<logging::Logger> logger = LogTestController::getInstance(props)->getLogger<logging::Logger>();
+  logger->log_error("hello %s", "world");
+
+  REQUIRE(true == LogTestController::getInstance(props)->contains("[o::a::n::m::c::l::Logger] [error] hello world"));
+
+  logger = LogTestController::getInstance(props)->getLogger<single::TestClass>();
+  logger->log_error("hello %s", "world");
+
+  REQUIRE(true == LogTestController::getInstance(props)->contains("[s::TestClass] [error] hello world"));
+
+  logger = LogTestController::getInstance(props)->getLogger<TestClass2>();
+  logger->log_error("hello %s", "world");
+
+  REQUIRE(true == LogTestController::getInstance(props)->contains("[TestClass2] [error] hello world"));
+
+  LogTestController::getInstance(props)->reset();
+  LogTestController::getInstance().reset();
+
+  LogTestController::getInstance(props)->reset();
+  LogTestController::getInstance().reset();
 }