MINIFICPP-1002 - Add SourceInitiatedSubscriptionListener processor

Signed-off-by: Daniel Bakai <bakaid@apache.org>

Approved by aboda and szaszm on GH

This closes #629
diff --git a/CMakeLists.txt b/CMakeLists.txt
index f394bd2..2a4dc77 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -480,6 +480,18 @@
     createExtension(SFTP "SFTP EXTENSIONS" "This enables SFTP support" "extensions/sftp" "extensions/sftp/tests")
 endif()
 
+## Openwsman Extesions
+option(ENABLE_OPENWSMAN "Enables the Openwsman extensions." OFF)
+if (ENABLE_OPENWSMAN AND NOT DISABLE_CIVET AND NOT DISABLE_CURL)
+	include(BundledLibXml2)
+	use_bundled_libxml2(${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_BINARY_DIR})
+	list(APPEND CMAKE_MODULE_PATH "${CMAKE_SOURCE_DIR}/cmake/libxml2/dummy")
+
+	include(BundledOpenWSMAN)
+	use_bundled_openwsman(${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_BINARY_DIR})
+
+    createExtension(OPENWSMAN-EXTENSIONS "OPENWSMAN EXTENSIONS" "This enables Openwsman support" "extensions/openwsman")
+endif()
 
 ## NOW WE CAN ADD LIBRARIES AND EXTENSIONS TO MAIN
 add_subdirectory(main)
diff --git a/bootstrap.sh b/bootstrap.sh
index 9c96146..ff38708 100755
--- a/bootstrap.sh
+++ b/bootstrap.sh
@@ -305,6 +305,8 @@
 add_disabled_option SQL_ENABLED ${FALSE} "ENABLE_SQL"
 set_incompatible_with SQL_ENABLED SQLITE_ENABLED
 
+add_disabled_option OPENWSMAN_ENABLED ${FALSE} "ENABLE_OPENWSMAN"
+
 # Since the following extensions have limitations on
 add_disabled_option BUSTACHE_ENABLED ${FALSE} "ENABLE_BUSTACHE" "2.6" ${TRUE}
 add_dependency BUSTACHE_ENABLED "boost"
diff --git a/bstrp_functions.sh b/bstrp_functions.sh
index a13be2e..4ec336c 100755
--- a/bstrp_functions.sh
+++ b/bstrp_functions.sh
@@ -364,6 +364,7 @@
   echo "T. OpenCV Support ..............$(print_feature_status OPENCV_ENABLED)"
   echo "U. OPC-UA Support...............$(print_feature_status OPC_ENABLED)"
   echo "W. SQL Support..................$(print_feature_status SQL_ENABLED)"
+  echo "X. Openwsman Support ...........$(print_feature_status OPENWSMAN_ENABLED)"
   echo "****************************************"
   echo "            Build Options."
   echo "****************************************"
@@ -385,7 +386,7 @@
 
 read_feature_options(){
   local choice
-  read -p "Enter choice [ A - W or 1-6 ] " choice
+  read -p "Enter choice [ A - X or 1-6 ] " choice
   choice=$(echo ${choice} | tr '[:upper:]' '[:lower:]')
   case $choice in
     a) ToggleFeature ROCKSDB_ENABLED ;;
@@ -412,7 +413,8 @@
 	s) ToggleFeature SFTP_ENABLED ;;
     t) ToggleFeature OPENCV_ENABLED ;;
     u) ToggleFeature OPC_ENABLED ;;
-    w) ToggleFeature SQL_ENABLED ;;	
+    w) ToggleFeature SQL_ENABLED ;;
+    x) ToggleFeature OPENWSMAN_ENABLED ;;
     1) ToggleFeature TESTS_DISABLED ;;
     2) EnableAllFeatures ;;
     3) ToggleFeature JNI_ENABLED;;
@@ -430,7 +432,7 @@
       fi
       ;;
     q) exit 0;;
-    *) echo -e "${RED}Please enter an option A-W or 1-6...${NO_COLOR}" && sleep 2
+    *) echo -e "${RED}Please enter an option A-X or 1-6...${NO_COLOR}" && sleep 2
   esac
 }
 
diff --git a/cmake/BundledBZip2.cmake b/cmake/BundledBZip2.cmake
index 7ad99a6..9c3fb5b 100644
--- a/cmake/BundledBZip2.cmake
+++ b/cmake/BundledBZip2.cmake
@@ -39,7 +39,7 @@
     # Build project
     ExternalProject_Add(
             bzip2-external
-            URL https://sourceware.org/pub/bzip2/bzip2-1.0.8.tar.gz
+            URL https://sourceware.org/pub/bzip2/bzip2-1.0.8.tar.gz http://deb.debian.org/debian/pool/main/b/bzip2/bzip2_1.0.8.orig.tar.gz
             URL_HASH "SHA256=ab5a03176ee106d3f0fa90e381da478ddae405918153cca248e682cd0c4a2269"
             SOURCE_DIR "${BINARY_DIR}/thirdparty/bzip2-src"
             LIST_SEPARATOR % # This is needed for passing semicolon-separated lists
diff --git a/cmake/BundledLibXml2.cmake b/cmake/BundledLibXml2.cmake
new file mode 100644
index 0000000..61ad939
--- /dev/null
+++ b/cmake/BundledLibXml2.cmake
@@ -0,0 +1,94 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+function(use_bundled_libxml2 SOURCE_DIR BINARY_DIR)
+    message("Using bundled libxml2")
+
+    # Define patch step
+    if (WIN32)
+        set(PC "${Patch_EXECUTABLE}" -p1 -i "${SOURCE_DIR}/thirdparty/libxml2/libxml2-win.patch")
+    endif()
+
+    # Define byproducts
+    if (WIN32)
+        set(BYPRODUCT "lib/xml2.lib")
+    else()
+        set(BYPRODUCT "lib/libxml2.a")
+    endif()
+
+    # Set build options
+    if (WIN32)
+        set(LIBXML2_CMAKE_ARGS ${PASSTHROUGH_CMAKE_ARGS}
+                "-DCMAKE_INSTALL_PREFIX=${BINARY_DIR}/thirdparty/libxml2-install")
+    endif()
+
+    # Build project
+    set(LIBXML2_URL ftp://xmlsoft.org/libxml2/libxml2-2.9.10.tar.gz https://ftp.osuosl.org/pub/blfs/conglomeration/libxml2/libxml2-2.9.10.tar.gz)
+    set(LIBXML2_URL_HASH "SHA256=aafee193ffb8fe0c82d4afef6ef91972cbaf5feea100edc2f262750611b4be1f")
+
+    if (WIN32)
+        ExternalProject_Add(
+                libxml2-external
+                URL ${LIBXML2_URL}
+                URL_HASH ${LIBXML2_URL_HASH}
+                SOURCE_DIR "${BINARY_DIR}/thirdparty/libxml2-src"
+                LIST_SEPARATOR % # This is needed for passing semicolon-separated lists
+                CMAKE_ARGS ${LIBXML2_CMAKE_ARGS}
+                PATCH_COMMAND ${PC}
+                BUILD_BYPRODUCTS "${BINARY_DIR}/thirdparty/libxml2-install/${BYPRODUCT}"
+                EXCLUDE_FROM_ALL TRUE
+        )
+    else()
+        ExternalProject_Add(
+                libxml2-external
+                URL ${LIBXML2_URL}
+                URL_HASH ${LIBXML2_URL_HASH}
+                BUILD_IN_SOURCE true
+                SOURCE_DIR "${BINARY_DIR}/thirdparty/libxml2-src"
+                BUILD_COMMAND make
+                CMAKE_COMMAND ""
+                UPDATE_COMMAND ""
+                INSTALL_COMMAND make install
+                BUILD_BYPRODUCTS "${BINARY_DIR}/thirdparty/libxml2-install/${BYPRODUCT}"
+                CONFIGURE_COMMAND ""
+                PATCH_COMMAND ./configure --enable-shared=no --enable-static=yes --with-iconv=no --with-zlib=no --with-lzma=no --with-python=no --with-ftp=no --with-http=no --prefix=${BINARY_DIR}/thirdparty/libxml2-install
+                STEP_TARGETS build
+                EXCLUDE_FROM_ALL TRUE
+        )
+    endif()
+
+    # Set variables
+    set(LIBXML2_FOUND "YES" CACHE STRING "" FORCE)
+    set(LIBXML2_INCLUDE_DIR "${BINARY_DIR}/thirdparty/libxml2-install/include/libxml2" CACHE STRING "" FORCE)
+    set(LIBXML2_INCLUDE_DIRS "${LIBXML2_INCLUDE_DIR}" CACHE STRING "" FORCE)
+    set(LIBXML2_LIBRARY "${BINARY_DIR}/thirdparty/libxml2-install/${BYPRODUCT}" CACHE STRING "" FORCE)
+    set(LIBXML2_LIBRARIES "${LIBXML2_LIBRARY}" CACHE STRING "" FORCE)
+
+    # Set exported variables for FindPackage.cmake
+    set(PASSTHROUGH_VARIABLES ${PASSTHROUGH_VARIABLES} "-DEXPORTED_LIBXML2_INCLUDE_DIR=${LIBXML2_INCLUDE_DIR}" CACHE STRING "" FORCE)
+    set(PASSTHROUGH_VARIABLES ${PASSTHROUGH_VARIABLES} "-DEXPORTED_LIBXML2_LIBRARY=${LIBXML2_LIBRARY}" CACHE STRING "" FORCE)
+
+    # Create imported targets
+    file(MAKE_DIRECTORY ${LIBXML2_INCLUDE_DIR})
+
+    add_library(LibXml2::LibXml2 STATIC IMPORTED)
+    set_target_properties(LibXml2::LibXml2 PROPERTIES IMPORTED_LOCATION "${LIBXML2_LIBRARY}")
+    add_dependencies(LibXml2::LibXml2 libxml2-external)
+    set_property(TARGET LibXml2::LibXml2 APPEND PROPERTY INTERFACE_INCLUDE_DIRECTORIES "${LIBXML2_INCLUDE_DIR}")
+endfunction(use_bundled_libxml2)
diff --git a/cmake/BundledOpenWSMAN.cmake b/cmake/BundledOpenWSMAN.cmake
new file mode 100644
index 0000000..0392768
--- /dev/null
+++ b/cmake/BundledOpenWSMAN.cmake
@@ -0,0 +1,115 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+function(use_bundled_openwsman SOURCE_DIR BINARY_DIR)
+    message("Using bundled openwsman")
+
+    # Define patch step
+    set(PC "${Patch_EXECUTABLE}" -p1 -i "${SOURCE_DIR}/thirdparty/openwsman/openwsman.patch")
+
+    # Define byproducts
+    if (APPLE)
+        set(PREFIX "lib/lib")
+        set(POSTFIX ".a")
+    elseif(WIN32)
+        message(FATAL_ERROR "OpenWSMAN Windows build is not supported")
+    else()
+        if("${CMAKE_SIZEOF_VOID_P}" EQUAL "8")
+            set(PREFIX "lib64/lib")
+        else()
+            set(PREFIX "lib/lib")
+        endif()
+        set(POSTFIX ".a")
+    endif()
+
+    set(BYPRODUCTS
+            "${PREFIX}wsman${POSTFIX}"
+            "${PREFIX}wsman_client${POSTFIX}"
+            "${PREFIX}wsman_curl_client_transport${POSTFIX}"
+            )
+
+    FOREACH(BYPRODUCT ${BYPRODUCTS})
+        LIST(APPEND OPENWSMAN_LIBRARIES_LIST "${BINARY_DIR}/thirdparty/openwsman-install/${BYPRODUCT}")
+    ENDFOREACH(BYPRODUCT)
+
+    # Set build options
+    set(OPENWSMAN_CMAKE_ARGS
+            ${PASSTHROUGH_CMAKE_ARGS}
+            -DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE}
+            "-DCMAKE_INSTALL_PREFIX=${BINARY_DIR}/thirdparty/openwsman-install"
+            -DBUILD_PYTHON=NO
+            -DBUILD_PYTHON3=NO
+            -DBUILD_LIBCIM=NO
+            -DBUILD_EXAMPLES=NO
+            -DBUILD_BINDINGS=NO
+            -DBUILD_RUBY=NO
+            -DBUILD_PERL=NO
+            -DBUILD_JAVA=NO
+            -DBUILD_CSHARP=NO
+            -DBUILD_CUNIT_TESTS=NO
+            -DDISABLE_PLUGINS=YES
+            -DUSE_PAM=NO
+            -DBUILD_TESTS=NO
+            -DDISABLE_SERVER=YES
+            -DBUILD_SHARED_LIBS=NO)
+
+    append_third_party_passthrough_args(OPENWSMAN_CMAKE_ARGS "${OPENWSMAN_CMAKE_ARGS}")
+
+    # Build project
+    ExternalProject_Add(
+            openwsman-external
+            URL "https://github.com/Openwsman/openwsman/archive/v2.6.11.tar.gz"
+            URL_HASH "SHA256=895eaaae62925f9416766ea3e71a5368210e6cfe13b23e4e0422fa0e75c2541c"
+            SOURCE_DIR "${BINARY_DIR}/thirdparty/openwsman-src"
+            LIST_SEPARATOR % # This is needed for passing semicolon-separated lists
+            CMAKE_ARGS ${OPENWSMAN_CMAKE_ARGS}
+            PATCH_COMMAND ${PC}
+            BUILD_BYPRODUCTS "${OPENWSMAN_LIBRARIES_LIST}"
+            EXCLUDE_FROM_ALL TRUE
+    )
+
+    # Set dependencies
+    add_dependencies(openwsman-external LibXml2::LibXml2 OpenSSL::SSL OpenSSL::Crypto CURL::libcurl)
+
+    # Set variables
+    set(OPENWSMAN_FOUND "YES" CACHE STRING "" FORCE)
+    set(OPENWSMAN_INCLUDE_DIR "${BINARY_DIR}/thirdparty/openwsman-src/include" CACHE STRING "" FORCE)
+    set(OPENWSMAN_LIBRARIES "${OPENWSMAN_LIBRARIES_LIST}" CACHE STRING "" FORCE)
+
+    # Create imported targets
+    file(MAKE_DIRECTORY ${OPENWSMAN_INCLUDE_DIR})
+
+    add_library(OpenWSMAN::libwsman_curl_client_transport STATIC IMPORTED)
+    set_target_properties(OpenWSMAN::libwsman_curl_client_transport PROPERTIES IMPORTED_LOCATION "${BINARY_DIR}/thirdparty/openwsman-install/${PREFIX}wsman_curl_client_transport${POSTFIX}")
+    add_dependencies(OpenWSMAN::libwsman_curl_client_transport openwsman-external)
+    set_property(TARGET OpenWSMAN::libwsman_curl_client_transport APPEND PROPERTY INTERFACE_LINK_LIBRARIES LibXml2::LibXml2 OpenSSL::SSL OpenSSL::Crypto CURL::libcurl)
+    set_property(TARGET OpenWSMAN::libwsman_curl_client_transport APPEND PROPERTY INTERFACE_INCLUDE_DIRECTORIES ${OPENWSMAN_INCLUDE_DIR})
+
+    add_library(OpenWSMAN::libwsman_client STATIC IMPORTED)
+    set_target_properties(OpenWSMAN::libwsman_client PROPERTIES IMPORTED_LOCATION "${BINARY_DIR}/thirdparty/openwsman-install/${PREFIX}wsman_client${POSTFIX}")
+    add_dependencies(OpenWSMAN::libwsman_client openwsman-external)
+    set_property(TARGET OpenWSMAN::libwsman_client APPEND PROPERTY INTERFACE_LINK_LIBRARIES LibXml2::LibXml2)
+    set_property(TARGET OpenWSMAN::libwsman_client APPEND PROPERTY INTERFACE_LINK_LIBRARIES OpenWSMAN::libwsman_curl_client_transport)
+    set_property(TARGET OpenWSMAN::libwsman_client APPEND PROPERTY INTERFACE_INCLUDE_DIRECTORIES ${OPENWSMAN_INCLUDE_DIR})
+
+    add_library(OpenWSMAN::libwsman STATIC IMPORTED)
+    set_target_properties(OpenWSMAN::libwsman PROPERTIES IMPORTED_LOCATION "${BINARY_DIR}/thirdparty/openwsman-install/${PREFIX}wsman${POSTFIX}")
+    add_dependencies(OpenWSMAN::libwsman openwsman-external)
+    set_property(TARGET OpenWSMAN::libwsman APPEND PROPERTY INTERFACE_LINK_LIBRARIES LibXml2::LibXml2)
+    set_property(TARGET OpenWSMAN::libwsman APPEND PROPERTY INTERFACE_LINK_LIBRARIES OpenWSMAN::libwsman_client)
+    set_property(TARGET OpenWSMAN::libwsman APPEND PROPERTY INTERFACE_INCLUDE_DIRECTORIES ${OPENWSMAN_INCLUDE_DIR})
+endfunction(use_bundled_openwsman)
diff --git a/cmake/libxml2/dummy/FindLibXml2.cmake b/cmake/libxml2/dummy/FindLibXml2.cmake
new file mode 100644
index 0000000..9c2ce0d
--- /dev/null
+++ b/cmake/libxml2/dummy/FindLibXml2.cmake
@@ -0,0 +1,29 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+if(NOT LIBXML2_FOUND)
+    set(LIBXML2_FOUND "YES" CACHE STRING "" FORCE)
+    set(LIBXML2_INCLUDE_DIR "${EXPORTED_LIBXML2_INCLUDE_DIR}" CACHE STRING "" FORCE)
+    set(LIBXML2_INCLUDE_DIRS "${EXPORTED_LIBXML2_INCLUDE_DIR}" CACHE STRING "" FORCE)
+    set(LIBXML2_LIBRARIES "${EXPORTED_LIBXML2_LIBRARY}" CACHE STRING "" FORCE)
+endif()
+
+if(NOT TARGET LibXml2::LibXml2)
+    add_library(LibXml2::LibXml2 STATIC IMPORTED)
+    set_target_properties(LibXml2::LibXml2 PROPERTIES IMPORTED_LOCATION "${LIBXML2_LIBRARIES}")
+    set_property(TARGET LibXml2::LibXml2 APPEND PROPERTY INTERFACE_INCLUDE_DIRECTORIES "${LIBXML2_INCLUDE_DIR}")
+endif()
diff --git a/extensions/openwsman/CMakeLists.txt b/extensions/openwsman/CMakeLists.txt
new file mode 100644
index 0000000..a9470d3
--- /dev/null
+++ b/extensions/openwsman/CMakeLists.txt
@@ -0,0 +1,32 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+include(${CMAKE_SOURCE_DIR}/extensions/ExtensionHeader.txt)
+
+file(GLOB SOURCES "processors/*.cpp")
+
+add_library(minifi-openwsman STATIC ${SOURCES})
+
+set_property(TARGET minifi-openwsman PROPERTY POSITION_INDEPENDENT_CODE ON)
+
+target_link_libraries(minifi-openwsman ${LIBMINIFI} Threads::Threads)
+target_link_libraries(minifi-openwsman OpenWSMAN::libwsman CIVETWEB::civetweb-cpp CIVETWEB::c-library LibXml2::LibXml2)
+
+SET (OPENWSMAN-EXTENSION minifi-openwsman PARENT_SCOPE)
+register_extension(minifi-openwsman)
diff --git a/extensions/openwsman/processors/SourceInitiatedSubscriptionListener.cpp b/extensions/openwsman/processors/SourceInitiatedSubscriptionListener.cpp
new file mode 100644
index 0000000..d285b95
--- /dev/null
+++ b/extensions/openwsman/processors/SourceInitiatedSubscriptionListener.cpp
@@ -0,0 +1,922 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "SourceInitiatedSubscriptionListener.h"
+
+#include <memory>
+#include <algorithm>
+#include <cctype>
+#include <cstdint>
+#include <cstring>
+#include <iostream>
+#include <iterator>
+#include <limits>
+#include <unordered_map>
+#include <set>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include <openssl/x509.h>
+extern "C" {
+#include "wsman-api.h"
+#include "wsman-xml-api.h"
+#include "wsman-xml-serialize.h"
+#include "wsman-xml-serializer.h"
+#include "wsman-soap.h"
+#include "wsman-soap-envelope.h"
+}
+
+#include "utils/ByteArrayCallback.h"
+#include "core/FlowFile.h"
+#include "core/logging/Logger.h"
+#include "core/ProcessContext.h"
+#include "core/Relationship.h"
+#include "io/DataStream.h"
+#include "io/StreamFactory.h"
+#include "ResourceClaim.h"
+#include "utils/StringUtils.h"
+#include "utils/ScopeGuard.h"
+#include "utils/file/FileUtils.h"
+
+#define XML_NS_CUSTOM_SUBSCRIPTION "http://schemas.microsoft.com/wbem/wsman/1/subscription"
+#define XML_NS_CUSTOM_AUTHENTICATION "http://schemas.microsoft.com/wbem/wsman/1/authentication"
+#define XML_NS_CUSTOM_POLICY "http://schemas.xmlsoap.org/ws/2002/12/policy"
+#define XML_NS_CUSTOM_MACHINEID "http://schemas.microsoft.com/wbem/wsman/1/machineid"
+#define WSMAN_CUSTOM_ACTION_ACK "http://schemas.dmtf.org/wbem/wsman/1/wsman/Ack"
+#define WSMAN_CUSTOM_ACTION_HEARTBEAT "http://schemas.dmtf.org/wbem/wsman/1/wsman/Heartbeat"
+#define WSMAN_CUSTOM_ACTION_EVENTS "http://schemas.dmtf.org/wbem/wsman/1/wsman/Events"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+core::Property SourceInitiatedSubscriptionListener::ListenHostname(
+    core::PropertyBuilder::createProperty("Listen Hostname")->withDescription("The hostname or IP of this machine that will be advertised to event sources to connect to. "
+                                                                              "It must be contained as a Subject Alternative Name in the server certificate, "
+                                                                              "otherwise source machines will refuse to connect.")
+        ->isRequired(true)->build());
+core::Property SourceInitiatedSubscriptionListener::ListenPort(
+    core::PropertyBuilder::createProperty("Listen Port")->withDescription("The port to listen on.")
+        ->isRequired(true)->withDefaultValue<int64_t>(5986, core::StandardValidators::LISTEN_PORT_VALIDATOR())->build());
+core::Property SourceInitiatedSubscriptionListener::SubscriptionManagerPath(
+    core::PropertyBuilder::createProperty("Subscription Manager Path")->withDescription("The URI path that will be used for the WEC Subscription Manager endpoint.")
+        ->isRequired(true)->withDefaultValue("/wsman/SubscriptionManager/WEC")->build());
+core::Property SourceInitiatedSubscriptionListener::SubscriptionsBasePath(
+    core::PropertyBuilder::createProperty("Subscriptions Base Path")->withDescription("The URI path that will be used as the base for endpoints serving individual subscriptions.")
+        ->isRequired(true)->withDefaultValue("/wsman/subscriptions")->build());
+core::Property SourceInitiatedSubscriptionListener::SSLCertificate(
+    core::PropertyBuilder::createProperty("SSL Certificate")->withDescription("File containing PEM-formatted file including TLS/SSL certificate and key. "
+                                                                              "The root CA of the certificate must be the CA set in SSL Certificate Authority.")
+        ->isRequired(true)->build());
+core::Property SourceInitiatedSubscriptionListener::SSLCertificateAuthority(
+    core::PropertyBuilder::createProperty("SSL Certificate Authority")->withDescription("File containing the PEM-formatted CA that is the root CA for both this server's certificate "
+                                                                                        "and the event source clients' certificates.")
+        ->isRequired(true)->build());
+core::Property SourceInitiatedSubscriptionListener::SSLVerifyPeer(
+    core::PropertyBuilder::createProperty("SSL Verify Peer")->withDescription("Whether or not to verify the client's certificate")
+        ->isRequired(false)->withDefaultValue<bool>(true)->build());
+core::Property SourceInitiatedSubscriptionListener::XPathXmlQuery(
+    core::PropertyBuilder::createProperty("XPath XML Query")->withDescription("An XPath Query in structured XML format conforming to the Query Schema described in "
+                                                                              "https://docs.microsoft.com/en-gb/windows/win32/wes/queryschema-schema, "
+                                                                              "see an example here: https://docs.microsoft.com/en-gb/windows/win32/wes/consuming-events")
+        ->isRequired(true)
+        ->withDefaultValue("<QueryList>\n"
+                           "  <Query Id=\"0\">\n"
+                           "    <Select Path=\"Application\">*</Select>\n"
+                           "  </Query>\n"
+                           "</QueryList>\n")->build());
+core::Property SourceInitiatedSubscriptionListener::InitialExistingEventsStrategy(
+    core::PropertyBuilder::createProperty("Initial Existing Events Strategy")->withDescription("Defines the behaviour of the Processor when a new event source connects.\n"
+    "None: will not request existing events\n"
+    "All: will request all existing events matching the query")
+        ->isRequired(true)->withAllowableValues<std::string>({INITIAL_EXISTING_EVENTS_STRATEGY_NONE, INITIAL_EXISTING_EVENTS_STRATEGY_ALL})
+        ->withDefaultValue(INITIAL_EXISTING_EVENTS_STRATEGY_NONE)->build());
+core::Property SourceInitiatedSubscriptionListener::SubscriptionExpirationInterval(
+    core::PropertyBuilder::createProperty("Subscription Expiration Interval")->withDescription("The interval while a subscription is valid without renewal. "
+    "Because in a source-initiated subscription, the collector can not cancel the subscription, "
+    "setting this too large could cause unnecessary load on the source machine. "
+    "Setting this too small causes frequent reenumeration and resubscription which is ineffective.")
+        ->isRequired(true)->withDefaultValue<core::TimePeriodValue>("10 min")->build());
+core::Property SourceInitiatedSubscriptionListener::HeartbeatInterval(
+    core::PropertyBuilder::createProperty("Heartbeat Interval")->withDescription("The processor will ask the sources to send heartbeats with this interval.")
+        ->isRequired(true)->withDefaultValue<core::TimePeriodValue>("30 sec")->build());
+core::Property SourceInitiatedSubscriptionListener::MaxElements(
+    core::PropertyBuilder::createProperty("Max Elements")->withDescription("The maximum number of events a source will batch together and send at once.")
+        ->isRequired(true)->withDefaultValue<uint32_t>(20U)->build());
+core::Property SourceInitiatedSubscriptionListener::MaxLatency(
+    core::PropertyBuilder::createProperty("Max Latency")->withDescription("The maximum time a source will wait to send new events.")
+        ->isRequired(true)->withDefaultValue<core::TimePeriodValue>("10 sec")->build());
+core::Property SourceInitiatedSubscriptionListener::ConnectionRetryInterval(
+    core::PropertyBuilder::createProperty("Connection Retry Interval")->withDescription("The interval with which a source will try to reconnect to the server.")
+        ->withDefaultValue<core::TimePeriodValue>("10 sec")->build());
+core::Property SourceInitiatedSubscriptionListener::ConnectionRetryCount(
+    core::PropertyBuilder::createProperty("Connection Retry Count")->withDescription("The number of connection retries after which a source will consider the subscription expired.")
+        ->withDefaultValue<uint32_t>(5U)->build());
+
+core::Relationship SourceInitiatedSubscriptionListener::Success("success", "All Events are routed to success");
+
+constexpr char const* SourceInitiatedSubscriptionListener::ATTRIBUTE_WEF_REMOTE_MACHINEID;
+constexpr char const* SourceInitiatedSubscriptionListener::ATTRIBUTE_WEF_REMOTE_IP;
+
+constexpr char const* SourceInitiatedSubscriptionListener::INITIAL_EXISTING_EVENTS_STRATEGY_NONE;
+constexpr char const* SourceInitiatedSubscriptionListener::INITIAL_EXISTING_EVENTS_STRATEGY_ALL;
+
+constexpr char const* SourceInitiatedSubscriptionListener::ProcessorName;
+
+SourceInitiatedSubscriptionListener::SourceInitiatedSubscriptionListener(std::string name, utils::Identifier uuid)
+    : Processor(name, uuid)
+    , logger_(logging::LoggerFactory<SourceInitiatedSubscriptionListener>::getLogger())
+    , session_factory_(nullptr)
+    , listen_port_(0U)
+    , subscription_expiration_interval_(0)
+    , heartbeat_interval_(0)
+    , max_elements_(0U)
+    , max_latency_(0)
+    , connection_retry_interval_(0)
+    , connection_retry_count_(0U) {
+}
+
+SourceInitiatedSubscriptionListener::Handler::Handler(SourceInitiatedSubscriptionListener& processor)
+    : processor_(processor) {
+}
+
+SourceInitiatedSubscriptionListener::SubscriberData::SubscriberData()
+    : bookmark_(nullptr)
+    , subscription_(nullptr) {
+}
+
+SourceInitiatedSubscriptionListener::SubscriberData::~SubscriberData() {
+  clearSubscription();
+  clearBookmark();
+}
+
+void SourceInitiatedSubscriptionListener::SubscriberData::setSubscription(const std::string& subscription_version,
+    WsXmlDocH subscription,
+    const std::string& subscription_endpoint,
+    const std::string& subscription_identifier) {
+  clearSubscription();
+  subscription_version_ = subscription_version;
+  subscription_ = subscription;
+  subscription_endpoint_ = subscription_endpoint;
+  subscription_identifier_ = subscription_identifier;
+}
+
+void SourceInitiatedSubscriptionListener::SubscriberData::clearSubscription() {
+  subscription_version_.clear();
+  if (subscription_ != nullptr) {
+    ws_xml_destroy_doc(subscription_);
+  }
+  subscription_ = nullptr;
+}
+
+void SourceInitiatedSubscriptionListener::SubscriberData::setBookmark(WsXmlDocH bookmark) {
+  clearBookmark();
+  bookmark_ = bookmark;
+}
+
+void SourceInitiatedSubscriptionListener::SubscriberData::clearBookmark() {
+  if (bookmark_ != nullptr) {
+    ws_xml_destroy_doc(bookmark_);
+  }
+  bookmark_ = nullptr;
+}
+
+bool SourceInitiatedSubscriptionListener::persistState() const {
+  std::unordered_map<std::string, std::string> state_map;
+  size_t i = 0U;
+  for (const auto& subscriber : subscribers_) {
+    char* xml_buf = nullptr;
+    int xml_buf_size = 0;
+    ws_xml_dump_memory_enc(subscriber.second.bookmark_, &xml_buf, &xml_buf_size, "UTF-8");
+    state_map.emplace("subscriber." + std::to_string(i) + ".machineid", subscriber.first);
+    state_map.emplace("subscriber." + std::to_string(i) + ".bookmark", std::string(xml_buf, xml_buf_size));
+    i++;
+    ws_xml_free_memory(xml_buf);
+  }
+  return state_manager_->set(state_map);
+}
+
+bool SourceInitiatedSubscriptionListener::loadState() {
+  std::unordered_map<std::string, std::string> state_map;
+  if (!state_manager_->get(state_map)) {
+    return false;
+  }
+
+  for (size_t i = 0U;; i++) {
+    std::string machineId;
+    try {
+      machineId = state_map.at("subscriber." + std::to_string(i) + ".machineid");
+    } catch (...) {
+      break;
+    }
+
+    std::string bookmark;
+    try {
+      bookmark = state_map.at("subscriber." + std::to_string(i) + ".bookmark");
+    } catch (...) {
+      logger_->log_error("Bookmark for subscriber \"%s\" is missing, skipping", machineId);
+      continue;
+    }
+
+    WsXmlDocH doc = ws_xml_read_memory(bookmark.data(), bookmark.size(), "UTF-8", 0);
+    if (doc == nullptr) {
+      logger_->log_error("Failed to parse saved bookmark for subscriber \"%s\", skipping", machineId);
+      continue;
+    }
+    subscribers_[machineId].setBookmark(doc);
+  }
+
+  return true;
+}
+
+std::string SourceInitiatedSubscriptionListener::Handler::millisecondsToXsdDuration(int64_t milliseconds) {
+  char buf[1024];
+  snprintf(buf, sizeof(buf), "PT%lld.%03lldS", milliseconds / 1000, milliseconds % 1000);
+  return buf;
+}
+
+bool SourceInitiatedSubscriptionListener::Handler::handlePost(CivetServer* server, struct mg_connection* conn) {
+  const struct mg_request_info* req_info = mg_get_request_info(conn);
+  if (req_info == nullptr) {
+    processor_.logger_->log_error("Failed to get request info");
+    return false;
+  }
+
+  const char* endpoint = req_info->local_uri;
+  if (endpoint == nullptr) {
+    processor_.logger_->log_error("Failed to get called endpoint (local_uri)");
+    return false;
+  }
+  processor_.logger_->log_trace("Endpoint \"%s\" has been called", endpoint);
+
+  for (int i = 0; i < req_info->num_headers; i++) {
+    processor_.logger_->log_trace("Received header \"%s: %s\"", req_info->http_headers[i].name, req_info->http_headers[i].value);
+  }
+
+  const char* content_type = mg_get_header(conn, "Content-Type");
+  if (content_type == nullptr) {
+    processor_.logger_->log_error("Content-Type header missing");
+    return false;
+  }
+
+  std::string charset;
+  const char* charset_begin = strstr(content_type, "charset=");
+  if (charset_begin == nullptr) {
+    processor_.logger_->log_warn("charset missing from Content-Type header, assuming UTF-8");
+    charset = "UTF-8";
+  } else {
+    charset_begin += strlen("charset=");
+    const char* charset_end = strchr(charset_begin, ';');
+    if (charset_end == nullptr) {
+        charset = std::string(charset_begin);
+    } else {
+        charset = std::string(charset_begin, charset_end - charset_begin);
+    }
+  }
+  processor_.logger_->log_trace("charset is \"%s\"", charset.c_str());
+
+  std::vector<uint8_t> raw_data;
+  {
+    std::array<uint8_t, 16384U> buf;
+    int read_bytes;
+    while ((read_bytes = mg_read(conn, buf.data(), buf.size())) > 0) {
+      size_t orig_size = raw_data.size();
+      raw_data.resize(orig_size + read_bytes);
+      memcpy(raw_data.data() + orig_size, buf.data(), read_bytes);
+    }
+  }
+
+  if (raw_data.empty()) {
+    processor_.logger_->log_error("POST body is empty");
+    return false;
+  }
+
+  WsXmlDocH doc = ws_xml_read_memory(reinterpret_cast<char*>(raw_data.data()), raw_data.size(), charset.c_str(), 0);
+
+  if (doc == nullptr) {
+    processor_.logger_->log_error("Failed to parse POST body as XML");
+    return false;
+  }
+
+  {
+    WsXmlNodeH node = ws_xml_get_doc_root(doc);
+    char* xml_buf = nullptr;
+    int xml_buf_size = 0;
+    ws_xml_dump_memory_node_tree_enc(node, &xml_buf, &xml_buf_size, "UTF-8");
+    if (xml_buf != nullptr) {
+        logging::LOG_TRACE(processor_.logger_) << "Received request: \"" << std::string(xml_buf, xml_buf_size) << "\"";
+        ws_xml_free_memory(xml_buf);
+    }
+  }
+
+  if (endpoint == processor_.subscription_manager_path_) {
+    return this->handleSubscriptionManager(conn, endpoint, doc);
+  } else if (strncmp(endpoint, processor_.subscriptions_base_path_.c_str(), processor_.subscriptions_base_path_.length()) == 0) {
+    return this->handleSubscriptions(conn, endpoint, doc);
+  } else {
+    ws_xml_destroy_doc(doc);
+    return false;
+  }
+}
+
+std::string SourceInitiatedSubscriptionListener::Handler::getSoapAction(WsXmlDocH doc) {
+  WsXmlNodeH header = ws_xml_get_soap_header(doc);
+  if (header == nullptr) {
+    return "";
+  }
+  WsXmlNodeH action_node = ws_xml_get_child(header, 0 /*index*/, XML_NS_ADDRESSING, WSA_ACTION);
+  if (action_node == nullptr) {
+    return "";
+  }
+  char* text = ws_xml_get_node_text(action_node);
+  if (text == nullptr) {
+    return "";
+  }
+
+  return text;
+}
+
+std::string SourceInitiatedSubscriptionListener::Handler::getMachineId(WsXmlDocH doc) {
+  WsXmlNodeH header = ws_xml_get_soap_header(doc);
+  if (header == nullptr) {
+    return "";
+  }
+  WsXmlNodeH machineid_node = ws_xml_get_child(header, 0 /*index*/, XML_NS_CUSTOM_MACHINEID, "MachineID");
+  if (machineid_node == nullptr) {
+    return "";
+  }
+  char* text = ws_xml_get_node_text(machineid_node);
+  if (text == nullptr) {
+    return "";
+  }
+
+  return text;
+}
+
+bool SourceInitiatedSubscriptionListener::Handler::isAckRequested(WsXmlDocH doc) {
+  WsXmlNodeH header = ws_xml_get_soap_header(doc);
+  if (header == nullptr) {
+    return false;
+  }
+  WsXmlNodeH ack_requested_node = ws_xml_get_child(header, 0 /*index*/, XML_NS_WS_MAN, WSM_ACKREQUESTED);
+  return ack_requested_node != nullptr;
+}
+
+void SourceInitiatedSubscriptionListener::Handler::sendResponse(struct mg_connection* conn, const std::string& machineId, const std::string& remoteIp, char* xml_buf, size_t xml_buf_size) {
+  logging::LOG_TRACE(processor_.logger_) << "Sending response to " << machineId << " (" << remoteIp << "): \"" << std::string(xml_buf, xml_buf_size) << "\"";
+
+  mg_printf(conn, "HTTP/1.1 200 OK\r\n");
+  mg_printf(conn, "Content-Type: application/soap+xml;charset=UTF-8\r\n");
+  mg_printf(conn, "Authorization: %s\r\n", WSMAN_SECURITY_PROFILE_HTTPS_MUTUAL);
+  mg_printf(conn, "Content-Length: %zu\r\n", xml_buf_size);
+  mg_printf(conn, "\r\n");
+  mg_printf(conn, "%.*s", static_cast<int>(xml_buf_size), xml_buf);
+}
+
+bool SourceInitiatedSubscriptionListener::Handler::handleSubscriptionManager(struct mg_connection* conn, const std::string& endpoint, WsXmlDocH request) {
+  utils::ScopeGuard request_guard([&]() {
+      ws_xml_destroy_doc(request);
+  });
+
+  auto action = getSoapAction(request);
+  auto machine_id = getMachineId(request);
+  const struct mg_request_info* req_info = mg_get_request_info(conn);
+  std::string remote_ip = req_info->remote_addr;
+  if (action != ENUM_ACTION_ENUMERATE) {
+    processor_.logger_->log_error("%s called by %s (%s) with unknown Action \"%s\"", endpoint.c_str(), machine_id.c_str(), remote_ip.c_str(), action.c_str());
+    return false;  // TODO(bakaid): generate fault if possible
+  }
+
+  // Create reponse envelope from request
+  WsXmlDocH response = wsman_create_response_envelope(request, nullptr);
+  utils::ScopeGuard response_guard([&]() {
+    ws_xml_destroy_doc(response);
+  });
+
+  // Header
+  WsXmlNodeH response_header = ws_xml_get_soap_header(response);
+  // Header/MessageID
+  utils::Identifier msg_id = utils::IdGenerator::getIdGenerator()->generate();
+  ws_xml_add_child_format(response_header, XML_NS_ADDRESSING, WSA_MESSAGE_ID, "uuid:%s", msg_id.to_string().c_str());
+
+  // Body
+  WsXmlNodeH response_body = ws_xml_get_soap_body(response);
+  // Body/EnumerationResponse
+  WsXmlNodeH enumeration_response = ws_xml_add_child(response_body, XML_NS_ENUMERATION, WSENUM_ENUMERATE_RESP, nullptr);
+  // Body/EnumerationResponse/EnumerationContext
+  ws_xml_add_child(enumeration_response, XML_NS_ENUMERATION, WSENUM_ENUMERATION_CONTEXT, nullptr);
+  // Body/EnumerationResponse/Items
+  WsXmlNodeH enumeration_items = ws_xml_add_child(enumeration_response, XML_NS_WS_MAN, WSENUM_ITEMS, nullptr);
+  // Body/EnumerationResponse/EndOfSequence
+  ws_xml_add_child(enumeration_response, XML_NS_WS_MAN, WSENUM_END_OF_SEQUENCE, nullptr);
+
+  // Body/EnumerationResponse/Items/Subscription
+  WsXmlNodeH subscription = ws_xml_add_child(enumeration_items, nullptr, "Subscription", nullptr);
+  ws_xml_set_ns(subscription, XML_NS_CUSTOM_SUBSCRIPTION, "m");
+
+  // Body/EnumerationResponse/Items/Subscription/Version
+  std::lock_guard<std::mutex> lock(processor_.mutex_);
+  auto it = processor_.subscribers_.find(machine_id);
+
+  std::string subscription_version;
+  if (it != processor_.subscribers_.end() && it->second.subscription_ != nullptr) {
+    subscription_version = it->second.subscription_version_;
+  } else {
+    utils::Identifier id = utils::IdGenerator::getIdGenerator()->generate();
+    subscription_version = id.to_string();
+  }
+  ws_xml_add_child_format(subscription, XML_NS_CUSTOM_SUBSCRIPTION, "Version", "uuid:%s", subscription_version.c_str());
+
+  // Body/EnumerationResponse/Items/Subscription/Envelope
+  std::string subscription_identifier;
+  std::string subscription_endpoint;
+  if (it != processor_.subscribers_.end() && it->second.subscription_ != nullptr) {
+    WsXmlNodeH subscription_node = ws_xml_get_doc_root(it->second.subscription_);
+    ws_xml_copy_node(subscription_node, subscription);
+  } else {
+    WsXmlDocH subscription_doc = ws_xml_create_envelope();
+
+    // Header
+    WsXmlNodeH header = ws_xml_get_soap_header(subscription_doc);
+    WsXmlNodeH node;
+
+    // Header/Action
+    node = ws_xml_add_child(header, XML_NS_ADDRESSING, WSA_ACTION, EVT_ACTION_SUBSCRIBE);
+    ws_xml_add_node_attr(node, XML_NS_SOAP_1_2, SOAP_MUST_UNDERSTAND, "true");
+
+    // Header/MessageID
+    utils::Identifier msg_id = utils::IdGenerator::getIdGenerator()->generate();
+    ws_xml_add_child_format(header, XML_NS_ADDRESSING, WSA_MESSAGE_ID, "uuid:%s", msg_id.to_string().c_str());
+
+    // Header/To
+    node = ws_xml_add_child(header, XML_NS_ADDRESSING, WSA_TO, WSA_TO_ANONYMOUS);
+    ws_xml_add_node_attr(node, XML_NS_SOAP_1_2, SOAP_MUST_UNDERSTAND, "true");
+
+    // Header/ResourceURI
+    node = ws_xml_add_child(header, XML_NS_WS_MAN, WSM_RESOURCE_URI, "http://schemas.microsoft.com/wbem/wsman/1/windows/EventLog");
+    ws_xml_add_node_attr(node, XML_NS_SOAP_1_2, SOAP_MUST_UNDERSTAND, "true");
+
+    // Header/ReplyTo
+    node = ws_xml_add_child(header, XML_NS_ADDRESSING, WSA_REPLY_TO, nullptr);
+    node = ws_xml_add_child(node, XML_NS_ADDRESSING, WSA_ADDRESS, WSA_TO_ANONYMOUS);
+    ws_xml_add_node_attr(node, XML_NS_SOAP_1_2, SOAP_MUST_UNDERSTAND, "true");
+
+    // Header/OptionSet
+    WsXmlNodeH option_set = ws_xml_add_child(header, XML_NS_WS_MAN, WSM_OPTION_SET, nullptr);
+    ws_xml_ns_add(option_set, XML_NS_SCHEMA_INSTANCE, XML_NS_SCHEMA_INSTANCE_PREFIX);
+
+    // Header/OptionSet/Option (CDATA)
+    node = ws_xml_add_child(option_set, XML_NS_WS_MAN, WSM_OPTION, nullptr);
+    ws_xml_add_node_attr(node, nullptr, WSM_NAME, "CDATA");
+    ws_xml_add_node_attr(node, XML_NS_SCHEMA_INSTANCE, XML_SCHEMA_NIL, "true");
+
+    // Header/OptionSet/Option (IgnoreChannelError)
+    node = ws_xml_add_child(option_set, XML_NS_WS_MAN, WSM_OPTION, nullptr);
+    ws_xml_add_node_attr(node, nullptr, WSM_NAME, "IgnoreChannelError");
+    ws_xml_add_node_attr(node, XML_NS_SCHEMA_INSTANCE, XML_SCHEMA_NIL, "true");
+
+    // Body
+    WsXmlNodeH body = ws_xml_get_soap_body(subscription_doc);
+    WsXmlNodeH subscribe_node = ws_xml_add_child(body, XML_NS_EVENTING, WSEVENT_SUBSCRIBE, nullptr);
+
+    // Body/Delivery
+    {
+      utils::Identifier id = utils::IdGenerator::getIdGenerator()->generate();
+      subscription_identifier = id.to_string();
+    }
+    {
+      utils::Identifier id = utils::IdGenerator::getIdGenerator()->generate();
+      subscription_endpoint = processor_.subscriptions_base_path_ + "/" + id.to_string();
+    }
+
+    WsXmlNodeH delivery_node = ws_xml_add_child(subscribe_node, XML_NS_EVENTING, WSEVENT_DELIVERY, nullptr);
+    ws_xml_add_node_attr(delivery_node, nullptr, WSEVENT_DELIVERY_MODE, WSEVENT_DELIVERY_MODE_EVENTS);
+
+    // Body/Delivery/Heartbeats
+    ws_xml_add_child(delivery_node, XML_NS_WS_MAN, WSM_HEARTBEATS, millisecondsToXsdDuration(processor_.heartbeat_interval_).c_str());
+
+    // Body/Delivery/ConnectionRetry
+    auto connection_retry_node = ws_xml_add_child(delivery_node, XML_NS_WS_MAN, WSM_CONNECTIONRETRY, millisecondsToXsdDuration(processor_.connection_retry_interval_).c_str());
+    ws_xml_add_node_attr(connection_retry_node, nullptr, "Total", std::to_string(processor_.connection_retry_count_).c_str());
+
+    // Body/Delivery/NotifyTo and Body/EndTo are the same, so we will use this lambda to recreate the same tree
+    auto apply_endpoint_nodes = [&](WsXmlNodeH target_node) {
+      // ${target_node}/NotifyTo/Address
+      ws_xml_add_child_format(target_node, XML_NS_ADDRESSING, WSA_ADDRESS, "https://%s:%hu%s",
+                              processor_.listen_hostname_.c_str(),
+                              processor_.listen_port_,
+                              subscription_endpoint.c_str());
+      // ${target_node}/ReferenceProperties
+      node = ws_xml_add_child(target_node, XML_NS_ADDRESSING, WSA_REFERENCE_PROPERTIES, nullptr);
+      // ${target_node}/ReferenceProperties/Identifier
+      ws_xml_add_child_format(node, XML_NS_EVENTING, WSEVENT_IDENTIFIER, "%s", subscription_identifier.c_str());
+      // ${target_node}/Policy
+      WsXmlNodeH policy = ws_xml_add_child(target_node, nullptr, "Policy", nullptr);
+      ws_xml_set_ns(policy, XML_NS_CUSTOM_POLICY, "c");
+      ws_xml_ns_add(policy, XML_NS_CUSTOM_AUTHENTICATION, "auth");
+      // ${target_node}/Policy/ExactlyOne
+      WsXmlNodeH exactly_one = ws_xml_add_child(policy, XML_NS_CUSTOM_POLICY, "ExactlyOne", nullptr);
+      // ${target_node}/Policy/ExactlyOne/All
+      WsXmlNodeH all = ws_xml_add_child(exactly_one, XML_NS_CUSTOM_POLICY, "All", nullptr);
+      // ${target_node}/Policy/ExactlyOne/All/Authentication
+      WsXmlNodeH authentication = ws_xml_add_child(all, XML_NS_CUSTOM_AUTHENTICATION, "Authentication", nullptr);
+      ws_xml_add_node_attr(authentication, nullptr, "Profile", WSMAN_SECURITY_PROFILE_HTTPS_MUTUAL);
+      // ${target_node}/Policy/ExactlyOne/All/Authentication/ClientCertificate
+      WsXmlNodeH client_certificate = ws_xml_add_child(authentication, XML_NS_CUSTOM_AUTHENTICATION, "ClientCertificate", nullptr);
+      // ${target_node}/Policy/ExactlyOne/All/Authentication/ClientCertificate/Thumbprint
+      WsXmlNodeH thumbprint = ws_xml_add_child_format(client_certificate, XML_NS_CUSTOM_AUTHENTICATION, "Thumbprint", "%s", processor_.ssl_ca_cert_thumbprint_.c_str());
+      ws_xml_add_node_attr(thumbprint, nullptr, "Role", "issuer");
+    };
+
+    // Body/Delivery/NotifyTo
+    WsXmlNodeH notifyto_node = ws_xml_add_child(delivery_node, XML_NS_EVENTING, WSEVENT_NOTIFY_TO, nullptr);
+    apply_endpoint_nodes(notifyto_node);
+
+    // Body/EndTo
+    WsXmlNodeH endto_node = ws_xml_add_child(subscribe_node, XML_NS_EVENTING, WSEVENT_ENDTO, nullptr);
+    apply_endpoint_nodes(endto_node);
+
+    // Body/MaxElements
+    ws_xml_add_child(delivery_node, XML_NS_WS_MAN, WSM_MAX_ELEMENTS, std::to_string(processor_.max_elements_).c_str());
+    // Body/MaxTime
+    ws_xml_add_child(delivery_node, XML_NS_WS_MAN, WSENUM_MAX_TIME, millisecondsToXsdDuration(processor_.max_latency_).c_str());
+
+    // Body/Expires
+    ws_xml_add_child(subscribe_node, XML_NS_EVENTING, WSEVENT_EXPIRES, millisecondsToXsdDuration(processor_.subscription_expiration_interval_).c_str());
+
+    // Body/Filter
+    WsXmlNodeH filter_node = ws_xml_add_child(subscribe_node, XML_NS_WS_MAN, WSM_FILTER, processor_.xpath_xml_query_.c_str());
+    // ws_xml_add_node_attr(filter_node, nullptr, "Dialect", "http://schemas.microsoft.com/win/2004/08/events/eventquery");
+
+    // Body/Bookmark
+    if (it != processor_.subscribers_.end() && it->second.bookmark_ != nullptr) {
+      WsXmlNodeH bookmark_node = ws_xml_get_doc_root(it->second.bookmark_);
+      ws_xml_copy_node(bookmark_node, subscribe_node);
+    } else if (processor_.initial_existing_events_strategy_ == INITIAL_EXISTING_EVENTS_STRATEGY_ALL) {
+      ws_xml_add_child(subscribe_node, XML_NS_WS_MAN, WSM_BOOKMARK, "http://schemas.dmtf.org/wbem/wsman/1/wsman/bookmark/earliest");
+    }
+
+    // Body/SendBookmarks
+    ws_xml_add_child(subscribe_node, XML_NS_WS_MAN, WSM_SENDBOOKMARKS, nullptr);
+
+    // Copy the whole Subscription
+    WsXmlNodeH subscription_node = ws_xml_get_doc_root(subscription_doc);
+    ws_xml_copy_node(subscription_node, subscription);
+
+    // Save subscription
+    if (it == processor_.subscribers_.end()) {
+      it = processor_.subscribers_.emplace(machine_id, SubscriberData()).first;
+    }
+    it->second.setSubscription(subscription_version, subscription_doc, subscription_endpoint, subscription_identifier);
+  }
+
+  // Send response
+  char* xml_buf = nullptr;
+  int xml_buf_size = 0;
+  ws_xml_dump_memory_enc(response, &xml_buf, &xml_buf_size, "UTF-8");
+
+  sendResponse(conn, machine_id, req_info->remote_addr, xml_buf, xml_buf_size);
+
+  ws_xml_free_memory(xml_buf);
+
+  return true;
+}
+
+SourceInitiatedSubscriptionListener::Handler::WriteCallback::WriteCallback(char* text)
+    : text_(text) {
+}
+
+int64_t SourceInitiatedSubscriptionListener::Handler::WriteCallback::process(std::shared_ptr<io::BaseStream> stream) {
+  return stream->write(reinterpret_cast<uint8_t*>(text_), strlen(text_));
+}
+
+int SourceInitiatedSubscriptionListener::Handler::enumerateEventCallback(WsXmlNodeH node, void* data) {
+  if (data == nullptr) {
+    return 1;
+  }
+
+  std::shared_ptr<core::ProcessSession> session;
+  std::shared_ptr<logging::Logger> logger;
+  std::string machine_id;
+  std::string remote_ip;
+  std::tie(session, logger, machine_id, remote_ip) = *static_cast<std::tuple<std::shared_ptr<core::ProcessSession>, std::shared_ptr<logging::Logger>, std::string, std::string>*>(data);
+
+  char* text = ws_xml_get_node_text(node);
+  if (text == nullptr) {
+      logger->log_error("Failed to get text for node");
+      return 1;
+  }
+
+  try {
+    logger->log_trace("Found Event");
+    auto flow_file = std::static_pointer_cast<FlowFileRecord>(session->create());
+    if (flow_file == nullptr) {
+      logger->log_error("Failed to create FlowFile");
+      return 1;
+    }
+
+    WriteCallback callback(text);
+    session->write(flow_file, &callback);
+
+    session->putAttribute(flow_file, FlowAttributeKey(MIME_TYPE), "application/xml");
+    flow_file->addAttribute(ATTRIBUTE_WEF_REMOTE_MACHINEID, machine_id);
+    flow_file->addAttribute(ATTRIBUTE_WEF_REMOTE_IP, remote_ip);
+
+    session->transfer(flow_file, SourceInitiatedSubscriptionListener::Success);
+    session->commit();
+  } catch (const std::exception& e) {
+    logger->log_error("Caught exception while processing Events: %s", e.what());
+    return 1;
+  } catch (...) {
+    logger->log_error("Caught exception while processing Events");
+    return 1;
+  }
+
+  return 0;
+}
+
+bool SourceInitiatedSubscriptionListener::Handler::handleSubscriptions(struct mg_connection* conn, const std::string& endpoint, WsXmlDocH request) {
+  utils::ScopeGuard guard([&]() {
+      ws_xml_destroy_doc(request);
+  });
+  auto action = getSoapAction(request);
+  auto machine_id = getMachineId(request);
+  const struct mg_request_info* req_info = mg_get_request_info(conn);
+  std::string remote_ip = req_info->remote_addr;
+  if (action == EVT_ACTION_SUBEND) {
+    std::lock_guard<std::mutex> lock(processor_.mutex_);
+    auto it = processor_.subscribers_.find(machine_id);
+    if (it != processor_.subscribers_.end()) {
+        processor_.subscribers_.erase(it);
+    }
+    // TODO(bakaid): make sure whether we really have to clean the bookmark as well (based on the fault)
+  } else if (action == WSMAN_CUSTOM_ACTION_HEARTBEAT) {
+    processor_.logger_->log_debug("Received Heartbeat on %s from %s (%s)", endpoint.c_str(), machine_id.c_str(), remote_ip.c_str());
+  } else if (action == WSMAN_CUSTOM_ACTION_EVENTS) {
+    processor_.logger_->log_debug("Received Events on %s from %s (%s)", endpoint.c_str(), machine_id.c_str(), remote_ip.c_str());
+    // Body
+    WsXmlNodeH body = ws_xml_get_soap_body(request);
+    if (body == nullptr) {
+      processor_.logger_->log_error("Received malformed Events request on %s from %s (%s), SOAP Body missing", endpoint.c_str(), machine_id.c_str(), remote_ip.c_str());
+      return false;
+    }
+    // Body/Events
+    WsXmlNodeH events_node = ws_xml_get_child(body, 0 /*index*/, XML_NS_WS_MAN, WSM_EVENTS);
+    if (events_node == nullptr) {
+      processor_.logger_->log_error("Received malformed Events request on %s from %s (%s), Events missing", endpoint.c_str(), machine_id.c_str(), remote_ip.c_str());
+      return false;
+    }
+    const struct mg_request_info* req_info = mg_get_request_info(conn);
+    // Enumare Body/Events/Event nodes
+    auto session = processor_.session_factory_->createSession();
+    std::tuple<std::shared_ptr<core::ProcessSession>, std::shared_ptr<logging::Logger>, std::string, std::string> callback_args =
+        std::forward_as_tuple(session, processor_.logger_, machine_id, remote_ip);
+    int ret = ws_xml_enum_children(events_node, &SourceInitiatedSubscriptionListener::Handler::enumerateEventCallback, &callback_args, 0 /*bRecursive*/);
+    if (ret != 0) {
+      processor_.logger_->log_error("Failed to parse events on %s from %s (%s), rolling back session", endpoint.c_str(), machine_id.c_str(), remote_ip.c_str());
+      session->rollback();
+    }
+    // Header
+    WsXmlNodeH header = ws_xml_get_soap_header(request);
+    // Header/Bookmark
+    WsXmlNodeH bookmark_node = ws_xml_get_child(header, 0 /*index*/, XML_NS_WS_MAN, WSM_BOOKMARK);
+    if (ret == 0 && bookmark_node != nullptr) {
+      WsXmlDocH bookmark_doc = ws_xml_create_doc(XML_NS_WS_MAN, WSM_BOOKMARK);
+      WsXmlNodeH temp = ws_xml_get_doc_root(bookmark_doc);
+      ws_xml_duplicate_children(temp, bookmark_node);
+
+      std::lock_guard<std::mutex> lock(processor_.mutex_);
+      auto it = processor_.subscribers_.find(machine_id);
+      if (it != processor_.subscribers_.end()) {
+        it = processor_.subscribers_.emplace(machine_id, SubscriberData()).first;
+      }
+      it->second.setBookmark(bookmark_doc);
+      // Bookmark changed, invalidate stored subscription
+      it->second.clearSubscription();
+
+      // Persist state
+      processor_.persistState();
+
+      char* xml_buf = nullptr;
+      int xml_buf_size = 0;
+      ws_xml_dump_memory_enc(bookmark_doc, &xml_buf, &xml_buf_size, "UTF-8");
+      processor_.logger_->log_debug("Saved new bookmark for %s: \"%.*s\"", machine_id.c_str(), xml_buf_size, xml_buf);
+      ws_xml_free_memory(xml_buf);
+    }
+  } else {
+    processor_.logger_->log_error("%s called by %s (%s) with unknown Action \"%s\"", endpoint.c_str(), machine_id.c_str(), remote_ip.c_str(), action.c_str());
+    return false;  // TODO(bakaid): generate fault if possible
+  }
+
+  if (isAckRequested(request)) {
+    // Assemble ACK
+    WsXmlDocH ack = wsman_create_response_envelope(request, WSMAN_CUSTOM_ACTION_ACK);
+    // Header
+    WsXmlNodeH ack_header = ws_xml_get_soap_header(ack);
+
+    // Header/MessageID
+    utils::Identifier msg_id = utils::IdGenerator::getIdGenerator()->generate();
+    ws_xml_add_child_format(ack_header, XML_NS_ADDRESSING, WSA_MESSAGE_ID, "uuid:%s", msg_id.to_string().c_str());
+
+    // Send ACK
+    char* xml_buf = nullptr;
+    int xml_buf_size = 0;
+    ws_xml_dump_memory_enc(ack, &xml_buf, &xml_buf_size, "UTF-8");
+
+    sendResponse(conn, machine_id, remote_ip, xml_buf, xml_buf_size);
+
+    ws_xml_free_memory(xml_buf);
+    ws_xml_destroy_doc(ack);
+  }
+
+  return true;
+}
+
+void SourceInitiatedSubscriptionListener::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
+  logger_->log_trace("SourceInitiatedSubscriptionListener onTrigger called");
+}
+
+void SourceInitiatedSubscriptionListener::initialize() {
+  logger_->log_trace("Initializing SourceInitiatedSubscriptionListener");
+
+  // Set the supported properties
+  std::set<core::Property> properties;
+  properties.insert(ListenHostname);
+  properties.insert(ListenPort);
+  properties.insert(SubscriptionManagerPath);
+  properties.insert(SubscriptionsBasePath);
+  properties.insert(SSLCertificate);
+  properties.insert(SSLCertificateAuthority);
+  properties.insert(SSLVerifyPeer);
+  properties.insert(XPathXmlQuery);
+  properties.insert(InitialExistingEventsStrategy);
+  properties.insert(SubscriptionExpirationInterval);
+  properties.insert(HeartbeatInterval);
+  properties.insert(MaxElements);
+  properties.insert(MaxLatency);
+  properties.insert(ConnectionRetryInterval);
+  properties.insert(ConnectionRetryCount);
+  setSupportedProperties(properties);
+
+  // Set the supported relationships
+  std::set<core::Relationship> relationships;
+  relationships.insert(Success);
+  setSupportedRelationships(relationships);
+}
+
+void SourceInitiatedSubscriptionListener::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
+  std::string ssl_certificate_file;
+  std::string ssl_ca_file;
+  bool verify_peer = true;
+
+  state_manager_ = context->getStateManager();
+  if (state_manager_ == nullptr) {
+    throw Exception(PROCESSOR_EXCEPTION, "Failed to get StateManager");
+  }
+
+  std::string value;
+  context->getProperty(ListenHostname.getName(), listen_hostname_);
+  if (!context->getProperty(ListenPort.getName(), value)) {
+    throw Exception(PROCESSOR_EXCEPTION,"Listen Port attribute is missing or invalid");
+  } else {
+    core::Property::StringToInt(value, listen_port_);
+  }
+  context->getProperty(SubscriptionManagerPath.getName(), subscription_manager_path_);
+  context->getProperty(SubscriptionsBasePath.getName(), subscriptions_base_path_);
+  if (!context->getProperty(SSLCertificate.getName(), ssl_certificate_file)) {
+    throw Exception(PROCESSOR_EXCEPTION,"SSL Certificate attribute is missing");
+  }
+  if (!context->getProperty(SSLCertificateAuthority.getName(), ssl_ca_file)) {
+    throw Exception(PROCESSOR_EXCEPTION,"SSL Certificate Authority attribute is missing");
+  }
+  if (!context->getProperty(SSLVerifyPeer.getName(), value)) {
+    throw Exception(PROCESSOR_EXCEPTION,"SSL Verify Peer attribute is missing or invalid");
+  } else {
+    utils::StringUtils::StringToBool(value, verify_peer);
+  }
+  context->getProperty(XPathXmlQuery.getName(), xpath_xml_query_);
+  if (!context->getProperty(InitialExistingEventsStrategy.getName(), initial_existing_events_strategy_)) {
+    throw Exception(PROCESSOR_EXCEPTION,"Initial Existing Events Strategy attribute is missing or invalid");
+  }
+  if (!context->getProperty(SubscriptionExpirationInterval.getName(), value)) {
+    throw Exception(PROCESSOR_EXCEPTION,"Subscription Expiration Interval attribute is missing or invalid");
+  } else {
+    core::TimeUnit unit;
+    if (!core::Property::StringToTime(value, subscription_expiration_interval_, unit) ||
+        !core::Property::ConvertTimeUnitToMS(subscription_expiration_interval_, unit, subscription_expiration_interval_)) {
+      throw Exception(PROCESSOR_EXCEPTION,"Subscription Expiration Interval attribute is invalid");
+    }
+  }
+  if (!context->getProperty(HeartbeatInterval.getName(), value)) {
+    throw Exception(PROCESSOR_EXCEPTION,"Heartbeat Interval attribute is missing or invalid");
+  } else {
+    core::TimeUnit unit;
+    if (!core::Property::StringToTime(value, heartbeat_interval_, unit) || !core::Property::ConvertTimeUnitToMS(heartbeat_interval_, unit, heartbeat_interval_)) {
+      throw Exception(PROCESSOR_EXCEPTION,"Heartbeat Interval attribute is invalid");
+    }
+  }
+  if (!context->getProperty(MaxElements.getName(), value)) {
+    throw Exception(PROCESSOR_EXCEPTION,"Max Elements attribute is missing or invalid");
+  } else if (!core::Property::StringToInt(value, max_elements_)) {
+    throw Exception(PROCESSOR_EXCEPTION,"Max Elements attribute is invalid");
+  }
+  if (!context->getProperty(MaxLatency.getName(), value)) {
+    throw Exception(PROCESSOR_EXCEPTION,"Max Latency attribute is missing or invalid");
+  } else {
+    core::TimeUnit unit;
+    if (!core::Property::StringToTime(value, max_latency_, unit) || !core::Property::ConvertTimeUnitToMS(max_latency_, unit, max_latency_)) {
+      throw Exception(PROCESSOR_EXCEPTION,"Max Latency attribute is invalid");
+    }
+  }
+  if (!context->getProperty(ConnectionRetryInterval.getName(), value)) {
+    throw Exception(PROCESSOR_EXCEPTION,"Connection Retry Interval attribute is missing or invalid");
+  } else {
+    core::TimeUnit unit;
+    if (!core::Property::StringToTime(value, connection_retry_interval_, unit) || !core::Property::ConvertTimeUnitToMS(connection_retry_interval_, unit, connection_retry_interval_)) {
+      throw Exception(PROCESSOR_EXCEPTION,"Connection Retry Interval attribute is invalid");
+    }
+  }
+  if (!context->getProperty(ConnectionRetryCount.getName(), value)) {
+    throw Exception(PROCESSOR_EXCEPTION,"Connection Retry Count attribute is missing or invalid");
+  } else if (!core::Property::StringToInt(value, connection_retry_count_)) {
+    throw Exception(PROCESSOR_EXCEPTION,"Connection Retry Count attribute is invalid");
+  }
+
+  FILE* fp = fopen(ssl_ca_file.c_str(), "rb");
+  if (fp == nullptr) {
+    throw Exception(PROCESSOR_EXCEPTION,"Failed to open file specified by SSL Certificate Authority attribute");
+  }
+  X509* ca = nullptr;
+  PEM_read_X509(fp, &ca, nullptr, nullptr);
+  fclose(fp);
+  if (ca == nullptr) {
+    throw Exception(PROCESSOR_EXCEPTION,"Failed to parse file specified by SSL Certificate Authority attribute");
+  }
+  std::array<uint8_t, 20U> hash_buf;
+  int ret = X509_digest(ca, EVP_sha1(), hash_buf.data(), nullptr);
+  X509_free(ca);
+  if (ret != 1) {
+    throw Exception(PROCESSOR_EXCEPTION,"Failed to get fingerprint for CA specified by SSL Certificate Authority attribute");
+  }
+  ssl_ca_cert_thumbprint_ = utils::StringUtils::to_hex(hash_buf.data(), hash_buf.size(), true /*uppercase*/);
+  logger_->log_debug("%s SHA-1 thumbprint is %s", ssl_ca_file.c_str(), ssl_ca_cert_thumbprint_.c_str());
+
+  session_factory_ = sessionFactory;
+
+  // Load state
+  loadState();
+
+  // Start server
+  std::vector<std::string> options;
+  options.emplace_back("enable_keep_alive");
+  options.emplace_back("yes");
+  options.emplace_back("keep_alive_timeout_ms");
+  options.emplace_back("15000");
+  options.emplace_back("num_threads");
+  options.emplace_back("1");
+  options.emplace_back("listening_ports");
+  options.emplace_back(std::to_string(listen_port_) + "s");
+  options.emplace_back("ssl_certificate");
+  options.emplace_back(ssl_certificate_file);
+  options.emplace_back("ssl_ca_file");
+  options.emplace_back(ssl_ca_file);
+  options.emplace_back("ssl_verify_peer");
+  options.emplace_back(verify_peer ? "yes" : "no");
+
+  try {
+    server_ = std::unique_ptr<CivetServer>(new CivetServer(options));
+  } catch (const std::exception& e) {
+    throw Exception(PROCESSOR_EXCEPTION, std::string("Failed to initialize server, error: ") + e.what());
+  } catch (...) {
+    throw Exception(PROCESSOR_EXCEPTION,"Failed to initialize server");
+  }
+  handler_ = std::unique_ptr<Handler>(new Handler(*this));
+  server_->addHandler("**", *handler_);
+}
+
+void SourceInitiatedSubscriptionListener::notifyStop() {
+  server_.release();
+}
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
diff --git a/extensions/openwsman/processors/SourceInitiatedSubscriptionListener.h b/extensions/openwsman/processors/SourceInitiatedSubscriptionListener.h
new file mode 100644
index 0000000..70ef004
--- /dev/null
+++ b/extensions/openwsman/processors/SourceInitiatedSubscriptionListener.h
@@ -0,0 +1,173 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __SOURCE_INITIATED_SUBSCRIPTION_PROCESSOR_H__
+#define __SOURCE_INITIATED_SUBSCRIPTION_PROCESSOR_H__
+
+#include <memory>
+#include <string>
+#include <list>
+#include <map>
+#include <mutex>
+#include <thread>
+
+#include <CivetServer.h>
+extern "C" {
+#include "wsman-xml.h"
+}
+
+#include "utils/ByteArrayCallback.h"
+#include "FlowFileRecord.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/Core.h"
+#include "core/Property.h"
+#include "core/Resource.h"
+#include "controllers/SSLContextService.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "utils/Id.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+class SourceInitiatedSubscriptionListener : public core::Processor {
+ public:
+  static constexpr char const *INITIAL_EXISTING_EVENTS_STRATEGY_NONE = "None";
+  static constexpr char const *INITIAL_EXISTING_EVENTS_STRATEGY_ALL = "All";
+
+  static constexpr char const* ProcessorName = "SourceInitiatedSubscriptionListener";
+
+  explicit SourceInitiatedSubscriptionListener(std::string name, utils::Identifier uuid = utils::Identifier());
+
+  // Supported Properties
+  static core::Property ListenHostname;
+  static core::Property ListenPort;
+  static core::Property SubscriptionManagerPath;
+  static core::Property SubscriptionsBasePath;
+  static core::Property SSLCertificate;
+  static core::Property SSLCertificateAuthority;
+  static core::Property SSLVerifyPeer;
+  static core::Property XPathXmlQuery;
+  static core::Property InitialExistingEventsStrategy;
+  static core::Property SubscriptionExpirationInterval;
+  static core::Property HeartbeatInterval;
+  static core::Property MaxElements;
+  static core::Property MaxLatency;
+  static core::Property ConnectionRetryInterval;
+  static core::Property ConnectionRetryCount;
+
+  // Supported Relationships
+  static core::Relationship Success;
+
+  // Writes Attributes
+  static constexpr char const* ATTRIBUTE_WEF_REMOTE_MACHINEID = "wef.remote.machineid";
+  static constexpr char const* ATTRIBUTE_WEF_REMOTE_IP = "wef.remote.ip";
+
+  void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override;
+  void initialize() override;
+  void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
+  void notifyStop() override;
+  
+  class Handler: public CivetHandler {
+   public:
+    explicit Handler(SourceInitiatedSubscriptionListener& processor);
+    bool handlePost(CivetServer* server, struct mg_connection* conn);
+    
+    class WriteCallback : public OutputStreamCallback {
+     public:
+      explicit WriteCallback(char* text);
+      int64_t process(std::shared_ptr<io::BaseStream> stream);
+
+     private:
+      char* text_;
+    };
+
+   private:
+    SourceInitiatedSubscriptionListener& processor_;
+    
+    bool handleSubscriptionManager(struct mg_connection* conn, const std::string& endpoint, WsXmlDocH request);
+    bool handleSubscriptions(struct mg_connection* conn, const std::string& endpoint, WsXmlDocH request);
+    
+    static int enumerateEventCallback(WsXmlNodeH node, void* data);
+    std::string getSoapAction(WsXmlDocH doc);
+    std::string getMachineId(WsXmlDocH doc);
+    bool isAckRequested(WsXmlDocH doc);
+    void sendResponse(struct mg_connection* conn, const std::string& machineId, const std::string& remoteIp, char* xml_buf, size_t xml_buf_size);
+    
+    static std::string millisecondsToXsdDuration(int64_t milliseconds);
+  };
+
+ protected:
+  std::shared_ptr<logging::Logger> logger_;
+
+  std::shared_ptr<core::CoreComponentStateManager> state_manager_;
+
+  std::shared_ptr<core::ProcessSessionFactory> session_factory_;
+
+  std::string listen_hostname_;
+  uint16_t listen_port_;
+  std::string subscription_manager_path_;
+  std::string subscriptions_base_path_;
+  std::string ssl_ca_cert_thumbprint_;
+  std::string xpath_xml_query_;
+  std::string initial_existing_events_strategy_;
+  int64_t subscription_expiration_interval_;
+  int64_t heartbeat_interval_;
+  uint32_t max_elements_;
+  int64_t max_latency_;
+  int64_t connection_retry_interval_;
+  uint32_t connection_retry_count_;
+
+  std::unique_ptr<CivetServer> server_;
+  std::unique_ptr<Handler> handler_;
+  
+  struct SubscriberData {
+      WsXmlDocH bookmark_;
+      std::string subscription_version_;
+      WsXmlDocH subscription_;
+      std::string subscription_endpoint_;
+      std::string subscription_identifier_;
+
+      SubscriberData();
+      ~SubscriberData();
+
+      void setSubscription(const std::string& subscription_version, WsXmlDocH subscription, const std::string& subscription_endpoint, const std::string& subscription_identifier);
+      void clearSubscription();
+      void setBookmark(WsXmlDocH bookmark);
+      void clearBookmark();
+  };
+
+  std::mutex mutex_;
+  std::map<std::string /*machineId*/, SubscriberData> subscribers_;
+
+  bool persistState() const;
+  bool loadState();
+};
+
+REGISTER_RESOURCE(SourceInitiatedSubscriptionListener, "This processor implements a Windows Event Forwarding Source Initiated Subscription server with the help of OpenWSMAN. "
+                                                       "Windows hosts can be set up to connect and forward Event Logs to this processor.")
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif // __SOURCE_INITIATED_SUBSCRIPTION_PROCESSOR_H__
diff --git a/extensions/sftp/processors/ListSFTP.cpp b/extensions/sftp/processors/ListSFTP.cpp
index 6ac89f2..dd49beb 100644
--- a/extensions/sftp/processors/ListSFTP.cpp
+++ b/extensions/sftp/processors/ListSFTP.cpp
@@ -794,8 +794,7 @@
     return false;
   }
 
-  size_t i = 0;
-  while (true) {
+  for (size_t i = 0U;; i++) {
     std::string name;
     try {
       name = state_map.at("entity." + std::to_string(i) + ".name");
@@ -812,7 +811,6 @@
       logger_->log_error("State for entity \"%s\" is missing or invalid, skipping", name);
       continue;
     }
-    ++i;
   }
 
   if (state_listing_strategy != listing_strategy_ ||
diff --git a/extensions/standard-processors/processors/TailFile.cpp b/extensions/standard-processors/processors/TailFile.cpp
index 0a52a93..84fc92a 100644
--- a/extensions/standard-processors/processors/TailFile.cpp
+++ b/extensions/standard-processors/processors/TailFile.cpp
@@ -246,8 +246,7 @@
   std::unordered_map<std::string, std::string> state_map;
   if (state_manager_->get(state_map)) {
     std::map<std::string, TailState> new_tail_states;
-    size_t i = 0;
-    while (true) {
+    for (size_t i = 0U;; i++) {
       std::string name;
       try {
         name = state_map.at("file." + std::to_string(i) + ".name");
@@ -268,7 +267,6 @@
       } catch (...) {
         continue;
       }
-      ++i;
     }
     state_load_success = true;
     tail_states_ = std::move(new_tail_states);
diff --git a/thirdparty/libxml2/libxml2-win.patch b/thirdparty/libxml2/libxml2-win.patch
new file mode 100644
index 0000000..67162bc
--- /dev/null
+++ b/thirdparty/libxml2/libxml2-win.patch
@@ -0,0 +1,126 @@
+diff -rupN orig/CMakeLists.txt patched/CMakeLists.txt
+--- orig/CMakeLists.txt	1970-01-01 01:00:00.000000000 +0100
++++ patched/CMakeLists.txt	2020-04-17 14:16:13.000000000 +0200
+@@ -0,0 +1,65 @@
++cmake_minimum_required(VERSION 3.7)
++
++project(libxml2)
++
++set(SOURCES buf.c
++            c14n.c
++            catalog.c
++            chvalid.c
++            debugXML.c
++            dict.c
++            DOCBparser.c
++            encoding.c
++            entities.c
++            error.c
++            globals.c
++            hash.c
++            HTMLparser.c
++            HTMLtree.c
++            legacy.c
++            list.c
++            nanoftp.c
++            nanohttp.c
++            parser.c
++            parserInternals.c
++            pattern.c
++            relaxng.c
++            SAX.c
++            SAX2.c
++            schematron.c
++            threads.c
++            tree.c
++            uri.c
++            valid.c
++            xinclude.c
++            xlink.c
++            xmlcatalog.c
++            xmlIO.c
++            xmlmemory.c
++            xmlmodule.c
++            xmlreader.c
++            xmlregexp.c
++            xmlsave.c
++            xmlschemas.c
++            xmlschemastypes.c
++            xmlstring.c
++            xmlunicode.c
++            xmlwriter.c
++            xpath.c
++            xpointer.c)
++
++add_library(xml2 STATIC ${SOURCES})
++
++set_property(TARGET xml2 PROPERTY POSITION_INDEPENDENT_CODE ON)
++
++target_include_directories(xml2
++                            PRIVATE
++                              include
++                              win32/VC10)
++
++install(TARGETS xml2
++    ARCHIVE DESTINATION lib
++)
++
++install(DIRECTORY include/libxml DESTINATION include/libxml2
++          FILES_MATCHING PATTERN "*.h")
+diff -rupN orig/include/libxml/xmlversion.h patched/include/libxml/xmlversion.h
+--- orig/include/libxml/xmlversion.h	2019-10-30 20:14:29.000000000 +0100
++++ patched/include/libxml/xmlversion.h	2020-04-17 14:52:58.000000000 +0200
+@@ -50,7 +50,7 @@ XMLPUBFUN void XMLCALL xmlCheckVersion(i
+  *
+  * extra version information, used to show a CVS compilation
+  */
+-#define LIBXML_VERSION_EXTRA "-GITv2.9.10-rc1-2-ga5bb6aaa2"
++#define LIBXML_VERSION_EXTRA ""
+ 
+ /**
+  * LIBXML_TEST_VERSION:
+@@ -171,7 +171,7 @@ XMLPUBFUN void XMLCALL xmlCheckVersion(i
+  *
+  * Whether the FTP support is configured in
+  */
+-#if 1
++#if 0
+ #define LIBXML_FTP_ENABLED
+ #endif
+ 
+@@ -180,7 +180,7 @@ XMLPUBFUN void XMLCALL xmlCheckVersion(i
+  *
+  * Whether the HTTP support is configured in
+  */
+-#if 1
++#if 0
+ #define LIBXML_HTTP_ENABLED
+ #endif
+ 
+@@ -270,7 +270,7 @@ XMLPUBFUN void XMLCALL xmlCheckVersion(i
+  *
+  * Whether iconv support is available
+  */
+-#if 1
++#if 0
+ #define LIBXML_ICONV_ENABLED
+ #endif
+ 
+@@ -395,7 +395,7 @@ XMLPUBFUN void XMLCALL xmlCheckVersion(i
+  *
+  * Whether the Zlib support is compiled in
+  */
+-#if 1
++#if 0
+ #define LIBXML_ZLIB_ENABLED
+ #endif
+ 
+@@ -404,7 +404,7 @@ XMLPUBFUN void XMLCALL xmlCheckVersion(i
+  *
+  * Whether the Lzma support is compiled in
+  */
+-#if 1
++#if 0
+ #define LIBXML_LZMA_ENABLED
+ #endif
+ 
diff --git a/thirdparty/openwsman/openwsman.patch b/thirdparty/openwsman/openwsman.patch
new file mode 100644
index 0000000..5fff4b7
--- /dev/null
+++ b/thirdparty/openwsman/openwsman.patch
@@ -0,0 +1,81 @@
+diff -rupN orig/CMakeLists.txt patched/CMakeLists.txt
+--- orig/CMakeLists.txt	2019-09-17 11:38:38.000000000 +0200
++++ patched/CMakeLists.txt	2020-04-16 23:43:22.000000000 +0200
+@@ -24,7 +24,7 @@ if(COMMAND cmake_policy)
+ endif(COMMAND cmake_policy)
+ 	      
+ # where to look first for cmake modules, before ${CMAKE_ROOT}/Modules/ is checked
+-SET(CMAKE_MODULE_PATH ${CMAKE_SOURCE_DIR}/cmake/modules ${CMAKE_MODULE_PATH})
++LIST(APPEND CMAKE_MODULE_PATH ${CMAKE_SOURCE_DIR}/cmake/modules)
+ 
+ INCLUDE( ${CMAKE_SOURCE_DIR}/VERSION.cmake )
+ SET(VERSION "${OPENWSMAN_MAJOR}.${OPENWSMAN_MINOR}.${OPENWSMAN_PATCH}")
+@@ -168,8 +168,14 @@ ENDIF( USE_PAM )
+ 
+ INCLUDE(FindOpenSSL)
+ IF(OPENSSL_FOUND)
++  MESSAGE("OpenSSL found")
++  MESSAGE("OPENSSL_INCLUDE_DIR: ${OPENSSL_INCLUDE_DIR}")
++  MESSAGE("OPENSSL_LIBRARIES: ${OPENSSL_LIBRARIES}")
+   SET(HAVE_SSL 1)
+   SET(USE_OPENSSL 1)
++  INCLUDE_DIRECTORIES(${OPENSSL_INCLUDE_DIR})
++ELSE(OPENSSL_FOUND)
++  SET(HAVE_SSL 0)
+ ENDIF(OPENSSL_FOUND)
+ 
+ IF( BUILD_RUBY )
+@@ -256,6 +262,9 @@ IF(UNIX)
+   IF ( NOT CURL_FOUND)
+    MESSAGE( FATAL_ERROR " curl not found" )
+   ELSE ( NOT CURL_FOUND)
++    MESSAGE("cURL found")
++    MESSAGE("CURL_INCLUDE_DIR: ${CURL_INCLUDE_DIR}")
++    MESSAGE("CURL_LIBRARIES: ${CURL_LIBRARIES}")
+     INCLUDE_DIRECTORIES(${CURL_INCLUDE_DIR})
+     IF(CURL_VERSION_STRING)
+       STRING(COMPARE LESS ${CURL_VERSION_STRING} "7.12.0" result)
+@@ -272,6 +281,9 @@ INCLUDE(FindLibXml2)
+ IF ( NOT LIBXML2_FOUND)
+   MESSAGE( FATAL_ERROR " libxml2 not found" )
+ ELSE ( NOT LIBXML2_FOUND)
++  MESSAGE("Libxml2 found")
++  MESSAGE("LIBXML2_INCLUDE_DIR: ${LIBXML2_INCLUDE_DIR}")
++  MESSAGE("LIBXML2_LIBRARIES: ${LIBXML2_LIBRARIES}")
+   INCLUDE_DIRECTORIES(${LIBXML2_INCLUDE_DIR})
+ ENDIF( NOT LIBXML2_FOUND)
+ 
+diff -rupN orig/src/lib/wsman-soap.c patched/src/lib/wsman-soap.c
+--- orig/src/lib/wsman-soap.c	2019-09-17 11:38:38.000000000 +0200
++++ patched/src/lib/wsman-soap.c	2020-04-16 23:21:49.000000000 +0200
+@@ -991,10 +991,12 @@ unsigned long get_total_enum_context(WsC
+  * preset, hence marking them as weak symbols and testing to see
+  * if they are resolved before using them.
+  */
++#if 0
+ #pragma weak wsmand_options_get_max_threads
+ extern int wsmand_options_get_max_threads(void);
+ #pragma weak wsmand_options_get_max_connections_per_thread
+ extern int wsmand_options_get_max_connections_per_thread(void);
++#endif
+ 
+ /**
+  * Enumeration Stub for processing enumeration requests
+@@ -1030,9 +1032,17 @@ wsenum_enumerate_stub(SoapOpH op,
+         int max_threads = 0;
+         int max_connections_per_thread = 0;
+         int(* fptr)(void);
++#if 0
+         if((fptr = wsmand_options_get_max_threads) != 0){
++#else
++        if(0){
++#endif
+                 max_threads = (* fptr)();
++#if 0
+                 if((fptr = wsmand_options_get_max_connections_per_thread) != 0){
++#else
++                if(0){
++#endif
+                         max_connections_per_thread = (* fptr)();
+                 }
+                 else{