MINIFICPP-1002 - Add SourceInitiatedSubscriptionListener processor
Signed-off-by: Daniel Bakai <bakaid@apache.org>
Approved by aboda and szaszm on GH
This closes #629
diff --git a/CMakeLists.txt b/CMakeLists.txt
index f394bd2..2a4dc77 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -480,6 +480,18 @@
createExtension(SFTP "SFTP EXTENSIONS" "This enables SFTP support" "extensions/sftp" "extensions/sftp/tests")
endif()
+## Openwsman Extesions
+option(ENABLE_OPENWSMAN "Enables the Openwsman extensions." OFF)
+if (ENABLE_OPENWSMAN AND NOT DISABLE_CIVET AND NOT DISABLE_CURL)
+ include(BundledLibXml2)
+ use_bundled_libxml2(${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_BINARY_DIR})
+ list(APPEND CMAKE_MODULE_PATH "${CMAKE_SOURCE_DIR}/cmake/libxml2/dummy")
+
+ include(BundledOpenWSMAN)
+ use_bundled_openwsman(${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_BINARY_DIR})
+
+ createExtension(OPENWSMAN-EXTENSIONS "OPENWSMAN EXTENSIONS" "This enables Openwsman support" "extensions/openwsman")
+endif()
## NOW WE CAN ADD LIBRARIES AND EXTENSIONS TO MAIN
add_subdirectory(main)
diff --git a/bootstrap.sh b/bootstrap.sh
index 9c96146..ff38708 100755
--- a/bootstrap.sh
+++ b/bootstrap.sh
@@ -305,6 +305,8 @@
add_disabled_option SQL_ENABLED ${FALSE} "ENABLE_SQL"
set_incompatible_with SQL_ENABLED SQLITE_ENABLED
+add_disabled_option OPENWSMAN_ENABLED ${FALSE} "ENABLE_OPENWSMAN"
+
# Since the following extensions have limitations on
add_disabled_option BUSTACHE_ENABLED ${FALSE} "ENABLE_BUSTACHE" "2.6" ${TRUE}
add_dependency BUSTACHE_ENABLED "boost"
diff --git a/bstrp_functions.sh b/bstrp_functions.sh
index a13be2e..4ec336c 100755
--- a/bstrp_functions.sh
+++ b/bstrp_functions.sh
@@ -364,6 +364,7 @@
echo "T. OpenCV Support ..............$(print_feature_status OPENCV_ENABLED)"
echo "U. OPC-UA Support...............$(print_feature_status OPC_ENABLED)"
echo "W. SQL Support..................$(print_feature_status SQL_ENABLED)"
+ echo "X. Openwsman Support ...........$(print_feature_status OPENWSMAN_ENABLED)"
echo "****************************************"
echo " Build Options."
echo "****************************************"
@@ -385,7 +386,7 @@
read_feature_options(){
local choice
- read -p "Enter choice [ A - W or 1-6 ] " choice
+ read -p "Enter choice [ A - X or 1-6 ] " choice
choice=$(echo ${choice} | tr '[:upper:]' '[:lower:]')
case $choice in
a) ToggleFeature ROCKSDB_ENABLED ;;
@@ -412,7 +413,8 @@
s) ToggleFeature SFTP_ENABLED ;;
t) ToggleFeature OPENCV_ENABLED ;;
u) ToggleFeature OPC_ENABLED ;;
- w) ToggleFeature SQL_ENABLED ;;
+ w) ToggleFeature SQL_ENABLED ;;
+ x) ToggleFeature OPENWSMAN_ENABLED ;;
1) ToggleFeature TESTS_DISABLED ;;
2) EnableAllFeatures ;;
3) ToggleFeature JNI_ENABLED;;
@@ -430,7 +432,7 @@
fi
;;
q) exit 0;;
- *) echo -e "${RED}Please enter an option A-W or 1-6...${NO_COLOR}" && sleep 2
+ *) echo -e "${RED}Please enter an option A-X or 1-6...${NO_COLOR}" && sleep 2
esac
}
diff --git a/cmake/BundledBZip2.cmake b/cmake/BundledBZip2.cmake
index 7ad99a6..9c3fb5b 100644
--- a/cmake/BundledBZip2.cmake
+++ b/cmake/BundledBZip2.cmake
@@ -39,7 +39,7 @@
# Build project
ExternalProject_Add(
bzip2-external
- URL https://sourceware.org/pub/bzip2/bzip2-1.0.8.tar.gz
+ URL https://sourceware.org/pub/bzip2/bzip2-1.0.8.tar.gz http://deb.debian.org/debian/pool/main/b/bzip2/bzip2_1.0.8.orig.tar.gz
URL_HASH "SHA256=ab5a03176ee106d3f0fa90e381da478ddae405918153cca248e682cd0c4a2269"
SOURCE_DIR "${BINARY_DIR}/thirdparty/bzip2-src"
LIST_SEPARATOR % # This is needed for passing semicolon-separated lists
diff --git a/cmake/BundledLibXml2.cmake b/cmake/BundledLibXml2.cmake
new file mode 100644
index 0000000..61ad939
--- /dev/null
+++ b/cmake/BundledLibXml2.cmake
@@ -0,0 +1,94 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+function(use_bundled_libxml2 SOURCE_DIR BINARY_DIR)
+ message("Using bundled libxml2")
+
+ # Define patch step
+ if (WIN32)
+ set(PC "${Patch_EXECUTABLE}" -p1 -i "${SOURCE_DIR}/thirdparty/libxml2/libxml2-win.patch")
+ endif()
+
+ # Define byproducts
+ if (WIN32)
+ set(BYPRODUCT "lib/xml2.lib")
+ else()
+ set(BYPRODUCT "lib/libxml2.a")
+ endif()
+
+ # Set build options
+ if (WIN32)
+ set(LIBXML2_CMAKE_ARGS ${PASSTHROUGH_CMAKE_ARGS}
+ "-DCMAKE_INSTALL_PREFIX=${BINARY_DIR}/thirdparty/libxml2-install")
+ endif()
+
+ # Build project
+ set(LIBXML2_URL ftp://xmlsoft.org/libxml2/libxml2-2.9.10.tar.gz https://ftp.osuosl.org/pub/blfs/conglomeration/libxml2/libxml2-2.9.10.tar.gz)
+ set(LIBXML2_URL_HASH "SHA256=aafee193ffb8fe0c82d4afef6ef91972cbaf5feea100edc2f262750611b4be1f")
+
+ if (WIN32)
+ ExternalProject_Add(
+ libxml2-external
+ URL ${LIBXML2_URL}
+ URL_HASH ${LIBXML2_URL_HASH}
+ SOURCE_DIR "${BINARY_DIR}/thirdparty/libxml2-src"
+ LIST_SEPARATOR % # This is needed for passing semicolon-separated lists
+ CMAKE_ARGS ${LIBXML2_CMAKE_ARGS}
+ PATCH_COMMAND ${PC}
+ BUILD_BYPRODUCTS "${BINARY_DIR}/thirdparty/libxml2-install/${BYPRODUCT}"
+ EXCLUDE_FROM_ALL TRUE
+ )
+ else()
+ ExternalProject_Add(
+ libxml2-external
+ URL ${LIBXML2_URL}
+ URL_HASH ${LIBXML2_URL_HASH}
+ BUILD_IN_SOURCE true
+ SOURCE_DIR "${BINARY_DIR}/thirdparty/libxml2-src"
+ BUILD_COMMAND make
+ CMAKE_COMMAND ""
+ UPDATE_COMMAND ""
+ INSTALL_COMMAND make install
+ BUILD_BYPRODUCTS "${BINARY_DIR}/thirdparty/libxml2-install/${BYPRODUCT}"
+ CONFIGURE_COMMAND ""
+ PATCH_COMMAND ./configure --enable-shared=no --enable-static=yes --with-iconv=no --with-zlib=no --with-lzma=no --with-python=no --with-ftp=no --with-http=no --prefix=${BINARY_DIR}/thirdparty/libxml2-install
+ STEP_TARGETS build
+ EXCLUDE_FROM_ALL TRUE
+ )
+ endif()
+
+ # Set variables
+ set(LIBXML2_FOUND "YES" CACHE STRING "" FORCE)
+ set(LIBXML2_INCLUDE_DIR "${BINARY_DIR}/thirdparty/libxml2-install/include/libxml2" CACHE STRING "" FORCE)
+ set(LIBXML2_INCLUDE_DIRS "${LIBXML2_INCLUDE_DIR}" CACHE STRING "" FORCE)
+ set(LIBXML2_LIBRARY "${BINARY_DIR}/thirdparty/libxml2-install/${BYPRODUCT}" CACHE STRING "" FORCE)
+ set(LIBXML2_LIBRARIES "${LIBXML2_LIBRARY}" CACHE STRING "" FORCE)
+
+ # Set exported variables for FindPackage.cmake
+ set(PASSTHROUGH_VARIABLES ${PASSTHROUGH_VARIABLES} "-DEXPORTED_LIBXML2_INCLUDE_DIR=${LIBXML2_INCLUDE_DIR}" CACHE STRING "" FORCE)
+ set(PASSTHROUGH_VARIABLES ${PASSTHROUGH_VARIABLES} "-DEXPORTED_LIBXML2_LIBRARY=${LIBXML2_LIBRARY}" CACHE STRING "" FORCE)
+
+ # Create imported targets
+ file(MAKE_DIRECTORY ${LIBXML2_INCLUDE_DIR})
+
+ add_library(LibXml2::LibXml2 STATIC IMPORTED)
+ set_target_properties(LibXml2::LibXml2 PROPERTIES IMPORTED_LOCATION "${LIBXML2_LIBRARY}")
+ add_dependencies(LibXml2::LibXml2 libxml2-external)
+ set_property(TARGET LibXml2::LibXml2 APPEND PROPERTY INTERFACE_INCLUDE_DIRECTORIES "${LIBXML2_INCLUDE_DIR}")
+endfunction(use_bundled_libxml2)
diff --git a/cmake/BundledOpenWSMAN.cmake b/cmake/BundledOpenWSMAN.cmake
new file mode 100644
index 0000000..0392768
--- /dev/null
+++ b/cmake/BundledOpenWSMAN.cmake
@@ -0,0 +1,115 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+function(use_bundled_openwsman SOURCE_DIR BINARY_DIR)
+ message("Using bundled openwsman")
+
+ # Define patch step
+ set(PC "${Patch_EXECUTABLE}" -p1 -i "${SOURCE_DIR}/thirdparty/openwsman/openwsman.patch")
+
+ # Define byproducts
+ if (APPLE)
+ set(PREFIX "lib/lib")
+ set(POSTFIX ".a")
+ elseif(WIN32)
+ message(FATAL_ERROR "OpenWSMAN Windows build is not supported")
+ else()
+ if("${CMAKE_SIZEOF_VOID_P}" EQUAL "8")
+ set(PREFIX "lib64/lib")
+ else()
+ set(PREFIX "lib/lib")
+ endif()
+ set(POSTFIX ".a")
+ endif()
+
+ set(BYPRODUCTS
+ "${PREFIX}wsman${POSTFIX}"
+ "${PREFIX}wsman_client${POSTFIX}"
+ "${PREFIX}wsman_curl_client_transport${POSTFIX}"
+ )
+
+ FOREACH(BYPRODUCT ${BYPRODUCTS})
+ LIST(APPEND OPENWSMAN_LIBRARIES_LIST "${BINARY_DIR}/thirdparty/openwsman-install/${BYPRODUCT}")
+ ENDFOREACH(BYPRODUCT)
+
+ # Set build options
+ set(OPENWSMAN_CMAKE_ARGS
+ ${PASSTHROUGH_CMAKE_ARGS}
+ -DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE}
+ "-DCMAKE_INSTALL_PREFIX=${BINARY_DIR}/thirdparty/openwsman-install"
+ -DBUILD_PYTHON=NO
+ -DBUILD_PYTHON3=NO
+ -DBUILD_LIBCIM=NO
+ -DBUILD_EXAMPLES=NO
+ -DBUILD_BINDINGS=NO
+ -DBUILD_RUBY=NO
+ -DBUILD_PERL=NO
+ -DBUILD_JAVA=NO
+ -DBUILD_CSHARP=NO
+ -DBUILD_CUNIT_TESTS=NO
+ -DDISABLE_PLUGINS=YES
+ -DUSE_PAM=NO
+ -DBUILD_TESTS=NO
+ -DDISABLE_SERVER=YES
+ -DBUILD_SHARED_LIBS=NO)
+
+ append_third_party_passthrough_args(OPENWSMAN_CMAKE_ARGS "${OPENWSMAN_CMAKE_ARGS}")
+
+ # Build project
+ ExternalProject_Add(
+ openwsman-external
+ URL "https://github.com/Openwsman/openwsman/archive/v2.6.11.tar.gz"
+ URL_HASH "SHA256=895eaaae62925f9416766ea3e71a5368210e6cfe13b23e4e0422fa0e75c2541c"
+ SOURCE_DIR "${BINARY_DIR}/thirdparty/openwsman-src"
+ LIST_SEPARATOR % # This is needed for passing semicolon-separated lists
+ CMAKE_ARGS ${OPENWSMAN_CMAKE_ARGS}
+ PATCH_COMMAND ${PC}
+ BUILD_BYPRODUCTS "${OPENWSMAN_LIBRARIES_LIST}"
+ EXCLUDE_FROM_ALL TRUE
+ )
+
+ # Set dependencies
+ add_dependencies(openwsman-external LibXml2::LibXml2 OpenSSL::SSL OpenSSL::Crypto CURL::libcurl)
+
+ # Set variables
+ set(OPENWSMAN_FOUND "YES" CACHE STRING "" FORCE)
+ set(OPENWSMAN_INCLUDE_DIR "${BINARY_DIR}/thirdparty/openwsman-src/include" CACHE STRING "" FORCE)
+ set(OPENWSMAN_LIBRARIES "${OPENWSMAN_LIBRARIES_LIST}" CACHE STRING "" FORCE)
+
+ # Create imported targets
+ file(MAKE_DIRECTORY ${OPENWSMAN_INCLUDE_DIR})
+
+ add_library(OpenWSMAN::libwsman_curl_client_transport STATIC IMPORTED)
+ set_target_properties(OpenWSMAN::libwsman_curl_client_transport PROPERTIES IMPORTED_LOCATION "${BINARY_DIR}/thirdparty/openwsman-install/${PREFIX}wsman_curl_client_transport${POSTFIX}")
+ add_dependencies(OpenWSMAN::libwsman_curl_client_transport openwsman-external)
+ set_property(TARGET OpenWSMAN::libwsman_curl_client_transport APPEND PROPERTY INTERFACE_LINK_LIBRARIES LibXml2::LibXml2 OpenSSL::SSL OpenSSL::Crypto CURL::libcurl)
+ set_property(TARGET OpenWSMAN::libwsman_curl_client_transport APPEND PROPERTY INTERFACE_INCLUDE_DIRECTORIES ${OPENWSMAN_INCLUDE_DIR})
+
+ add_library(OpenWSMAN::libwsman_client STATIC IMPORTED)
+ set_target_properties(OpenWSMAN::libwsman_client PROPERTIES IMPORTED_LOCATION "${BINARY_DIR}/thirdparty/openwsman-install/${PREFIX}wsman_client${POSTFIX}")
+ add_dependencies(OpenWSMAN::libwsman_client openwsman-external)
+ set_property(TARGET OpenWSMAN::libwsman_client APPEND PROPERTY INTERFACE_LINK_LIBRARIES LibXml2::LibXml2)
+ set_property(TARGET OpenWSMAN::libwsman_client APPEND PROPERTY INTERFACE_LINK_LIBRARIES OpenWSMAN::libwsman_curl_client_transport)
+ set_property(TARGET OpenWSMAN::libwsman_client APPEND PROPERTY INTERFACE_INCLUDE_DIRECTORIES ${OPENWSMAN_INCLUDE_DIR})
+
+ add_library(OpenWSMAN::libwsman STATIC IMPORTED)
+ set_target_properties(OpenWSMAN::libwsman PROPERTIES IMPORTED_LOCATION "${BINARY_DIR}/thirdparty/openwsman-install/${PREFIX}wsman${POSTFIX}")
+ add_dependencies(OpenWSMAN::libwsman openwsman-external)
+ set_property(TARGET OpenWSMAN::libwsman APPEND PROPERTY INTERFACE_LINK_LIBRARIES LibXml2::LibXml2)
+ set_property(TARGET OpenWSMAN::libwsman APPEND PROPERTY INTERFACE_LINK_LIBRARIES OpenWSMAN::libwsman_client)
+ set_property(TARGET OpenWSMAN::libwsman APPEND PROPERTY INTERFACE_INCLUDE_DIRECTORIES ${OPENWSMAN_INCLUDE_DIR})
+endfunction(use_bundled_openwsman)
diff --git a/cmake/libxml2/dummy/FindLibXml2.cmake b/cmake/libxml2/dummy/FindLibXml2.cmake
new file mode 100644
index 0000000..9c2ce0d
--- /dev/null
+++ b/cmake/libxml2/dummy/FindLibXml2.cmake
@@ -0,0 +1,29 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+if(NOT LIBXML2_FOUND)
+ set(LIBXML2_FOUND "YES" CACHE STRING "" FORCE)
+ set(LIBXML2_INCLUDE_DIR "${EXPORTED_LIBXML2_INCLUDE_DIR}" CACHE STRING "" FORCE)
+ set(LIBXML2_INCLUDE_DIRS "${EXPORTED_LIBXML2_INCLUDE_DIR}" CACHE STRING "" FORCE)
+ set(LIBXML2_LIBRARIES "${EXPORTED_LIBXML2_LIBRARY}" CACHE STRING "" FORCE)
+endif()
+
+if(NOT TARGET LibXml2::LibXml2)
+ add_library(LibXml2::LibXml2 STATIC IMPORTED)
+ set_target_properties(LibXml2::LibXml2 PROPERTIES IMPORTED_LOCATION "${LIBXML2_LIBRARIES}")
+ set_property(TARGET LibXml2::LibXml2 APPEND PROPERTY INTERFACE_INCLUDE_DIRECTORIES "${LIBXML2_INCLUDE_DIR}")
+endif()
diff --git a/extensions/openwsman/CMakeLists.txt b/extensions/openwsman/CMakeLists.txt
new file mode 100644
index 0000000..a9470d3
--- /dev/null
+++ b/extensions/openwsman/CMakeLists.txt
@@ -0,0 +1,32 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+include(${CMAKE_SOURCE_DIR}/extensions/ExtensionHeader.txt)
+
+file(GLOB SOURCES "processors/*.cpp")
+
+add_library(minifi-openwsman STATIC ${SOURCES})
+
+set_property(TARGET minifi-openwsman PROPERTY POSITION_INDEPENDENT_CODE ON)
+
+target_link_libraries(minifi-openwsman ${LIBMINIFI} Threads::Threads)
+target_link_libraries(minifi-openwsman OpenWSMAN::libwsman CIVETWEB::civetweb-cpp CIVETWEB::c-library LibXml2::LibXml2)
+
+SET (OPENWSMAN-EXTENSION minifi-openwsman PARENT_SCOPE)
+register_extension(minifi-openwsman)
diff --git a/extensions/openwsman/processors/SourceInitiatedSubscriptionListener.cpp b/extensions/openwsman/processors/SourceInitiatedSubscriptionListener.cpp
new file mode 100644
index 0000000..d285b95
--- /dev/null
+++ b/extensions/openwsman/processors/SourceInitiatedSubscriptionListener.cpp
@@ -0,0 +1,922 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "SourceInitiatedSubscriptionListener.h"
+
+#include <memory>
+#include <algorithm>
+#include <cctype>
+#include <cstdint>
+#include <cstring>
+#include <iostream>
+#include <iterator>
+#include <limits>
+#include <unordered_map>
+#include <set>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include <openssl/x509.h>
+extern "C" {
+#include "wsman-api.h"
+#include "wsman-xml-api.h"
+#include "wsman-xml-serialize.h"
+#include "wsman-xml-serializer.h"
+#include "wsman-soap.h"
+#include "wsman-soap-envelope.h"
+}
+
+#include "utils/ByteArrayCallback.h"
+#include "core/FlowFile.h"
+#include "core/logging/Logger.h"
+#include "core/ProcessContext.h"
+#include "core/Relationship.h"
+#include "io/DataStream.h"
+#include "io/StreamFactory.h"
+#include "ResourceClaim.h"
+#include "utils/StringUtils.h"
+#include "utils/ScopeGuard.h"
+#include "utils/file/FileUtils.h"
+
+#define XML_NS_CUSTOM_SUBSCRIPTION "http://schemas.microsoft.com/wbem/wsman/1/subscription"
+#define XML_NS_CUSTOM_AUTHENTICATION "http://schemas.microsoft.com/wbem/wsman/1/authentication"
+#define XML_NS_CUSTOM_POLICY "http://schemas.xmlsoap.org/ws/2002/12/policy"
+#define XML_NS_CUSTOM_MACHINEID "http://schemas.microsoft.com/wbem/wsman/1/machineid"
+#define WSMAN_CUSTOM_ACTION_ACK "http://schemas.dmtf.org/wbem/wsman/1/wsman/Ack"
+#define WSMAN_CUSTOM_ACTION_HEARTBEAT "http://schemas.dmtf.org/wbem/wsman/1/wsman/Heartbeat"
+#define WSMAN_CUSTOM_ACTION_EVENTS "http://schemas.dmtf.org/wbem/wsman/1/wsman/Events"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+core::Property SourceInitiatedSubscriptionListener::ListenHostname(
+ core::PropertyBuilder::createProperty("Listen Hostname")->withDescription("The hostname or IP of this machine that will be advertised to event sources to connect to. "
+ "It must be contained as a Subject Alternative Name in the server certificate, "
+ "otherwise source machines will refuse to connect.")
+ ->isRequired(true)->build());
+core::Property SourceInitiatedSubscriptionListener::ListenPort(
+ core::PropertyBuilder::createProperty("Listen Port")->withDescription("The port to listen on.")
+ ->isRequired(true)->withDefaultValue<int64_t>(5986, core::StandardValidators::LISTEN_PORT_VALIDATOR())->build());
+core::Property SourceInitiatedSubscriptionListener::SubscriptionManagerPath(
+ core::PropertyBuilder::createProperty("Subscription Manager Path")->withDescription("The URI path that will be used for the WEC Subscription Manager endpoint.")
+ ->isRequired(true)->withDefaultValue("/wsman/SubscriptionManager/WEC")->build());
+core::Property SourceInitiatedSubscriptionListener::SubscriptionsBasePath(
+ core::PropertyBuilder::createProperty("Subscriptions Base Path")->withDescription("The URI path that will be used as the base for endpoints serving individual subscriptions.")
+ ->isRequired(true)->withDefaultValue("/wsman/subscriptions")->build());
+core::Property SourceInitiatedSubscriptionListener::SSLCertificate(
+ core::PropertyBuilder::createProperty("SSL Certificate")->withDescription("File containing PEM-formatted file including TLS/SSL certificate and key. "
+ "The root CA of the certificate must be the CA set in SSL Certificate Authority.")
+ ->isRequired(true)->build());
+core::Property SourceInitiatedSubscriptionListener::SSLCertificateAuthority(
+ core::PropertyBuilder::createProperty("SSL Certificate Authority")->withDescription("File containing the PEM-formatted CA that is the root CA for both this server's certificate "
+ "and the event source clients' certificates.")
+ ->isRequired(true)->build());
+core::Property SourceInitiatedSubscriptionListener::SSLVerifyPeer(
+ core::PropertyBuilder::createProperty("SSL Verify Peer")->withDescription("Whether or not to verify the client's certificate")
+ ->isRequired(false)->withDefaultValue<bool>(true)->build());
+core::Property SourceInitiatedSubscriptionListener::XPathXmlQuery(
+ core::PropertyBuilder::createProperty("XPath XML Query")->withDescription("An XPath Query in structured XML format conforming to the Query Schema described in "
+ "https://docs.microsoft.com/en-gb/windows/win32/wes/queryschema-schema, "
+ "see an example here: https://docs.microsoft.com/en-gb/windows/win32/wes/consuming-events")
+ ->isRequired(true)
+ ->withDefaultValue("<QueryList>\n"
+ " <Query Id=\"0\">\n"
+ " <Select Path=\"Application\">*</Select>\n"
+ " </Query>\n"
+ "</QueryList>\n")->build());
+core::Property SourceInitiatedSubscriptionListener::InitialExistingEventsStrategy(
+ core::PropertyBuilder::createProperty("Initial Existing Events Strategy")->withDescription("Defines the behaviour of the Processor when a new event source connects.\n"
+ "None: will not request existing events\n"
+ "All: will request all existing events matching the query")
+ ->isRequired(true)->withAllowableValues<std::string>({INITIAL_EXISTING_EVENTS_STRATEGY_NONE, INITIAL_EXISTING_EVENTS_STRATEGY_ALL})
+ ->withDefaultValue(INITIAL_EXISTING_EVENTS_STRATEGY_NONE)->build());
+core::Property SourceInitiatedSubscriptionListener::SubscriptionExpirationInterval(
+ core::PropertyBuilder::createProperty("Subscription Expiration Interval")->withDescription("The interval while a subscription is valid without renewal. "
+ "Because in a source-initiated subscription, the collector can not cancel the subscription, "
+ "setting this too large could cause unnecessary load on the source machine. "
+ "Setting this too small causes frequent reenumeration and resubscription which is ineffective.")
+ ->isRequired(true)->withDefaultValue<core::TimePeriodValue>("10 min")->build());
+core::Property SourceInitiatedSubscriptionListener::HeartbeatInterval(
+ core::PropertyBuilder::createProperty("Heartbeat Interval")->withDescription("The processor will ask the sources to send heartbeats with this interval.")
+ ->isRequired(true)->withDefaultValue<core::TimePeriodValue>("30 sec")->build());
+core::Property SourceInitiatedSubscriptionListener::MaxElements(
+ core::PropertyBuilder::createProperty("Max Elements")->withDescription("The maximum number of events a source will batch together and send at once.")
+ ->isRequired(true)->withDefaultValue<uint32_t>(20U)->build());
+core::Property SourceInitiatedSubscriptionListener::MaxLatency(
+ core::PropertyBuilder::createProperty("Max Latency")->withDescription("The maximum time a source will wait to send new events.")
+ ->isRequired(true)->withDefaultValue<core::TimePeriodValue>("10 sec")->build());
+core::Property SourceInitiatedSubscriptionListener::ConnectionRetryInterval(
+ core::PropertyBuilder::createProperty("Connection Retry Interval")->withDescription("The interval with which a source will try to reconnect to the server.")
+ ->withDefaultValue<core::TimePeriodValue>("10 sec")->build());
+core::Property SourceInitiatedSubscriptionListener::ConnectionRetryCount(
+ core::PropertyBuilder::createProperty("Connection Retry Count")->withDescription("The number of connection retries after which a source will consider the subscription expired.")
+ ->withDefaultValue<uint32_t>(5U)->build());
+
+core::Relationship SourceInitiatedSubscriptionListener::Success("success", "All Events are routed to success");
+
+constexpr char const* SourceInitiatedSubscriptionListener::ATTRIBUTE_WEF_REMOTE_MACHINEID;
+constexpr char const* SourceInitiatedSubscriptionListener::ATTRIBUTE_WEF_REMOTE_IP;
+
+constexpr char const* SourceInitiatedSubscriptionListener::INITIAL_EXISTING_EVENTS_STRATEGY_NONE;
+constexpr char const* SourceInitiatedSubscriptionListener::INITIAL_EXISTING_EVENTS_STRATEGY_ALL;
+
+constexpr char const* SourceInitiatedSubscriptionListener::ProcessorName;
+
+SourceInitiatedSubscriptionListener::SourceInitiatedSubscriptionListener(std::string name, utils::Identifier uuid)
+ : Processor(name, uuid)
+ , logger_(logging::LoggerFactory<SourceInitiatedSubscriptionListener>::getLogger())
+ , session_factory_(nullptr)
+ , listen_port_(0U)
+ , subscription_expiration_interval_(0)
+ , heartbeat_interval_(0)
+ , max_elements_(0U)
+ , max_latency_(0)
+ , connection_retry_interval_(0)
+ , connection_retry_count_(0U) {
+}
+
+SourceInitiatedSubscriptionListener::Handler::Handler(SourceInitiatedSubscriptionListener& processor)
+ : processor_(processor) {
+}
+
+SourceInitiatedSubscriptionListener::SubscriberData::SubscriberData()
+ : bookmark_(nullptr)
+ , subscription_(nullptr) {
+}
+
+SourceInitiatedSubscriptionListener::SubscriberData::~SubscriberData() {
+ clearSubscription();
+ clearBookmark();
+}
+
+void SourceInitiatedSubscriptionListener::SubscriberData::setSubscription(const std::string& subscription_version,
+ WsXmlDocH subscription,
+ const std::string& subscription_endpoint,
+ const std::string& subscription_identifier) {
+ clearSubscription();
+ subscription_version_ = subscription_version;
+ subscription_ = subscription;
+ subscription_endpoint_ = subscription_endpoint;
+ subscription_identifier_ = subscription_identifier;
+}
+
+void SourceInitiatedSubscriptionListener::SubscriberData::clearSubscription() {
+ subscription_version_.clear();
+ if (subscription_ != nullptr) {
+ ws_xml_destroy_doc(subscription_);
+ }
+ subscription_ = nullptr;
+}
+
+void SourceInitiatedSubscriptionListener::SubscriberData::setBookmark(WsXmlDocH bookmark) {
+ clearBookmark();
+ bookmark_ = bookmark;
+}
+
+void SourceInitiatedSubscriptionListener::SubscriberData::clearBookmark() {
+ if (bookmark_ != nullptr) {
+ ws_xml_destroy_doc(bookmark_);
+ }
+ bookmark_ = nullptr;
+}
+
+bool SourceInitiatedSubscriptionListener::persistState() const {
+ std::unordered_map<std::string, std::string> state_map;
+ size_t i = 0U;
+ for (const auto& subscriber : subscribers_) {
+ char* xml_buf = nullptr;
+ int xml_buf_size = 0;
+ ws_xml_dump_memory_enc(subscriber.second.bookmark_, &xml_buf, &xml_buf_size, "UTF-8");
+ state_map.emplace("subscriber." + std::to_string(i) + ".machineid", subscriber.first);
+ state_map.emplace("subscriber." + std::to_string(i) + ".bookmark", std::string(xml_buf, xml_buf_size));
+ i++;
+ ws_xml_free_memory(xml_buf);
+ }
+ return state_manager_->set(state_map);
+}
+
+bool SourceInitiatedSubscriptionListener::loadState() {
+ std::unordered_map<std::string, std::string> state_map;
+ if (!state_manager_->get(state_map)) {
+ return false;
+ }
+
+ for (size_t i = 0U;; i++) {
+ std::string machineId;
+ try {
+ machineId = state_map.at("subscriber." + std::to_string(i) + ".machineid");
+ } catch (...) {
+ break;
+ }
+
+ std::string bookmark;
+ try {
+ bookmark = state_map.at("subscriber." + std::to_string(i) + ".bookmark");
+ } catch (...) {
+ logger_->log_error("Bookmark for subscriber \"%s\" is missing, skipping", machineId);
+ continue;
+ }
+
+ WsXmlDocH doc = ws_xml_read_memory(bookmark.data(), bookmark.size(), "UTF-8", 0);
+ if (doc == nullptr) {
+ logger_->log_error("Failed to parse saved bookmark for subscriber \"%s\", skipping", machineId);
+ continue;
+ }
+ subscribers_[machineId].setBookmark(doc);
+ }
+
+ return true;
+}
+
+std::string SourceInitiatedSubscriptionListener::Handler::millisecondsToXsdDuration(int64_t milliseconds) {
+ char buf[1024];
+ snprintf(buf, sizeof(buf), "PT%lld.%03lldS", milliseconds / 1000, milliseconds % 1000);
+ return buf;
+}
+
+bool SourceInitiatedSubscriptionListener::Handler::handlePost(CivetServer* server, struct mg_connection* conn) {
+ const struct mg_request_info* req_info = mg_get_request_info(conn);
+ if (req_info == nullptr) {
+ processor_.logger_->log_error("Failed to get request info");
+ return false;
+ }
+
+ const char* endpoint = req_info->local_uri;
+ if (endpoint == nullptr) {
+ processor_.logger_->log_error("Failed to get called endpoint (local_uri)");
+ return false;
+ }
+ processor_.logger_->log_trace("Endpoint \"%s\" has been called", endpoint);
+
+ for (int i = 0; i < req_info->num_headers; i++) {
+ processor_.logger_->log_trace("Received header \"%s: %s\"", req_info->http_headers[i].name, req_info->http_headers[i].value);
+ }
+
+ const char* content_type = mg_get_header(conn, "Content-Type");
+ if (content_type == nullptr) {
+ processor_.logger_->log_error("Content-Type header missing");
+ return false;
+ }
+
+ std::string charset;
+ const char* charset_begin = strstr(content_type, "charset=");
+ if (charset_begin == nullptr) {
+ processor_.logger_->log_warn("charset missing from Content-Type header, assuming UTF-8");
+ charset = "UTF-8";
+ } else {
+ charset_begin += strlen("charset=");
+ const char* charset_end = strchr(charset_begin, ';');
+ if (charset_end == nullptr) {
+ charset = std::string(charset_begin);
+ } else {
+ charset = std::string(charset_begin, charset_end - charset_begin);
+ }
+ }
+ processor_.logger_->log_trace("charset is \"%s\"", charset.c_str());
+
+ std::vector<uint8_t> raw_data;
+ {
+ std::array<uint8_t, 16384U> buf;
+ int read_bytes;
+ while ((read_bytes = mg_read(conn, buf.data(), buf.size())) > 0) {
+ size_t orig_size = raw_data.size();
+ raw_data.resize(orig_size + read_bytes);
+ memcpy(raw_data.data() + orig_size, buf.data(), read_bytes);
+ }
+ }
+
+ if (raw_data.empty()) {
+ processor_.logger_->log_error("POST body is empty");
+ return false;
+ }
+
+ WsXmlDocH doc = ws_xml_read_memory(reinterpret_cast<char*>(raw_data.data()), raw_data.size(), charset.c_str(), 0);
+
+ if (doc == nullptr) {
+ processor_.logger_->log_error("Failed to parse POST body as XML");
+ return false;
+ }
+
+ {
+ WsXmlNodeH node = ws_xml_get_doc_root(doc);
+ char* xml_buf = nullptr;
+ int xml_buf_size = 0;
+ ws_xml_dump_memory_node_tree_enc(node, &xml_buf, &xml_buf_size, "UTF-8");
+ if (xml_buf != nullptr) {
+ logging::LOG_TRACE(processor_.logger_) << "Received request: \"" << std::string(xml_buf, xml_buf_size) << "\"";
+ ws_xml_free_memory(xml_buf);
+ }
+ }
+
+ if (endpoint == processor_.subscription_manager_path_) {
+ return this->handleSubscriptionManager(conn, endpoint, doc);
+ } else if (strncmp(endpoint, processor_.subscriptions_base_path_.c_str(), processor_.subscriptions_base_path_.length()) == 0) {
+ return this->handleSubscriptions(conn, endpoint, doc);
+ } else {
+ ws_xml_destroy_doc(doc);
+ return false;
+ }
+}
+
+std::string SourceInitiatedSubscriptionListener::Handler::getSoapAction(WsXmlDocH doc) {
+ WsXmlNodeH header = ws_xml_get_soap_header(doc);
+ if (header == nullptr) {
+ return "";
+ }
+ WsXmlNodeH action_node = ws_xml_get_child(header, 0 /*index*/, XML_NS_ADDRESSING, WSA_ACTION);
+ if (action_node == nullptr) {
+ return "";
+ }
+ char* text = ws_xml_get_node_text(action_node);
+ if (text == nullptr) {
+ return "";
+ }
+
+ return text;
+}
+
+std::string SourceInitiatedSubscriptionListener::Handler::getMachineId(WsXmlDocH doc) {
+ WsXmlNodeH header = ws_xml_get_soap_header(doc);
+ if (header == nullptr) {
+ return "";
+ }
+ WsXmlNodeH machineid_node = ws_xml_get_child(header, 0 /*index*/, XML_NS_CUSTOM_MACHINEID, "MachineID");
+ if (machineid_node == nullptr) {
+ return "";
+ }
+ char* text = ws_xml_get_node_text(machineid_node);
+ if (text == nullptr) {
+ return "";
+ }
+
+ return text;
+}
+
+bool SourceInitiatedSubscriptionListener::Handler::isAckRequested(WsXmlDocH doc) {
+ WsXmlNodeH header = ws_xml_get_soap_header(doc);
+ if (header == nullptr) {
+ return false;
+ }
+ WsXmlNodeH ack_requested_node = ws_xml_get_child(header, 0 /*index*/, XML_NS_WS_MAN, WSM_ACKREQUESTED);
+ return ack_requested_node != nullptr;
+}
+
+void SourceInitiatedSubscriptionListener::Handler::sendResponse(struct mg_connection* conn, const std::string& machineId, const std::string& remoteIp, char* xml_buf, size_t xml_buf_size) {
+ logging::LOG_TRACE(processor_.logger_) << "Sending response to " << machineId << " (" << remoteIp << "): \"" << std::string(xml_buf, xml_buf_size) << "\"";
+
+ mg_printf(conn, "HTTP/1.1 200 OK\r\n");
+ mg_printf(conn, "Content-Type: application/soap+xml;charset=UTF-8\r\n");
+ mg_printf(conn, "Authorization: %s\r\n", WSMAN_SECURITY_PROFILE_HTTPS_MUTUAL);
+ mg_printf(conn, "Content-Length: %zu\r\n", xml_buf_size);
+ mg_printf(conn, "\r\n");
+ mg_printf(conn, "%.*s", static_cast<int>(xml_buf_size), xml_buf);
+}
+
+bool SourceInitiatedSubscriptionListener::Handler::handleSubscriptionManager(struct mg_connection* conn, const std::string& endpoint, WsXmlDocH request) {
+ utils::ScopeGuard request_guard([&]() {
+ ws_xml_destroy_doc(request);
+ });
+
+ auto action = getSoapAction(request);
+ auto machine_id = getMachineId(request);
+ const struct mg_request_info* req_info = mg_get_request_info(conn);
+ std::string remote_ip = req_info->remote_addr;
+ if (action != ENUM_ACTION_ENUMERATE) {
+ processor_.logger_->log_error("%s called by %s (%s) with unknown Action \"%s\"", endpoint.c_str(), machine_id.c_str(), remote_ip.c_str(), action.c_str());
+ return false; // TODO(bakaid): generate fault if possible
+ }
+
+ // Create reponse envelope from request
+ WsXmlDocH response = wsman_create_response_envelope(request, nullptr);
+ utils::ScopeGuard response_guard([&]() {
+ ws_xml_destroy_doc(response);
+ });
+
+ // Header
+ WsXmlNodeH response_header = ws_xml_get_soap_header(response);
+ // Header/MessageID
+ utils::Identifier msg_id = utils::IdGenerator::getIdGenerator()->generate();
+ ws_xml_add_child_format(response_header, XML_NS_ADDRESSING, WSA_MESSAGE_ID, "uuid:%s", msg_id.to_string().c_str());
+
+ // Body
+ WsXmlNodeH response_body = ws_xml_get_soap_body(response);
+ // Body/EnumerationResponse
+ WsXmlNodeH enumeration_response = ws_xml_add_child(response_body, XML_NS_ENUMERATION, WSENUM_ENUMERATE_RESP, nullptr);
+ // Body/EnumerationResponse/EnumerationContext
+ ws_xml_add_child(enumeration_response, XML_NS_ENUMERATION, WSENUM_ENUMERATION_CONTEXT, nullptr);
+ // Body/EnumerationResponse/Items
+ WsXmlNodeH enumeration_items = ws_xml_add_child(enumeration_response, XML_NS_WS_MAN, WSENUM_ITEMS, nullptr);
+ // Body/EnumerationResponse/EndOfSequence
+ ws_xml_add_child(enumeration_response, XML_NS_WS_MAN, WSENUM_END_OF_SEQUENCE, nullptr);
+
+ // Body/EnumerationResponse/Items/Subscription
+ WsXmlNodeH subscription = ws_xml_add_child(enumeration_items, nullptr, "Subscription", nullptr);
+ ws_xml_set_ns(subscription, XML_NS_CUSTOM_SUBSCRIPTION, "m");
+
+ // Body/EnumerationResponse/Items/Subscription/Version
+ std::lock_guard<std::mutex> lock(processor_.mutex_);
+ auto it = processor_.subscribers_.find(machine_id);
+
+ std::string subscription_version;
+ if (it != processor_.subscribers_.end() && it->second.subscription_ != nullptr) {
+ subscription_version = it->second.subscription_version_;
+ } else {
+ utils::Identifier id = utils::IdGenerator::getIdGenerator()->generate();
+ subscription_version = id.to_string();
+ }
+ ws_xml_add_child_format(subscription, XML_NS_CUSTOM_SUBSCRIPTION, "Version", "uuid:%s", subscription_version.c_str());
+
+ // Body/EnumerationResponse/Items/Subscription/Envelope
+ std::string subscription_identifier;
+ std::string subscription_endpoint;
+ if (it != processor_.subscribers_.end() && it->second.subscription_ != nullptr) {
+ WsXmlNodeH subscription_node = ws_xml_get_doc_root(it->second.subscription_);
+ ws_xml_copy_node(subscription_node, subscription);
+ } else {
+ WsXmlDocH subscription_doc = ws_xml_create_envelope();
+
+ // Header
+ WsXmlNodeH header = ws_xml_get_soap_header(subscription_doc);
+ WsXmlNodeH node;
+
+ // Header/Action
+ node = ws_xml_add_child(header, XML_NS_ADDRESSING, WSA_ACTION, EVT_ACTION_SUBSCRIBE);
+ ws_xml_add_node_attr(node, XML_NS_SOAP_1_2, SOAP_MUST_UNDERSTAND, "true");
+
+ // Header/MessageID
+ utils::Identifier msg_id = utils::IdGenerator::getIdGenerator()->generate();
+ ws_xml_add_child_format(header, XML_NS_ADDRESSING, WSA_MESSAGE_ID, "uuid:%s", msg_id.to_string().c_str());
+
+ // Header/To
+ node = ws_xml_add_child(header, XML_NS_ADDRESSING, WSA_TO, WSA_TO_ANONYMOUS);
+ ws_xml_add_node_attr(node, XML_NS_SOAP_1_2, SOAP_MUST_UNDERSTAND, "true");
+
+ // Header/ResourceURI
+ node = ws_xml_add_child(header, XML_NS_WS_MAN, WSM_RESOURCE_URI, "http://schemas.microsoft.com/wbem/wsman/1/windows/EventLog");
+ ws_xml_add_node_attr(node, XML_NS_SOAP_1_2, SOAP_MUST_UNDERSTAND, "true");
+
+ // Header/ReplyTo
+ node = ws_xml_add_child(header, XML_NS_ADDRESSING, WSA_REPLY_TO, nullptr);
+ node = ws_xml_add_child(node, XML_NS_ADDRESSING, WSA_ADDRESS, WSA_TO_ANONYMOUS);
+ ws_xml_add_node_attr(node, XML_NS_SOAP_1_2, SOAP_MUST_UNDERSTAND, "true");
+
+ // Header/OptionSet
+ WsXmlNodeH option_set = ws_xml_add_child(header, XML_NS_WS_MAN, WSM_OPTION_SET, nullptr);
+ ws_xml_ns_add(option_set, XML_NS_SCHEMA_INSTANCE, XML_NS_SCHEMA_INSTANCE_PREFIX);
+
+ // Header/OptionSet/Option (CDATA)
+ node = ws_xml_add_child(option_set, XML_NS_WS_MAN, WSM_OPTION, nullptr);
+ ws_xml_add_node_attr(node, nullptr, WSM_NAME, "CDATA");
+ ws_xml_add_node_attr(node, XML_NS_SCHEMA_INSTANCE, XML_SCHEMA_NIL, "true");
+
+ // Header/OptionSet/Option (IgnoreChannelError)
+ node = ws_xml_add_child(option_set, XML_NS_WS_MAN, WSM_OPTION, nullptr);
+ ws_xml_add_node_attr(node, nullptr, WSM_NAME, "IgnoreChannelError");
+ ws_xml_add_node_attr(node, XML_NS_SCHEMA_INSTANCE, XML_SCHEMA_NIL, "true");
+
+ // Body
+ WsXmlNodeH body = ws_xml_get_soap_body(subscription_doc);
+ WsXmlNodeH subscribe_node = ws_xml_add_child(body, XML_NS_EVENTING, WSEVENT_SUBSCRIBE, nullptr);
+
+ // Body/Delivery
+ {
+ utils::Identifier id = utils::IdGenerator::getIdGenerator()->generate();
+ subscription_identifier = id.to_string();
+ }
+ {
+ utils::Identifier id = utils::IdGenerator::getIdGenerator()->generate();
+ subscription_endpoint = processor_.subscriptions_base_path_ + "/" + id.to_string();
+ }
+
+ WsXmlNodeH delivery_node = ws_xml_add_child(subscribe_node, XML_NS_EVENTING, WSEVENT_DELIVERY, nullptr);
+ ws_xml_add_node_attr(delivery_node, nullptr, WSEVENT_DELIVERY_MODE, WSEVENT_DELIVERY_MODE_EVENTS);
+
+ // Body/Delivery/Heartbeats
+ ws_xml_add_child(delivery_node, XML_NS_WS_MAN, WSM_HEARTBEATS, millisecondsToXsdDuration(processor_.heartbeat_interval_).c_str());
+
+ // Body/Delivery/ConnectionRetry
+ auto connection_retry_node = ws_xml_add_child(delivery_node, XML_NS_WS_MAN, WSM_CONNECTIONRETRY, millisecondsToXsdDuration(processor_.connection_retry_interval_).c_str());
+ ws_xml_add_node_attr(connection_retry_node, nullptr, "Total", std::to_string(processor_.connection_retry_count_).c_str());
+
+ // Body/Delivery/NotifyTo and Body/EndTo are the same, so we will use this lambda to recreate the same tree
+ auto apply_endpoint_nodes = [&](WsXmlNodeH target_node) {
+ // ${target_node}/NotifyTo/Address
+ ws_xml_add_child_format(target_node, XML_NS_ADDRESSING, WSA_ADDRESS, "https://%s:%hu%s",
+ processor_.listen_hostname_.c_str(),
+ processor_.listen_port_,
+ subscription_endpoint.c_str());
+ // ${target_node}/ReferenceProperties
+ node = ws_xml_add_child(target_node, XML_NS_ADDRESSING, WSA_REFERENCE_PROPERTIES, nullptr);
+ // ${target_node}/ReferenceProperties/Identifier
+ ws_xml_add_child_format(node, XML_NS_EVENTING, WSEVENT_IDENTIFIER, "%s", subscription_identifier.c_str());
+ // ${target_node}/Policy
+ WsXmlNodeH policy = ws_xml_add_child(target_node, nullptr, "Policy", nullptr);
+ ws_xml_set_ns(policy, XML_NS_CUSTOM_POLICY, "c");
+ ws_xml_ns_add(policy, XML_NS_CUSTOM_AUTHENTICATION, "auth");
+ // ${target_node}/Policy/ExactlyOne
+ WsXmlNodeH exactly_one = ws_xml_add_child(policy, XML_NS_CUSTOM_POLICY, "ExactlyOne", nullptr);
+ // ${target_node}/Policy/ExactlyOne/All
+ WsXmlNodeH all = ws_xml_add_child(exactly_one, XML_NS_CUSTOM_POLICY, "All", nullptr);
+ // ${target_node}/Policy/ExactlyOne/All/Authentication
+ WsXmlNodeH authentication = ws_xml_add_child(all, XML_NS_CUSTOM_AUTHENTICATION, "Authentication", nullptr);
+ ws_xml_add_node_attr(authentication, nullptr, "Profile", WSMAN_SECURITY_PROFILE_HTTPS_MUTUAL);
+ // ${target_node}/Policy/ExactlyOne/All/Authentication/ClientCertificate
+ WsXmlNodeH client_certificate = ws_xml_add_child(authentication, XML_NS_CUSTOM_AUTHENTICATION, "ClientCertificate", nullptr);
+ // ${target_node}/Policy/ExactlyOne/All/Authentication/ClientCertificate/Thumbprint
+ WsXmlNodeH thumbprint = ws_xml_add_child_format(client_certificate, XML_NS_CUSTOM_AUTHENTICATION, "Thumbprint", "%s", processor_.ssl_ca_cert_thumbprint_.c_str());
+ ws_xml_add_node_attr(thumbprint, nullptr, "Role", "issuer");
+ };
+
+ // Body/Delivery/NotifyTo
+ WsXmlNodeH notifyto_node = ws_xml_add_child(delivery_node, XML_NS_EVENTING, WSEVENT_NOTIFY_TO, nullptr);
+ apply_endpoint_nodes(notifyto_node);
+
+ // Body/EndTo
+ WsXmlNodeH endto_node = ws_xml_add_child(subscribe_node, XML_NS_EVENTING, WSEVENT_ENDTO, nullptr);
+ apply_endpoint_nodes(endto_node);
+
+ // Body/MaxElements
+ ws_xml_add_child(delivery_node, XML_NS_WS_MAN, WSM_MAX_ELEMENTS, std::to_string(processor_.max_elements_).c_str());
+ // Body/MaxTime
+ ws_xml_add_child(delivery_node, XML_NS_WS_MAN, WSENUM_MAX_TIME, millisecondsToXsdDuration(processor_.max_latency_).c_str());
+
+ // Body/Expires
+ ws_xml_add_child(subscribe_node, XML_NS_EVENTING, WSEVENT_EXPIRES, millisecondsToXsdDuration(processor_.subscription_expiration_interval_).c_str());
+
+ // Body/Filter
+ WsXmlNodeH filter_node = ws_xml_add_child(subscribe_node, XML_NS_WS_MAN, WSM_FILTER, processor_.xpath_xml_query_.c_str());
+ // ws_xml_add_node_attr(filter_node, nullptr, "Dialect", "http://schemas.microsoft.com/win/2004/08/events/eventquery");
+
+ // Body/Bookmark
+ if (it != processor_.subscribers_.end() && it->second.bookmark_ != nullptr) {
+ WsXmlNodeH bookmark_node = ws_xml_get_doc_root(it->second.bookmark_);
+ ws_xml_copy_node(bookmark_node, subscribe_node);
+ } else if (processor_.initial_existing_events_strategy_ == INITIAL_EXISTING_EVENTS_STRATEGY_ALL) {
+ ws_xml_add_child(subscribe_node, XML_NS_WS_MAN, WSM_BOOKMARK, "http://schemas.dmtf.org/wbem/wsman/1/wsman/bookmark/earliest");
+ }
+
+ // Body/SendBookmarks
+ ws_xml_add_child(subscribe_node, XML_NS_WS_MAN, WSM_SENDBOOKMARKS, nullptr);
+
+ // Copy the whole Subscription
+ WsXmlNodeH subscription_node = ws_xml_get_doc_root(subscription_doc);
+ ws_xml_copy_node(subscription_node, subscription);
+
+ // Save subscription
+ if (it == processor_.subscribers_.end()) {
+ it = processor_.subscribers_.emplace(machine_id, SubscriberData()).first;
+ }
+ it->second.setSubscription(subscription_version, subscription_doc, subscription_endpoint, subscription_identifier);
+ }
+
+ // Send response
+ char* xml_buf = nullptr;
+ int xml_buf_size = 0;
+ ws_xml_dump_memory_enc(response, &xml_buf, &xml_buf_size, "UTF-8");
+
+ sendResponse(conn, machine_id, req_info->remote_addr, xml_buf, xml_buf_size);
+
+ ws_xml_free_memory(xml_buf);
+
+ return true;
+}
+
+SourceInitiatedSubscriptionListener::Handler::WriteCallback::WriteCallback(char* text)
+ : text_(text) {
+}
+
+int64_t SourceInitiatedSubscriptionListener::Handler::WriteCallback::process(std::shared_ptr<io::BaseStream> stream) {
+ return stream->write(reinterpret_cast<uint8_t*>(text_), strlen(text_));
+}
+
+int SourceInitiatedSubscriptionListener::Handler::enumerateEventCallback(WsXmlNodeH node, void* data) {
+ if (data == nullptr) {
+ return 1;
+ }
+
+ std::shared_ptr<core::ProcessSession> session;
+ std::shared_ptr<logging::Logger> logger;
+ std::string machine_id;
+ std::string remote_ip;
+ std::tie(session, logger, machine_id, remote_ip) = *static_cast<std::tuple<std::shared_ptr<core::ProcessSession>, std::shared_ptr<logging::Logger>, std::string, std::string>*>(data);
+
+ char* text = ws_xml_get_node_text(node);
+ if (text == nullptr) {
+ logger->log_error("Failed to get text for node");
+ return 1;
+ }
+
+ try {
+ logger->log_trace("Found Event");
+ auto flow_file = std::static_pointer_cast<FlowFileRecord>(session->create());
+ if (flow_file == nullptr) {
+ logger->log_error("Failed to create FlowFile");
+ return 1;
+ }
+
+ WriteCallback callback(text);
+ session->write(flow_file, &callback);
+
+ session->putAttribute(flow_file, FlowAttributeKey(MIME_TYPE), "application/xml");
+ flow_file->addAttribute(ATTRIBUTE_WEF_REMOTE_MACHINEID, machine_id);
+ flow_file->addAttribute(ATTRIBUTE_WEF_REMOTE_IP, remote_ip);
+
+ session->transfer(flow_file, SourceInitiatedSubscriptionListener::Success);
+ session->commit();
+ } catch (const std::exception& e) {
+ logger->log_error("Caught exception while processing Events: %s", e.what());
+ return 1;
+ } catch (...) {
+ logger->log_error("Caught exception while processing Events");
+ return 1;
+ }
+
+ return 0;
+}
+
+bool SourceInitiatedSubscriptionListener::Handler::handleSubscriptions(struct mg_connection* conn, const std::string& endpoint, WsXmlDocH request) {
+ utils::ScopeGuard guard([&]() {
+ ws_xml_destroy_doc(request);
+ });
+ auto action = getSoapAction(request);
+ auto machine_id = getMachineId(request);
+ const struct mg_request_info* req_info = mg_get_request_info(conn);
+ std::string remote_ip = req_info->remote_addr;
+ if (action == EVT_ACTION_SUBEND) {
+ std::lock_guard<std::mutex> lock(processor_.mutex_);
+ auto it = processor_.subscribers_.find(machine_id);
+ if (it != processor_.subscribers_.end()) {
+ processor_.subscribers_.erase(it);
+ }
+ // TODO(bakaid): make sure whether we really have to clean the bookmark as well (based on the fault)
+ } else if (action == WSMAN_CUSTOM_ACTION_HEARTBEAT) {
+ processor_.logger_->log_debug("Received Heartbeat on %s from %s (%s)", endpoint.c_str(), machine_id.c_str(), remote_ip.c_str());
+ } else if (action == WSMAN_CUSTOM_ACTION_EVENTS) {
+ processor_.logger_->log_debug("Received Events on %s from %s (%s)", endpoint.c_str(), machine_id.c_str(), remote_ip.c_str());
+ // Body
+ WsXmlNodeH body = ws_xml_get_soap_body(request);
+ if (body == nullptr) {
+ processor_.logger_->log_error("Received malformed Events request on %s from %s (%s), SOAP Body missing", endpoint.c_str(), machine_id.c_str(), remote_ip.c_str());
+ return false;
+ }
+ // Body/Events
+ WsXmlNodeH events_node = ws_xml_get_child(body, 0 /*index*/, XML_NS_WS_MAN, WSM_EVENTS);
+ if (events_node == nullptr) {
+ processor_.logger_->log_error("Received malformed Events request on %s from %s (%s), Events missing", endpoint.c_str(), machine_id.c_str(), remote_ip.c_str());
+ return false;
+ }
+ const struct mg_request_info* req_info = mg_get_request_info(conn);
+ // Enumare Body/Events/Event nodes
+ auto session = processor_.session_factory_->createSession();
+ std::tuple<std::shared_ptr<core::ProcessSession>, std::shared_ptr<logging::Logger>, std::string, std::string> callback_args =
+ std::forward_as_tuple(session, processor_.logger_, machine_id, remote_ip);
+ int ret = ws_xml_enum_children(events_node, &SourceInitiatedSubscriptionListener::Handler::enumerateEventCallback, &callback_args, 0 /*bRecursive*/);
+ if (ret != 0) {
+ processor_.logger_->log_error("Failed to parse events on %s from %s (%s), rolling back session", endpoint.c_str(), machine_id.c_str(), remote_ip.c_str());
+ session->rollback();
+ }
+ // Header
+ WsXmlNodeH header = ws_xml_get_soap_header(request);
+ // Header/Bookmark
+ WsXmlNodeH bookmark_node = ws_xml_get_child(header, 0 /*index*/, XML_NS_WS_MAN, WSM_BOOKMARK);
+ if (ret == 0 && bookmark_node != nullptr) {
+ WsXmlDocH bookmark_doc = ws_xml_create_doc(XML_NS_WS_MAN, WSM_BOOKMARK);
+ WsXmlNodeH temp = ws_xml_get_doc_root(bookmark_doc);
+ ws_xml_duplicate_children(temp, bookmark_node);
+
+ std::lock_guard<std::mutex> lock(processor_.mutex_);
+ auto it = processor_.subscribers_.find(machine_id);
+ if (it != processor_.subscribers_.end()) {
+ it = processor_.subscribers_.emplace(machine_id, SubscriberData()).first;
+ }
+ it->second.setBookmark(bookmark_doc);
+ // Bookmark changed, invalidate stored subscription
+ it->second.clearSubscription();
+
+ // Persist state
+ processor_.persistState();
+
+ char* xml_buf = nullptr;
+ int xml_buf_size = 0;
+ ws_xml_dump_memory_enc(bookmark_doc, &xml_buf, &xml_buf_size, "UTF-8");
+ processor_.logger_->log_debug("Saved new bookmark for %s: \"%.*s\"", machine_id.c_str(), xml_buf_size, xml_buf);
+ ws_xml_free_memory(xml_buf);
+ }
+ } else {
+ processor_.logger_->log_error("%s called by %s (%s) with unknown Action \"%s\"", endpoint.c_str(), machine_id.c_str(), remote_ip.c_str(), action.c_str());
+ return false; // TODO(bakaid): generate fault if possible
+ }
+
+ if (isAckRequested(request)) {
+ // Assemble ACK
+ WsXmlDocH ack = wsman_create_response_envelope(request, WSMAN_CUSTOM_ACTION_ACK);
+ // Header
+ WsXmlNodeH ack_header = ws_xml_get_soap_header(ack);
+
+ // Header/MessageID
+ utils::Identifier msg_id = utils::IdGenerator::getIdGenerator()->generate();
+ ws_xml_add_child_format(ack_header, XML_NS_ADDRESSING, WSA_MESSAGE_ID, "uuid:%s", msg_id.to_string().c_str());
+
+ // Send ACK
+ char* xml_buf = nullptr;
+ int xml_buf_size = 0;
+ ws_xml_dump_memory_enc(ack, &xml_buf, &xml_buf_size, "UTF-8");
+
+ sendResponse(conn, machine_id, remote_ip, xml_buf, xml_buf_size);
+
+ ws_xml_free_memory(xml_buf);
+ ws_xml_destroy_doc(ack);
+ }
+
+ return true;
+}
+
+void SourceInitiatedSubscriptionListener::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
+ logger_->log_trace("SourceInitiatedSubscriptionListener onTrigger called");
+}
+
+void SourceInitiatedSubscriptionListener::initialize() {
+ logger_->log_trace("Initializing SourceInitiatedSubscriptionListener");
+
+ // Set the supported properties
+ std::set<core::Property> properties;
+ properties.insert(ListenHostname);
+ properties.insert(ListenPort);
+ properties.insert(SubscriptionManagerPath);
+ properties.insert(SubscriptionsBasePath);
+ properties.insert(SSLCertificate);
+ properties.insert(SSLCertificateAuthority);
+ properties.insert(SSLVerifyPeer);
+ properties.insert(XPathXmlQuery);
+ properties.insert(InitialExistingEventsStrategy);
+ properties.insert(SubscriptionExpirationInterval);
+ properties.insert(HeartbeatInterval);
+ properties.insert(MaxElements);
+ properties.insert(MaxLatency);
+ properties.insert(ConnectionRetryInterval);
+ properties.insert(ConnectionRetryCount);
+ setSupportedProperties(properties);
+
+ // Set the supported relationships
+ std::set<core::Relationship> relationships;
+ relationships.insert(Success);
+ setSupportedRelationships(relationships);
+}
+
+void SourceInitiatedSubscriptionListener::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
+ std::string ssl_certificate_file;
+ std::string ssl_ca_file;
+ bool verify_peer = true;
+
+ state_manager_ = context->getStateManager();
+ if (state_manager_ == nullptr) {
+ throw Exception(PROCESSOR_EXCEPTION, "Failed to get StateManager");
+ }
+
+ std::string value;
+ context->getProperty(ListenHostname.getName(), listen_hostname_);
+ if (!context->getProperty(ListenPort.getName(), value)) {
+ throw Exception(PROCESSOR_EXCEPTION,"Listen Port attribute is missing or invalid");
+ } else {
+ core::Property::StringToInt(value, listen_port_);
+ }
+ context->getProperty(SubscriptionManagerPath.getName(), subscription_manager_path_);
+ context->getProperty(SubscriptionsBasePath.getName(), subscriptions_base_path_);
+ if (!context->getProperty(SSLCertificate.getName(), ssl_certificate_file)) {
+ throw Exception(PROCESSOR_EXCEPTION,"SSL Certificate attribute is missing");
+ }
+ if (!context->getProperty(SSLCertificateAuthority.getName(), ssl_ca_file)) {
+ throw Exception(PROCESSOR_EXCEPTION,"SSL Certificate Authority attribute is missing");
+ }
+ if (!context->getProperty(SSLVerifyPeer.getName(), value)) {
+ throw Exception(PROCESSOR_EXCEPTION,"SSL Verify Peer attribute is missing or invalid");
+ } else {
+ utils::StringUtils::StringToBool(value, verify_peer);
+ }
+ context->getProperty(XPathXmlQuery.getName(), xpath_xml_query_);
+ if (!context->getProperty(InitialExistingEventsStrategy.getName(), initial_existing_events_strategy_)) {
+ throw Exception(PROCESSOR_EXCEPTION,"Initial Existing Events Strategy attribute is missing or invalid");
+ }
+ if (!context->getProperty(SubscriptionExpirationInterval.getName(), value)) {
+ throw Exception(PROCESSOR_EXCEPTION,"Subscription Expiration Interval attribute is missing or invalid");
+ } else {
+ core::TimeUnit unit;
+ if (!core::Property::StringToTime(value, subscription_expiration_interval_, unit) ||
+ !core::Property::ConvertTimeUnitToMS(subscription_expiration_interval_, unit, subscription_expiration_interval_)) {
+ throw Exception(PROCESSOR_EXCEPTION,"Subscription Expiration Interval attribute is invalid");
+ }
+ }
+ if (!context->getProperty(HeartbeatInterval.getName(), value)) {
+ throw Exception(PROCESSOR_EXCEPTION,"Heartbeat Interval attribute is missing or invalid");
+ } else {
+ core::TimeUnit unit;
+ if (!core::Property::StringToTime(value, heartbeat_interval_, unit) || !core::Property::ConvertTimeUnitToMS(heartbeat_interval_, unit, heartbeat_interval_)) {
+ throw Exception(PROCESSOR_EXCEPTION,"Heartbeat Interval attribute is invalid");
+ }
+ }
+ if (!context->getProperty(MaxElements.getName(), value)) {
+ throw Exception(PROCESSOR_EXCEPTION,"Max Elements attribute is missing or invalid");
+ } else if (!core::Property::StringToInt(value, max_elements_)) {
+ throw Exception(PROCESSOR_EXCEPTION,"Max Elements attribute is invalid");
+ }
+ if (!context->getProperty(MaxLatency.getName(), value)) {
+ throw Exception(PROCESSOR_EXCEPTION,"Max Latency attribute is missing or invalid");
+ } else {
+ core::TimeUnit unit;
+ if (!core::Property::StringToTime(value, max_latency_, unit) || !core::Property::ConvertTimeUnitToMS(max_latency_, unit, max_latency_)) {
+ throw Exception(PROCESSOR_EXCEPTION,"Max Latency attribute is invalid");
+ }
+ }
+ if (!context->getProperty(ConnectionRetryInterval.getName(), value)) {
+ throw Exception(PROCESSOR_EXCEPTION,"Connection Retry Interval attribute is missing or invalid");
+ } else {
+ core::TimeUnit unit;
+ if (!core::Property::StringToTime(value, connection_retry_interval_, unit) || !core::Property::ConvertTimeUnitToMS(connection_retry_interval_, unit, connection_retry_interval_)) {
+ throw Exception(PROCESSOR_EXCEPTION,"Connection Retry Interval attribute is invalid");
+ }
+ }
+ if (!context->getProperty(ConnectionRetryCount.getName(), value)) {
+ throw Exception(PROCESSOR_EXCEPTION,"Connection Retry Count attribute is missing or invalid");
+ } else if (!core::Property::StringToInt(value, connection_retry_count_)) {
+ throw Exception(PROCESSOR_EXCEPTION,"Connection Retry Count attribute is invalid");
+ }
+
+ FILE* fp = fopen(ssl_ca_file.c_str(), "rb");
+ if (fp == nullptr) {
+ throw Exception(PROCESSOR_EXCEPTION,"Failed to open file specified by SSL Certificate Authority attribute");
+ }
+ X509* ca = nullptr;
+ PEM_read_X509(fp, &ca, nullptr, nullptr);
+ fclose(fp);
+ if (ca == nullptr) {
+ throw Exception(PROCESSOR_EXCEPTION,"Failed to parse file specified by SSL Certificate Authority attribute");
+ }
+ std::array<uint8_t, 20U> hash_buf;
+ int ret = X509_digest(ca, EVP_sha1(), hash_buf.data(), nullptr);
+ X509_free(ca);
+ if (ret != 1) {
+ throw Exception(PROCESSOR_EXCEPTION,"Failed to get fingerprint for CA specified by SSL Certificate Authority attribute");
+ }
+ ssl_ca_cert_thumbprint_ = utils::StringUtils::to_hex(hash_buf.data(), hash_buf.size(), true /*uppercase*/);
+ logger_->log_debug("%s SHA-1 thumbprint is %s", ssl_ca_file.c_str(), ssl_ca_cert_thumbprint_.c_str());
+
+ session_factory_ = sessionFactory;
+
+ // Load state
+ loadState();
+
+ // Start server
+ std::vector<std::string> options;
+ options.emplace_back("enable_keep_alive");
+ options.emplace_back("yes");
+ options.emplace_back("keep_alive_timeout_ms");
+ options.emplace_back("15000");
+ options.emplace_back("num_threads");
+ options.emplace_back("1");
+ options.emplace_back("listening_ports");
+ options.emplace_back(std::to_string(listen_port_) + "s");
+ options.emplace_back("ssl_certificate");
+ options.emplace_back(ssl_certificate_file);
+ options.emplace_back("ssl_ca_file");
+ options.emplace_back(ssl_ca_file);
+ options.emplace_back("ssl_verify_peer");
+ options.emplace_back(verify_peer ? "yes" : "no");
+
+ try {
+ server_ = std::unique_ptr<CivetServer>(new CivetServer(options));
+ } catch (const std::exception& e) {
+ throw Exception(PROCESSOR_EXCEPTION, std::string("Failed to initialize server, error: ") + e.what());
+ } catch (...) {
+ throw Exception(PROCESSOR_EXCEPTION,"Failed to initialize server");
+ }
+ handler_ = std::unique_ptr<Handler>(new Handler(*this));
+ server_->addHandler("**", *handler_);
+}
+
+void SourceInitiatedSubscriptionListener::notifyStop() {
+ server_.release();
+}
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
diff --git a/extensions/openwsman/processors/SourceInitiatedSubscriptionListener.h b/extensions/openwsman/processors/SourceInitiatedSubscriptionListener.h
new file mode 100644
index 0000000..70ef004
--- /dev/null
+++ b/extensions/openwsman/processors/SourceInitiatedSubscriptionListener.h
@@ -0,0 +1,173 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __SOURCE_INITIATED_SUBSCRIPTION_PROCESSOR_H__
+#define __SOURCE_INITIATED_SUBSCRIPTION_PROCESSOR_H__
+
+#include <memory>
+#include <string>
+#include <list>
+#include <map>
+#include <mutex>
+#include <thread>
+
+#include <CivetServer.h>
+extern "C" {
+#include "wsman-xml.h"
+}
+
+#include "utils/ByteArrayCallback.h"
+#include "FlowFileRecord.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/Core.h"
+#include "core/Property.h"
+#include "core/Resource.h"
+#include "controllers/SSLContextService.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "utils/Id.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+class SourceInitiatedSubscriptionListener : public core::Processor {
+ public:
+ static constexpr char const *INITIAL_EXISTING_EVENTS_STRATEGY_NONE = "None";
+ static constexpr char const *INITIAL_EXISTING_EVENTS_STRATEGY_ALL = "All";
+
+ static constexpr char const* ProcessorName = "SourceInitiatedSubscriptionListener";
+
+ explicit SourceInitiatedSubscriptionListener(std::string name, utils::Identifier uuid = utils::Identifier());
+
+ // Supported Properties
+ static core::Property ListenHostname;
+ static core::Property ListenPort;
+ static core::Property SubscriptionManagerPath;
+ static core::Property SubscriptionsBasePath;
+ static core::Property SSLCertificate;
+ static core::Property SSLCertificateAuthority;
+ static core::Property SSLVerifyPeer;
+ static core::Property XPathXmlQuery;
+ static core::Property InitialExistingEventsStrategy;
+ static core::Property SubscriptionExpirationInterval;
+ static core::Property HeartbeatInterval;
+ static core::Property MaxElements;
+ static core::Property MaxLatency;
+ static core::Property ConnectionRetryInterval;
+ static core::Property ConnectionRetryCount;
+
+ // Supported Relationships
+ static core::Relationship Success;
+
+ // Writes Attributes
+ static constexpr char const* ATTRIBUTE_WEF_REMOTE_MACHINEID = "wef.remote.machineid";
+ static constexpr char const* ATTRIBUTE_WEF_REMOTE_IP = "wef.remote.ip";
+
+ void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override;
+ void initialize() override;
+ void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
+ void notifyStop() override;
+
+ class Handler: public CivetHandler {
+ public:
+ explicit Handler(SourceInitiatedSubscriptionListener& processor);
+ bool handlePost(CivetServer* server, struct mg_connection* conn);
+
+ class WriteCallback : public OutputStreamCallback {
+ public:
+ explicit WriteCallback(char* text);
+ int64_t process(std::shared_ptr<io::BaseStream> stream);
+
+ private:
+ char* text_;
+ };
+
+ private:
+ SourceInitiatedSubscriptionListener& processor_;
+
+ bool handleSubscriptionManager(struct mg_connection* conn, const std::string& endpoint, WsXmlDocH request);
+ bool handleSubscriptions(struct mg_connection* conn, const std::string& endpoint, WsXmlDocH request);
+
+ static int enumerateEventCallback(WsXmlNodeH node, void* data);
+ std::string getSoapAction(WsXmlDocH doc);
+ std::string getMachineId(WsXmlDocH doc);
+ bool isAckRequested(WsXmlDocH doc);
+ void sendResponse(struct mg_connection* conn, const std::string& machineId, const std::string& remoteIp, char* xml_buf, size_t xml_buf_size);
+
+ static std::string millisecondsToXsdDuration(int64_t milliseconds);
+ };
+
+ protected:
+ std::shared_ptr<logging::Logger> logger_;
+
+ std::shared_ptr<core::CoreComponentStateManager> state_manager_;
+
+ std::shared_ptr<core::ProcessSessionFactory> session_factory_;
+
+ std::string listen_hostname_;
+ uint16_t listen_port_;
+ std::string subscription_manager_path_;
+ std::string subscriptions_base_path_;
+ std::string ssl_ca_cert_thumbprint_;
+ std::string xpath_xml_query_;
+ std::string initial_existing_events_strategy_;
+ int64_t subscription_expiration_interval_;
+ int64_t heartbeat_interval_;
+ uint32_t max_elements_;
+ int64_t max_latency_;
+ int64_t connection_retry_interval_;
+ uint32_t connection_retry_count_;
+
+ std::unique_ptr<CivetServer> server_;
+ std::unique_ptr<Handler> handler_;
+
+ struct SubscriberData {
+ WsXmlDocH bookmark_;
+ std::string subscription_version_;
+ WsXmlDocH subscription_;
+ std::string subscription_endpoint_;
+ std::string subscription_identifier_;
+
+ SubscriberData();
+ ~SubscriberData();
+
+ void setSubscription(const std::string& subscription_version, WsXmlDocH subscription, const std::string& subscription_endpoint, const std::string& subscription_identifier);
+ void clearSubscription();
+ void setBookmark(WsXmlDocH bookmark);
+ void clearBookmark();
+ };
+
+ std::mutex mutex_;
+ std::map<std::string /*machineId*/, SubscriberData> subscribers_;
+
+ bool persistState() const;
+ bool loadState();
+};
+
+REGISTER_RESOURCE(SourceInitiatedSubscriptionListener, "This processor implements a Windows Event Forwarding Source Initiated Subscription server with the help of OpenWSMAN. "
+ "Windows hosts can be set up to connect and forward Event Logs to this processor.")
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif // __SOURCE_INITIATED_SUBSCRIPTION_PROCESSOR_H__
diff --git a/extensions/sftp/processors/ListSFTP.cpp b/extensions/sftp/processors/ListSFTP.cpp
index 6ac89f2..dd49beb 100644
--- a/extensions/sftp/processors/ListSFTP.cpp
+++ b/extensions/sftp/processors/ListSFTP.cpp
@@ -794,8 +794,7 @@
return false;
}
- size_t i = 0;
- while (true) {
+ for (size_t i = 0U;; i++) {
std::string name;
try {
name = state_map.at("entity." + std::to_string(i) + ".name");
@@ -812,7 +811,6 @@
logger_->log_error("State for entity \"%s\" is missing or invalid, skipping", name);
continue;
}
- ++i;
}
if (state_listing_strategy != listing_strategy_ ||
diff --git a/extensions/standard-processors/processors/TailFile.cpp b/extensions/standard-processors/processors/TailFile.cpp
index 0a52a93..84fc92a 100644
--- a/extensions/standard-processors/processors/TailFile.cpp
+++ b/extensions/standard-processors/processors/TailFile.cpp
@@ -246,8 +246,7 @@
std::unordered_map<std::string, std::string> state_map;
if (state_manager_->get(state_map)) {
std::map<std::string, TailState> new_tail_states;
- size_t i = 0;
- while (true) {
+ for (size_t i = 0U;; i++) {
std::string name;
try {
name = state_map.at("file." + std::to_string(i) + ".name");
@@ -268,7 +267,6 @@
} catch (...) {
continue;
}
- ++i;
}
state_load_success = true;
tail_states_ = std::move(new_tail_states);
diff --git a/thirdparty/libxml2/libxml2-win.patch b/thirdparty/libxml2/libxml2-win.patch
new file mode 100644
index 0000000..67162bc
--- /dev/null
+++ b/thirdparty/libxml2/libxml2-win.patch
@@ -0,0 +1,126 @@
+diff -rupN orig/CMakeLists.txt patched/CMakeLists.txt
+--- orig/CMakeLists.txt 1970-01-01 01:00:00.000000000 +0100
++++ patched/CMakeLists.txt 2020-04-17 14:16:13.000000000 +0200
+@@ -0,0 +1,65 @@
++cmake_minimum_required(VERSION 3.7)
++
++project(libxml2)
++
++set(SOURCES buf.c
++ c14n.c
++ catalog.c
++ chvalid.c
++ debugXML.c
++ dict.c
++ DOCBparser.c
++ encoding.c
++ entities.c
++ error.c
++ globals.c
++ hash.c
++ HTMLparser.c
++ HTMLtree.c
++ legacy.c
++ list.c
++ nanoftp.c
++ nanohttp.c
++ parser.c
++ parserInternals.c
++ pattern.c
++ relaxng.c
++ SAX.c
++ SAX2.c
++ schematron.c
++ threads.c
++ tree.c
++ uri.c
++ valid.c
++ xinclude.c
++ xlink.c
++ xmlcatalog.c
++ xmlIO.c
++ xmlmemory.c
++ xmlmodule.c
++ xmlreader.c
++ xmlregexp.c
++ xmlsave.c
++ xmlschemas.c
++ xmlschemastypes.c
++ xmlstring.c
++ xmlunicode.c
++ xmlwriter.c
++ xpath.c
++ xpointer.c)
++
++add_library(xml2 STATIC ${SOURCES})
++
++set_property(TARGET xml2 PROPERTY POSITION_INDEPENDENT_CODE ON)
++
++target_include_directories(xml2
++ PRIVATE
++ include
++ win32/VC10)
++
++install(TARGETS xml2
++ ARCHIVE DESTINATION lib
++)
++
++install(DIRECTORY include/libxml DESTINATION include/libxml2
++ FILES_MATCHING PATTERN "*.h")
+diff -rupN orig/include/libxml/xmlversion.h patched/include/libxml/xmlversion.h
+--- orig/include/libxml/xmlversion.h 2019-10-30 20:14:29.000000000 +0100
++++ patched/include/libxml/xmlversion.h 2020-04-17 14:52:58.000000000 +0200
+@@ -50,7 +50,7 @@ XMLPUBFUN void XMLCALL xmlCheckVersion(i
+ *
+ * extra version information, used to show a CVS compilation
+ */
+-#define LIBXML_VERSION_EXTRA "-GITv2.9.10-rc1-2-ga5bb6aaa2"
++#define LIBXML_VERSION_EXTRA ""
+
+ /**
+ * LIBXML_TEST_VERSION:
+@@ -171,7 +171,7 @@ XMLPUBFUN void XMLCALL xmlCheckVersion(i
+ *
+ * Whether the FTP support is configured in
+ */
+-#if 1
++#if 0
+ #define LIBXML_FTP_ENABLED
+ #endif
+
+@@ -180,7 +180,7 @@ XMLPUBFUN void XMLCALL xmlCheckVersion(i
+ *
+ * Whether the HTTP support is configured in
+ */
+-#if 1
++#if 0
+ #define LIBXML_HTTP_ENABLED
+ #endif
+
+@@ -270,7 +270,7 @@ XMLPUBFUN void XMLCALL xmlCheckVersion(i
+ *
+ * Whether iconv support is available
+ */
+-#if 1
++#if 0
+ #define LIBXML_ICONV_ENABLED
+ #endif
+
+@@ -395,7 +395,7 @@ XMLPUBFUN void XMLCALL xmlCheckVersion(i
+ *
+ * Whether the Zlib support is compiled in
+ */
+-#if 1
++#if 0
+ #define LIBXML_ZLIB_ENABLED
+ #endif
+
+@@ -404,7 +404,7 @@ XMLPUBFUN void XMLCALL xmlCheckVersion(i
+ *
+ * Whether the Lzma support is compiled in
+ */
+-#if 1
++#if 0
+ #define LIBXML_LZMA_ENABLED
+ #endif
+
diff --git a/thirdparty/openwsman/openwsman.patch b/thirdparty/openwsman/openwsman.patch
new file mode 100644
index 0000000..5fff4b7
--- /dev/null
+++ b/thirdparty/openwsman/openwsman.patch
@@ -0,0 +1,81 @@
+diff -rupN orig/CMakeLists.txt patched/CMakeLists.txt
+--- orig/CMakeLists.txt 2019-09-17 11:38:38.000000000 +0200
++++ patched/CMakeLists.txt 2020-04-16 23:43:22.000000000 +0200
+@@ -24,7 +24,7 @@ if(COMMAND cmake_policy)
+ endif(COMMAND cmake_policy)
+
+ # where to look first for cmake modules, before ${CMAKE_ROOT}/Modules/ is checked
+-SET(CMAKE_MODULE_PATH ${CMAKE_SOURCE_DIR}/cmake/modules ${CMAKE_MODULE_PATH})
++LIST(APPEND CMAKE_MODULE_PATH ${CMAKE_SOURCE_DIR}/cmake/modules)
+
+ INCLUDE( ${CMAKE_SOURCE_DIR}/VERSION.cmake )
+ SET(VERSION "${OPENWSMAN_MAJOR}.${OPENWSMAN_MINOR}.${OPENWSMAN_PATCH}")
+@@ -168,8 +168,14 @@ ENDIF( USE_PAM )
+
+ INCLUDE(FindOpenSSL)
+ IF(OPENSSL_FOUND)
++ MESSAGE("OpenSSL found")
++ MESSAGE("OPENSSL_INCLUDE_DIR: ${OPENSSL_INCLUDE_DIR}")
++ MESSAGE("OPENSSL_LIBRARIES: ${OPENSSL_LIBRARIES}")
+ SET(HAVE_SSL 1)
+ SET(USE_OPENSSL 1)
++ INCLUDE_DIRECTORIES(${OPENSSL_INCLUDE_DIR})
++ELSE(OPENSSL_FOUND)
++ SET(HAVE_SSL 0)
+ ENDIF(OPENSSL_FOUND)
+
+ IF( BUILD_RUBY )
+@@ -256,6 +262,9 @@ IF(UNIX)
+ IF ( NOT CURL_FOUND)
+ MESSAGE( FATAL_ERROR " curl not found" )
+ ELSE ( NOT CURL_FOUND)
++ MESSAGE("cURL found")
++ MESSAGE("CURL_INCLUDE_DIR: ${CURL_INCLUDE_DIR}")
++ MESSAGE("CURL_LIBRARIES: ${CURL_LIBRARIES}")
+ INCLUDE_DIRECTORIES(${CURL_INCLUDE_DIR})
+ IF(CURL_VERSION_STRING)
+ STRING(COMPARE LESS ${CURL_VERSION_STRING} "7.12.0" result)
+@@ -272,6 +281,9 @@ INCLUDE(FindLibXml2)
+ IF ( NOT LIBXML2_FOUND)
+ MESSAGE( FATAL_ERROR " libxml2 not found" )
+ ELSE ( NOT LIBXML2_FOUND)
++ MESSAGE("Libxml2 found")
++ MESSAGE("LIBXML2_INCLUDE_DIR: ${LIBXML2_INCLUDE_DIR}")
++ MESSAGE("LIBXML2_LIBRARIES: ${LIBXML2_LIBRARIES}")
+ INCLUDE_DIRECTORIES(${LIBXML2_INCLUDE_DIR})
+ ENDIF( NOT LIBXML2_FOUND)
+
+diff -rupN orig/src/lib/wsman-soap.c patched/src/lib/wsman-soap.c
+--- orig/src/lib/wsman-soap.c 2019-09-17 11:38:38.000000000 +0200
++++ patched/src/lib/wsman-soap.c 2020-04-16 23:21:49.000000000 +0200
+@@ -991,10 +991,12 @@ unsigned long get_total_enum_context(WsC
+ * preset, hence marking them as weak symbols and testing to see
+ * if they are resolved before using them.
+ */
++#if 0
+ #pragma weak wsmand_options_get_max_threads
+ extern int wsmand_options_get_max_threads(void);
+ #pragma weak wsmand_options_get_max_connections_per_thread
+ extern int wsmand_options_get_max_connections_per_thread(void);
++#endif
+
+ /**
+ * Enumeration Stub for processing enumeration requests
+@@ -1030,9 +1032,17 @@ wsenum_enumerate_stub(SoapOpH op,
+ int max_threads = 0;
+ int max_connections_per_thread = 0;
+ int(* fptr)(void);
++#if 0
+ if((fptr = wsmand_options_get_max_threads) != 0){
++#else
++ if(0){
++#endif
+ max_threads = (* fptr)();
++#if 0
+ if((fptr = wsmand_options_get_max_connections_per_thread) != 0){
++#else
++ if(0){
++#endif
+ max_connections_per_thread = (* fptr)();
+ }
+ else{