Merge branch 'minificpp-1013-rebase'
Signed-off-by: Daniel Bakai <bakaid@apache.org>
This closes #732
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 49703b0..c3850f8 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -328,6 +328,11 @@
endif()
endif(WIN32)
+option(ENABLE_SQL "Enables the SQL Suite of Tools." OFF)
+if (ENABLE_ALL OR ENABLE_SQL)
+ createExtension(SQL-EXTENSIONS "SQL EXTENSIONS" "Enables the SQL Suite of Tools" "extensions/sql")
+endif()
+
## Create MQTT Extension
option(ENABLE_MQTT "Enables the mqtt extension." OFF)
if(ENABLE_ALL OR ENABLE_MQTT)
@@ -374,12 +379,16 @@
## SQLite extensions
option(ENABLE_SQLITE "Disables the scripting extensions." OFF)
-if (ENABLE_ALL OR ENABLE_SQLITE)
+if (ENABLE_SQLITE)
include(BundledSQLite)
use_bundled_sqlite(${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_BINARY_DIR})
createExtension(SQLITE-EXTENSIONS "SQLITE EXTENSIONS" "This enables sqlite" "extensions/sqlite" "${TEST_DIR}/sqlite-tests")
endif()
+if (ENABLE_SQL AND ENABLE_SQLITE)
+ message(FATAL_ERROR "ENABLE_SQL and ENABLE_SQLITE are incompatible. Set only one at a time.")
+endif()
+
## USB camera extensions
option(ENABLE_USB_CAMERA "Enables USB camera support." OFF)
if (ENABLE_ALL OR ENABLE_USB_CAMERA)
diff --git a/CMakeSettings.json b/CMakeSettings.json
index 89ce4d7..96f2694 100644
--- a/CMakeSettings.json
+++ b/CMakeSettings.json
@@ -60,6 +60,10 @@
"value": "TRUE"
},
{
+ "name": "ENABLE_SQL",
+ "value": "TRUE"
+ },
+ {
"name": "ENABLE_LIBRDKAFKA",
"value": "OFF"
},
diff --git a/aptitude.sh b/aptitude.sh
index 05913d5..289dbfe 100644
--- a/aptitude.sh
+++ b/aptitude.sh
@@ -16,7 +16,7 @@
# specific language governing permissions and limitations
# under the License.
-verify_enable(){
+verify_enable_platform(){
feature="$1"
feature_status=${!1}
verify_gcc_enable $feature
diff --git a/bootstrap.sh b/bootstrap.sh
index ac60b9b..b7cd7c4 100755
--- a/bootstrap.sh
+++ b/bootstrap.sh
@@ -42,6 +42,7 @@
CMAKE_OPTIONS_ENABLED=()
CMAKE_OPTIONS_DISABLED=()
CMAKE_MIN_VERSION=()
+INCOMPATIBLE_WITH=()
DEPLOY_LIMITS=()
USER_DISABLE_TESTS="${FALSE}"
USE_NINJA="false"
@@ -301,6 +302,9 @@
add_disabled_option SQLITE_ENABLED ${FALSE} "ENABLE_SQLITE"
+add_disabled_option SQL_ENABLED ${FALSE} "ENABLE_SQL"
+set_incompatible_with SQL_ENABLED SQLITE_ENABLED
+
# Since the following extensions have limitations on
add_disabled_option BUSTACHE_ENABLED ${FALSE} "ENABLE_BUSTACHE" "2.6" ${TRUE}
add_dependency BUSTACHE_ENABLED "boost"
diff --git a/bstrp_functions.sh b/bstrp_functions.sh
index 93b85f5..d292d85 100755
--- a/bstrp_functions.sh
+++ b/bstrp_functions.sh
@@ -69,6 +69,11 @@
done
}
+set_incompatible_with(){
+ INCOMPATIBLE_WITH+=("$1:$2")
+ INCOMPATIBLE_WITH+=("$2:$1")
+}
+
print_multi_option_status(){
feature="$1"
feature_status=${!1}
@@ -169,6 +174,30 @@
done
}
+check_compatibility(){
+ for option in "${INCOMPATIBLE_WITH[@]}" ; do
+ OPT=${option%%:*}
+ if [ "$OPT" = "$1" ]; then
+ OTHER_FEATURE=${option#*:}
+ OTHER_FEATURE_VALUE=${!OTHER_FEATURE}
+ if [ $OTHER_FEATURE_VALUE = "Enabled" ]; then
+ echo "false"
+ return
+ fi
+ fi
+ done
+ echo "true"
+}
+
+verify_enable(){
+ COMPATIBLE=$(check_compatibility $1)
+ if [ "$COMPATIBLE" = "true" ]; then
+ verify_enable_platform $1
+ else
+ echo "false"
+ fi
+}
+
can_deploy(){
for option in "${DEPLOY_LIMITS[@]}" ; do
OPT=${option%%:*}
@@ -334,6 +363,7 @@
echo "V. AWS Support .................$(print_feature_status AWS_ENABLED)"
echo "T. OpenCV Support ..............$(print_feature_status OPENCV_ENABLED)"
echo "U. OPC-UA Support...............$(print_feature_status OPC_ENABLED)"
+ echo "W. SQL Support..................$(print_feature_status SQL_ENABLED)"
echo "****************************************"
echo " Build Options."
echo "****************************************"
@@ -348,12 +378,13 @@
fi
echo "Q. Quit"
echo "* Extension cannot be installed due to"
- echo -e " version of cmake or other software\r\n"
+ echo " version of cmake or other software, or"
+ echo -e " incompatibility with other extensions\r\n"
}
read_feature_options(){
local choice
- read -p "Enter choice [ A - V or 1-4 ] " choice
+ read -p "Enter choice [ A - W or 1-4 ] " choice
choice=$(echo ${choice} | tr '[:upper:]' '[:lower:]')
case $choice in
a) ToggleFeature ROCKSDB_ENABLED ;;
@@ -380,6 +411,7 @@
s) ToggleFeature SFTP_ENABLED ;;
t) ToggleFeature OPENCV_ENABLED ;;
u) ToggleFeature OPC_ENABLED ;;
+ w) ToggleFeature SQL_ENABLED ;;
1) ToggleFeature TESTS_DISABLED ;;
2) EnableAllFeatures ;;
3) ToggleFeature JNI_ENABLED;;
diff --git a/centos.sh b/centos.sh
index c3b9d77..1430164 100644
--- a/centos.sh
+++ b/centos.sh
@@ -16,7 +16,7 @@
# specific language governing permissions and limitations
# under the License.
-verify_enable() {
+verify_enable_platform() {
feature="$1"
feature_status=${!1}
if [ "$OS_MAJOR" = "6" ]; then
diff --git a/darwin.sh b/darwin.sh
index da5df01..0e6bd81 100644
--- a/darwin.sh
+++ b/darwin.sh
@@ -16,7 +16,7 @@
# specific language governing permissions and limitations
# under the License.
-verify_enable() {
+verify_enable_platform() {
feature="$1"
feature_status=${!1}
if [ "$feature" = "BUSTACHE_ENABLED" ]; then
diff --git a/debian.sh b/debian.sh
index bb68ef0..9b67d7d 100644
--- a/debian.sh
+++ b/debian.sh
@@ -16,7 +16,7 @@
# specific language governing permissions and limitations
# under the License.
-verify_enable(){
+verify_enable_platform(){
feature="$1"
feature_status=${!1}
verify_gcc_enable $feature
diff --git a/extensions/sql/CMakeLists.txt b/extensions/sql/CMakeLists.txt
new file mode 100644
index 0000000..ead2726
--- /dev/null
+++ b/extensions/sql/CMakeLists.txt
@@ -0,0 +1,180 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+include(${CMAKE_SOURCE_DIR}/extensions/ExtensionHeader.txt)
+
+set(CMAKE_CXX_STANDARD 14)
+set(CMAKE_CXX_STANDARD_REQUIRED ON)
+
+include_directories(../../thirdparty/rapidjson-1.1.0/include/ ../../thirdparty/rapidjson-1.1.0/include/rapidjson)
+include_directories(".")
+
+file(GLOB SOURCES "*.cpp" "services/*.cpp" "processors/*.cpp" "data/*.cpp")
+
+add_library(minifi-sql STATIC ${SOURCES})
+set_property(TARGET minifi-sql PROPERTY POSITION_INDEPENDENT_CODE ON)
+
+if(THREADS_HAVE_PTHREAD_ARG)
+ target_compile_options(PUBLIC minifi-sql "-pthread")
+endif()
+if(CMAKE_THREAD_LIBS_INIT)
+ target_link_libraries(minifi-sql "${CMAKE_THREAD_LIBS_INIT}")
+endif()
+
+# Get whether we should use lib64/ library paths
+get_property(LIB64 GLOBAL PROPERTY FIND_LIBRARY_USE_LIB64_PATHS)
+
+if(WIN32)
+ find_package(ODBC REQUIRED)
+else()
+ # Build iODBC
+
+ # Define byproducts
+ if(LIB64 AND NOT APPLE)
+ set(IODBC_BYPRODUCT "lib64/libiodbc.a")
+ else()
+ set(IODBC_BYPRODUCT "lib/libiodbc.a")
+ endif()
+
+ set(IODBC_BYPRODUCT_DIR "${CMAKE_CURRENT_BINARY_DIR}/thirdparty/iodbc-install/")
+
+ # Build project
+ ExternalProject_Add(
+ iodbc-external
+ URL "https://github.com/openlink/iODBC/archive/v3.52.13.tar.gz"
+ URL_HASH "SHA256=4bf67fc6d4d237a4db19b292b5dd255ee09a0b2daa4e4058cf3a918bc5102135"
+ BUILD_IN_SOURCE true
+ SOURCE_DIR "${CMAKE_CURRENT_BINARY_DIR}/thirdparty/iodbc-src"
+ BUILD_COMMAND make
+ CMAKE_COMMAND ""
+ UPDATE_COMMAND ""
+ INSTALL_COMMAND make install
+ CONFIGURE_COMMAND ""
+ PATCH_COMMAND ./autogen.sh && ./configure --prefix=${IODBC_BYPRODUCT_DIR}
+ STEP_TARGETS build
+ BUILD_BYPRODUCTS "${IODBC_BYPRODUCT_DIR}/${IODBC_BYPRODUCT}"
+ EXCLUDE_FROM_ALL TRUE
+ )
+
+ # Set variables
+ set(IODBC_FOUND "YES" CACHE STRING "" FORCE)
+ set(IODBC_INCLUDE_DIRS "${IODBC_BYPRODUCT_DIR}/include" CACHE STRING "" FORCE)
+ set(IODBC_LIBRARIES "${IODBC_BYPRODUCT_DIR}/${IODBC_BYPRODUCT}" CACHE STRING "" FORCE)
+
+ # Set exported variables for FindPackage.cmake
+ set(EXPORTED_IODBC_INCLUDE_DIRS "${IODBC_INCLUDE_DIRS}" CACHE STRING "" FORCE)
+ set(EXPORTED_IODBC_LIBRARIES "${IODBC_LIBRARIES}" CACHE STRING "" FORCE)
+
+ # Create imported targets
+ add_library(ODBC::ODBC STATIC IMPORTED)
+ set_target_properties(ODBC::ODBC PROPERTIES IMPORTED_LOCATION "${IODBC_LIBRARIES}")
+ add_dependencies(ODBC::ODBC iodbc-external)
+ file(MAKE_DIRECTORY ${IODBC_INCLUDE_DIRS})
+ set_property(TARGET ODBC::ODBC APPEND PROPERTY INTERFACE_INCLUDE_DIRECTORIES ${IODBC_INCLUDE_DIRS})
+endif()
+
+# Build SOCI
+
+# Find patch executable
+find_package(Patch)
+
+# Define patch step
+set(PC "${Patch_EXECUTABLE}" -p1 -i "${CMAKE_CURRENT_SOURCE_DIR}/patch/soci.patch")
+
+# Define byproducts
+if(NOT APPLE AND CMAKE_SIZEOF_VOID_P EQUAL 8)
+ set(LIBSUFFIX 64)
+endif()
+
+if (WIN32)
+ set(BYPRODUCT_SUFFIX "_4_0.lib")
+else()
+ set(BYPRODUCT_SUFFIX ".a")
+endif()
+
+set(SOCI_BYPRODUCTS
+ "lib${LIBSUFFIX}/libsoci_core${BYPRODUCT_SUFFIX}"
+ "lib${LIBSUFFIX}/libsoci_odbc${BYPRODUCT_SUFFIX}"
+ )
+
+set(SOCI_BYPRODUCT_DIR "${CMAKE_CURRENT_BINARY_DIR}/thirdparty/soci-install")
+
+foreach(SOCI_BYPRODUCT ${SOCI_BYPRODUCTS})
+ list(APPEND SOCI_LIBRARIES_LIST "${SOCI_BYPRODUCT_DIR}/${SOCI_BYPRODUCT}")
+endforeach(SOCI_BYPRODUCT)
+
+# Set build options
+set(SOCI_CMAKE_ARGS ${PASSTHROUGH_CMAKE_ARGS}
+ "-DCMAKE_INSTALL_PREFIX=${SOCI_BYPRODUCT_DIR}"
+ "-DSOCI_TESTS=OFF"
+ "-DSOCI_SHARED=OFF"
+ "-DSOCI_CXX_C11=ON"
+ "-DWITH_ODBC=ON"
+ "-DSOCI_ODBC=ON"
+ "-DWITH_BOOST=OFF")
+
+if(NOT WIN32)
+ list(APPEND SOCI_CMAKE_ARGS "-DCMAKE_MODULE_PATH=${CMAKE_CURRENT_SOURCE_DIR}/cmake/"
+ "-DEXPORTED_IODBC_INCLUDE_DIRS=${EXPORTED_IODBC_INCLUDE_DIRS}"
+ "-DEXPORTED_IODBC_LIBRARIES=${EXPORTED_IODBC_LIBRARIES}")
+endif()
+
+# Build project
+ExternalProject_Add(
+ soci-external
+ URL "https://github.com/SOCI/soci/archive/4.0.0.tar.gz"
+ URL_HASH "SHA256=359b988d8cbe81357835317821919f7e270c0705e41951a92ac1627cb9fe8faf"
+ PATCH_COMMAND ${PC}
+ SOURCE_DIR "${CMAKE_CURRENT_BINARY_DIR}/thirdparty/soci-src"
+ CMAKE_ARGS ${SOCI_CMAKE_ARGS}
+ BUILD_BYPRODUCTS ${SOCI_LIBRARIES_LIST}
+ EXCLUDE_FROM_ALL TRUE
+)
+
+# Set dependencies
+if(NOT WIN32)
+ add_dependencies(soci-external ODBC::ODBC)
+endif()
+
+# Set variables
+set(SOCI_FOUND "YES" CACHE STRING "" FORCE)
+set(SOCI_INCLUDE_DIR "${SOCI_BYPRODUCT_DIR}/include" CACHE STRING "" FORCE)
+set(SOCI_LIBRARIES "${SOCI_LIBRARIES_LIST}" CACHE STRING "" FORCE)
+
+# Create imported targets
+file(MAKE_DIRECTORY ${SOCI_INCLUDE_DIR})
+
+add_library(SOCI::libsoci_core STATIC IMPORTED)
+set_target_properties(SOCI::libsoci_core PROPERTIES IMPORTED_LOCATION "${SOCI_BYPRODUCT_DIR}/lib${LIBSUFFIX}/libsoci_core${BYPRODUCT_SUFFIX}")
+set_target_properties(SOCI::libsoci_core PROPERTIES INTERFACE_INCLUDE_DIRECTORIES "${SOCI_INCLUDE_DIR}")
+add_dependencies(SOCI::libsoci_core soci-external)
+
+add_library(SOCI::libsoci_odbc STATIC IMPORTED)
+set_target_properties(SOCI::libsoci_odbc PROPERTIES IMPORTED_LOCATION "${SOCI_BYPRODUCT_DIR}/lib${LIBSUFFIX}/libsoci_odbc${BYPRODUCT_SUFFIX}")
+set_target_properties(SOCI::libsoci_odbc PROPERTIES INTERFACE_INCLUDE_DIRECTORIES "${SOCI_INCLUDE_DIR}")
+add_dependencies(SOCI::libsoci_odbc soci-external)
+set_property(TARGET SOCI::libsoci_odbc APPEND PROPERTY INTERFACE_LINK_LIBRARIES SOCI::libsoci_core)
+set_property(TARGET SOCI::libsoci_odbc APPEND PROPERTY INTERFACE_LINK_LIBRARIES ODBC::ODBC)
+
+target_link_libraries(minifi-sql SOCI::libsoci_odbc SOCI::libsoci_core)
+
+target_link_libraries(minifi-sql ${LIBMINIFI})
+
+SET (SQL-EXTENSION minifi-sql PARENT_SCOPE)
+register_extension(minifi-sql)
diff --git a/extensions/sql/SQLLoader.cpp b/extensions/sql/SQLLoader.cpp
new file mode 100644
index 0000000..2a787c2
--- /dev/null
+++ b/extensions/sql/SQLLoader.cpp
@@ -0,0 +1,29 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "core/FlowConfiguration.h"
+#include "SQLLoader.h"
+
+static auto added = core::FlowConfiguration::add_static_func("createSQLFactory");
+
+extern "C" {
+
+void *createSQLFactory(void) {
+ return new SQLFactory();
+}
+
+}
diff --git a/extensions/sql/SQLLoader.h b/extensions/sql/SQLLoader.h
new file mode 100644
index 0000000..992c802
--- /dev/null
+++ b/extensions/sql/SQLLoader.h
@@ -0,0 +1,74 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include "core/ClassLoader.h"
+#include "processors/ExecuteSQL.h"
+#include "processors/PutSQL.h"
+#include "processors/QueryDatabaseTable.h"
+#include "services/ODBCConnector.h"
+
+class SQLFactory : public core::ObjectFactory {
+ public:
+ SQLFactory() {}
+
+ /**
+ * Gets the name of the object.
+ * @return class name of processor
+ */
+ std::string getName() override {
+ return "SQLFactory";
+ }
+
+ std::string getClassName() override{
+ return "SQLFactory";
+ }
+ /**
+ * Gets the class name for the object
+ * @return class name for the processor.
+ */
+ std::vector<std::string> getClassNames() override{
+ return {"ExecuteSQL", "PutSQL", "QueryDatabaseTable", "ODBCService"};
+ }
+
+ template <typename T>
+ static std::unique_ptr<ObjectFactory> getObjectFactory() {
+ return std::make_unique<core::DefautObjectFactory<T>>();
+ }
+
+ std::unique_ptr<ObjectFactory> assign(const std::string &class_name) override {
+ if (utils::StringUtils::equalsIgnoreCase(class_name, "ExecuteSQL")) {
+ return getObjectFactory<minifi::processors::ExecuteSQL>();
+ }
+ if (utils::StringUtils::equalsIgnoreCase(class_name, "PutSQL")) {
+ return getObjectFactory<minifi::processors::PutSQL>();
+ }
+ if (utils::StringUtils::equalsIgnoreCase(class_name, "QueryDatabaseTable")) {
+ return getObjectFactory<minifi::processors::QueryDatabaseTable>();
+ }
+ if (utils::StringUtils::equalsIgnoreCase(class_name, "ODBCService")) {
+ return getObjectFactory<minifi::sql::controllers::ODBCService>();
+ }
+
+ return nullptr;
+ }
+};
+
+extern "C" {
+ DLL_EXPORT void *createSQLFactory(void);
+}
diff --git a/extensions/sql/cmake/FindODBC.cmake b/extensions/sql/cmake/FindODBC.cmake
new file mode 100644
index 0000000..b232e76
--- /dev/null
+++ b/extensions/sql/cmake/FindODBC.cmake
@@ -0,0 +1,34 @@
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+if (NOT ODBC_FOUND)
+ set(ODBC_FOUND "YES" CACHE STRING "" FORCE)
+ set(ODBC_INCLUDE_DIR "${EXPORTED_IODBC_INCLUDE_DIRS}" CACHE STRING "" FORCE)
+ set(ODBC_INCLUDE_DIRS "${EXPORTED_IODBC_INCLUDE_DIRS}" CACHE STRING "" FORCE)
+ set(ODBC_LIBRARIES "${EXPORTED_IODBC_LIBRARIES}" CACHE STRING "" FORCE)
+ set(ODBC_LIBRARY "${EXPORTED_IODBC_LIBRARIES}" CACHE STRING "" FORCE)
+endif()
+
+if(NOT TARGET ODBC::ODBC)
+ add_library(ODBC::ODBC STATIC IMPORTED)
+ set_target_properties(ODBC::ODBC PROPERTIES INTERFACE_INCLUDE_DIRECTORIES "${ODBC_INCLUDE_DIRS}")
+ set_target_properties(ODBC::ODBC PROPERTIES
+ IMPORTED_LINK_INTERFACE_LANGUAGES "C"
+ IMPORTED_LOCATION "${ODBC_LIBRARIES}")
+endif()
\ No newline at end of file
diff --git a/extensions/sql/data/DatabaseConnectors.h b/extensions/sql/data/DatabaseConnectors.h
new file mode 100644
index 0000000..0e94b73
--- /dev/null
+++ b/extensions/sql/data/DatabaseConnectors.h
@@ -0,0 +1,102 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <string>
+
+#include <soci/soci.h>
+
+#include "Utils.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace sql {
+
+/**
+ * We do not intend to create an abstract facade here. We know that SOCI is the underlying
+ * SQL library. We only wish to abstract ODBC specific information
+ */
+
+class Statement {
+ public:
+
+ explicit Statement(soci::session& session, const std::string &query)
+ : session_(session), query_(query) {
+ }
+
+ virtual ~Statement() {
+ }
+
+ soci::rowset<soci::row> execute() {
+ return session_.prepare << query_;
+ }
+
+ protected:
+ std::string query_;
+ soci::session& session_;
+};
+
+class Session {
+ public:
+
+ explicit Session(soci::session& session)
+ : session_(session) {
+ }
+
+ virtual ~Session() {
+ }
+
+ void begin() {
+ session_.begin();
+ }
+
+ void commit() {
+ session_.commit();
+ }
+
+ void rollback() {
+ session_.rollback();
+ }
+
+ void execute(const std::string &statement) {
+ session_ << statement;
+ }
+
+protected:
+ soci::session& session_;
+};
+
+class Connection {
+ public:
+ virtual ~Connection() {
+ }
+ virtual bool connected(std::string& exception) const = 0;
+ virtual std::unique_ptr<Statement> prepareStatement(const std::string &query) const = 0;
+ virtual std::unique_ptr<Session> getSession() const = 0;
+};
+
+} /* namespace sql */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
diff --git a/extensions/sql/data/JSONSQLWriter.cpp b/extensions/sql/data/JSONSQLWriter.cpp
new file mode 100644
index 0000000..514fd4d
--- /dev/null
+++ b/extensions/sql/data/JSONSQLWriter.cpp
@@ -0,0 +1,107 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "JSONSQLWriter.h"
+#include "rapidjson/writer.h"
+#include "rapidjson/stringbuffer.h"
+#include "rapidjson/prettywriter.h"
+#include "Exception.h"
+#include "Utils.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace sql {
+
+JSONSQLWriter::JSONSQLWriter(bool pretty)
+ : pretty_(pretty), jsonPayload_(rapidjson::kArrayType) {
+}
+
+JSONSQLWriter::~JSONSQLWriter() {}
+
+void JSONSQLWriter::beginProcessRow() {
+ jsonRow_ = rapidjson::kObjectType;
+}
+
+void JSONSQLWriter::endProcessRow() {
+ jsonPayload_.PushBack(jsonRow_, jsonPayload_.GetAllocator());
+}
+
+void JSONSQLWriter::processColumnName(const std::string& name) {}
+
+void JSONSQLWriter::processColumn(const std::string& name, const std::string& value) {
+ addToJSONRow(name, std::move(toJSONString(value)));
+}
+
+void JSONSQLWriter::processColumn(const std::string& name, double value) {
+ addToJSONRow(name, std::move(rapidjson::Value().SetDouble(value)));
+}
+
+void JSONSQLWriter::processColumn(const std::string& name, int value) {
+ addToJSONRow(name, std::move(rapidjson::Value().SetInt(value)));
+}
+
+void JSONSQLWriter::processColumn(const std::string& name, long long value) {
+ addToJSONRow(name, std::move(rapidjson::Value().SetInt64(value)));
+}
+
+void JSONSQLWriter::processColumn(const std::string& name, unsigned long long value) {
+ addToJSONRow(name, std::move(rapidjson::Value().SetUint64(value)));
+}
+
+void JSONSQLWriter::processColumn(const std::string& name, const char* value) {
+ addToJSONRow(name, std::move(toJSONString(value)));
+}
+
+void JSONSQLWriter::addToJSONRow(const std::string& columnName, rapidjson::Value&& jsonValue) {
+ jsonRow_.AddMember(toJSONString(columnName), std::move(jsonValue), jsonPayload_.GetAllocator());
+}
+
+rapidjson::Value JSONSQLWriter::toJSONString(const std::string& s) {
+ rapidjson::Value jsonValue;
+ jsonValue.SetString(s.c_str(), s.size(), jsonPayload_.GetAllocator());
+
+ return jsonValue;
+}
+
+std::string JSONSQLWriter::toString() {
+ rapidjson::StringBuffer buffer;
+
+ if (pretty_) {
+ rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(buffer);
+ jsonPayload_.Accept(writer);
+ } else {
+ rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
+ jsonPayload_.Accept(writer);
+ }
+
+ std::stringstream outputStream;
+ outputStream << buffer.GetString();
+
+ jsonPayload_ = rapidjson::Document(rapidjson::kArrayType);
+
+ return outputStream.str();
+}
+
+} /* namespace sql */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
diff --git a/extensions/sql/data/JSONSQLWriter.h b/extensions/sql/data/JSONSQLWriter.h
new file mode 100644
index 0000000..6888527
--- /dev/null
+++ b/extensions/sql/data/JSONSQLWriter.h
@@ -0,0 +1,65 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "rapidjson/document.h"
+
+#include "SQLWriter.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace sql {
+
+class JSONSQLWriter: public SQLWriter {
+ public:
+ explicit JSONSQLWriter(bool pretty);
+ virtual ~JSONSQLWriter();
+
+ std::string toString() override;
+
+private:
+ void beginProcessRow() override;
+ void endProcessRow() override;
+ void processColumnName(const std::string& name) override;
+ void processColumn(const std::string& name, const std::string& value) override;
+ void processColumn(const std::string& name, double value) override;
+ void processColumn(const std::string& name, int value) override;
+ void processColumn(const std::string& name, long long value) override;
+ void processColumn(const std::string& name, unsigned long long value) override;
+ void processColumn(const std::string& name, const char* value) override;
+
+ void addToJSONRow(const std::string& columnName, rapidjson::Value&& jsonValue);
+
+ rapidjson::Value toJSONString(const std::string& s);
+
+ private:
+ bool pretty_;
+ rapidjson::Document jsonPayload_;
+ rapidjson::Value jsonRow_;
+};
+
+} /* namespace sql */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+
diff --git a/extensions/sql/data/MaxCollector.h b/extensions/sql/data/MaxCollector.h
new file mode 100644
index 0000000..359f962
--- /dev/null
+++ b/extensions/sql/data/MaxCollector.h
@@ -0,0 +1,155 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <string>
+#include <unordered_map>
+#include <tuple>
+
+#include "SQLRowSubscriber.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace sql {
+
+class MaxCollector: public SQLRowSubscriber {
+ void beginProcessRow() override {}
+
+ void endProcessRow() override {
+ if (columnsVerified_) {
+ return;
+ }
+
+ if (countColumns_ != mapState_.size())
+ throw minifi::Exception(PROCESSOR_EXCEPTION, "MaxCollector: Column(s) '" + maxValueColumnNames_ + "' are not found in the columns of '" + selectQuery_ + "' result.");
+
+ columnsVerified_ = true;
+ }
+
+ void processColumnName(const std::string& name) override {
+ if (columnsVerified_) {
+ return;
+ }
+
+ if (mapState_.count(name)) {
+ countColumns_++;
+ }
+ }
+
+ void processColumn(const std::string& name, const std::string& value) override {
+ updateMaxValue(name, '\'' + value + '\'');
+ }
+
+ void processColumn(const std::string& name, double value) override {
+ updateMaxValue(name, value);
+ }
+
+ void processColumn(const std::string& name, int value) override {
+ updateMaxValue(name, value);
+ }
+
+ void processColumn(const std::string& name, long long value) override {
+ updateMaxValue(name, value);
+ }
+
+ void processColumn(const std::string& name, unsigned long long value) override {
+ updateMaxValue(name, value);
+ }
+
+ void processColumn(const std::string& name, const char* value) override {}
+
+ template <typename T>
+ struct MaxValue {
+ void updateMaxValue(const std::string& name, const T& value) {
+ const auto it = mapColumnNameValue_.find(name);
+ if (it == mapColumnNameValue_.end()) {
+ mapColumnNameValue_.insert({ name, value });
+ } else {
+ if (value > it->second) {
+ it->second = value;
+ }
+ }
+ }
+
+ std::unordered_map<std::string, T> mapColumnNameValue_;
+ };
+
+ template <typename Tuple, int Index>
+ struct UpdateMapState {
+ UpdateMapState(const Tuple& tpl, std::unordered_map<std::string, std::string>& mapState) {
+ for (auto& el : mapState) {
+ const auto& maxVal = std::get<Index>(tpl);
+
+ const auto it = maxVal.mapColumnNameValue_.find(el.first);
+ if (it != maxVal.mapColumnNameValue_.end()) {
+ std::stringstream ss;
+ ss << it->second;
+ el.second = ss.str();
+ }
+ }
+
+ UpdateMapState<Tuple, Index - 1>(tpl, mapState);
+ }
+ };
+
+ template <typename Tuple>
+ struct UpdateMapState<Tuple, -1> {
+ UpdateMapState(const Tuple&, std::unordered_map<std::string, std::string>&) {}
+ };
+
+ template <typename ...Ts>
+ struct MaxValues : public std::tuple<MaxValue<Ts>...> {
+ constexpr static size_t size = sizeof...(Ts);
+ };
+
+ public:
+ MaxCollector(const std::string& selectQuery, const std::string& maxValueColumnNames, std::unordered_map<std::string, std::string>& mapState)
+ :selectQuery_(selectQuery), maxValueColumnNames_(maxValueColumnNames), mapState_(mapState) {
+ }
+
+ template <typename T>
+ void updateMaxValue(const std::string& columnName, const T& value) {
+ if (mapState_.count(columnName)) {
+ std::get<MaxValue<T>>(maxValues_).updateMaxValue(columnName, value);
+ }
+ }
+
+ bool updateMapState() {
+ auto mapState = mapState_;
+ UpdateMapState<decltype(maxValues_), decltype(maxValues_)::size - 1>(maxValues_, mapState_);
+
+ return mapState != mapState_;
+ }
+
+ private:
+ const std::string selectQuery_;
+ const std::string maxValueColumnNames_;
+ std::unordered_map<std::string, std::string>& mapState_;
+ MaxValues<std::string, double, int, long long, unsigned long long> maxValues_;
+ size_t countColumns_{};
+ bool columnsVerified_{false};
+};
+
+} /* namespace sql */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
diff --git a/extensions/sql/data/SQLRowSubscriber.h b/extensions/sql/data/SQLRowSubscriber.h
new file mode 100644
index 0000000..5325e71
--- /dev/null
+++ b/extensions/sql/data/SQLRowSubscriber.h
@@ -0,0 +1,47 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <string>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace sql {
+
+struct SQLRowSubscriber {
+ virtual ~SQLRowSubscriber() {}
+ virtual void beginProcessRow() = 0;
+ virtual void endProcessRow() = 0;
+ virtual void processColumnName(const std::string& name) = 0;
+ virtual void processColumn(const std::string& name, const std::string& value) = 0;
+ virtual void processColumn(const std::string& name, double value) = 0;
+ virtual void processColumn(const std::string& name, int value) = 0;
+ virtual void processColumn(const std::string& name, long long value) = 0;
+ virtual void processColumn(const std::string& name, unsigned long long value) = 0;
+ // Process NULL value.
+ virtual void processColumn(const std::string& name, const char* value) = 0;
+};
+
+} /* namespace sql */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
diff --git a/extensions/sql/data/SQLRowsetProcessor.cpp b/extensions/sql/data/SQLRowsetProcessor.cpp
new file mode 100644
index 0000000..1339b9d
--- /dev/null
+++ b/extensions/sql/data/SQLRowsetProcessor.cpp
@@ -0,0 +1,120 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "SQLRowsetProcessor.h"
+
+#include "Exception.h"
+#include "Utils.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace sql {
+
+SQLRowsetProcessor::SQLRowsetProcessor(const soci::rowset<soci::row>& rowset, const std::vector<SQLRowSubscriber*>& rowSubscribers)
+ : rowset_(rowset), rowSubscribers_(rowSubscribers) {
+ iter_ = rowset_.begin();
+}
+
+size_t SQLRowsetProcessor::process(size_t max) {
+ size_t count = 0;
+
+ for (; iter_ != rowset_.end(); ) {
+ addRow(*iter_, count);
+ iter_++;
+ count++;
+ totalCount_++;
+ if (max > 0 && count >= max) {
+ break;
+ }
+ }
+
+ return count;
+}
+
+void SQLRowsetProcessor::addRow(const soci::row& row, size_t rowCount) {
+ for (const auto& pRowSubscriber : rowSubscribers_) {
+ pRowSubscriber->beginProcessRow();
+ }
+
+ if (rowCount == 0) {
+ for (std::size_t i = 0; i != row.size(); ++i) {
+ for (const auto& pRowSubscriber : rowSubscribers_) {
+ pRowSubscriber->processColumnName(utils::toLower(row.get_properties(i).get_name()));
+ }
+ }
+ }
+
+ for (std::size_t i = 0; i != row.size(); ++i) {
+ const soci::column_properties& props = row.get_properties(i);
+
+ const auto& name = utils::toLower(props.get_name());
+
+ if (row.get_indicator(i) == soci::i_null) {
+ processColumn(name, "NULL");
+ } else {
+ switch (const auto dataType = props.get_data_type()) {
+ case soci::data_type::dt_string: {
+ processColumn(name, row.get<std::string>(i));
+ }
+ break;
+ case soci::data_type::dt_double: {
+ processColumn(name, row.get<double>(i));
+ }
+ break;
+ case soci::data_type::dt_integer: {
+ processColumn(name, row.get<int>(i));
+ }
+ break;
+ case soci::data_type::dt_long_long: {
+ processColumn(name, row.get<long long>(i));
+ }
+ break;
+ case soci::data_type::dt_unsigned_long_long: {
+ processColumn(name, row.get<unsigned long long>(i));
+ }
+ break;
+ case soci::data_type::dt_date: {
+ const std::tm when = row.get<std::tm>(i);
+
+ char value[128];
+ if (!std::strftime(value, sizeof(value), "%Y-%m-%d %H:%M:%S", &when))
+ throw minifi::Exception(PROCESSOR_EXCEPTION, "SQLRowsetProcessor: !strftime.");
+
+ processColumn(name, std::string(value));
+ }
+ break;
+ default: {
+ throw minifi::Exception(PROCESSOR_EXCEPTION, "SQLRowsetProcessor: Unsupported data type " + std::to_string(dataType));
+ }
+ }
+ }
+ }
+
+ for (const auto& pRowSubscriber : rowSubscribers_) {
+ pRowSubscriber->endProcessRow();
+ }
+}
+
+} /* namespace sql */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
diff --git a/extensions/sql/data/SQLRowsetProcessor.h b/extensions/sql/data/SQLRowsetProcessor.h
new file mode 100644
index 0000000..27545f4
--- /dev/null
+++ b/extensions/sql/data/SQLRowsetProcessor.h
@@ -0,0 +1,62 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <vector>
+
+#include <soci/soci.h>
+
+#include "SQLRowSubscriber.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace sql {
+
+class SQLRowsetProcessor
+{
+ public:
+ SQLRowsetProcessor(const soci::rowset<soci::row>& rowset, const std::vector<SQLRowSubscriber*>& rowSubscribers);
+
+ size_t process(size_t max);
+
+ private:
+ void addRow(const soci::row& row, size_t rowCount);
+
+ template <typename T>
+ void processColumn(const std::string& name, const T& value) const {
+ for (const auto& pRowSubscriber: rowSubscribers_) {
+ pRowSubscriber->processColumn(name, value);
+ }
+ }
+
+ private:
+ size_t totalCount_{};
+ soci::rowset<soci::row>::const_iterator iter_;
+ soci::rowset<soci::row> rowset_;
+ std::vector<SQLRowSubscriber*> rowSubscribers_;
+};
+
+} /* namespace sql */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
diff --git a/extensions/sql/data/SQLWriter.h b/extensions/sql/data/SQLWriter.h
new file mode 100644
index 0000000..f8627e9
--- /dev/null
+++ b/extensions/sql/data/SQLWriter.h
@@ -0,0 +1,45 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <string>
+#include <iostream>
+
+#include <soci/soci.h>
+
+#include "SQLRowSubscriber.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace sql {
+
+struct SQLWriter: public SQLRowSubscriber
+{
+ virtual std::string toString() = 0;
+};
+
+
+} /* namespace sql */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
diff --git a/extensions/sql/data/Utils.cpp b/extensions/sql/data/Utils.cpp
new file mode 100644
index 0000000..f1d8422
--- /dev/null
+++ b/extensions/sql/data/Utils.cpp
@@ -0,0 +1,61 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "Utils.h"
+
+#include <algorithm>
+#include <cctype>
+#include <regex>
+#include <sstream>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+std::string toLower(const std::string& str) {
+ std::string ret;
+
+ // (int(*)(int))std::tolower - to avoid compilation error 'no matching overloaded function found'.
+ // It is described in https://stackoverflow.com/questions/5539249/why-cant-transforms-begin-s-end-s-begin-tolower-be-complied-successfu.
+ std::transform(str.begin(), str.end(), std::back_inserter(ret), (int(*)(int))std::tolower);
+
+ return ret;
+}
+
+std::vector<std::string> inputStringToList(const std::string& str) {
+ std::vector<std::string> ret;
+
+ std::string token;
+ // Convert to lower and remove white characters.
+ std::istringstream tokenStream(std::regex_replace(toLower(str), std::regex("\\s"), std::string("")));
+
+ while (std::getline(tokenStream, token, ','))
+ {
+ ret.push_back(token);
+ }
+
+ return ret;
+}
+
+} /* namespace utils */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
diff --git a/extensions/sql/data/Utils.h b/extensions/sql/data/Utils.h
new file mode 100644
index 0000000..fb9758f
--- /dev/null
+++ b/extensions/sql/data/Utils.h
@@ -0,0 +1,38 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <string>
+#include <vector>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+std::string toLower(const std::string& str);
+std::vector<std::string> inputStringToList(const std::string& str);
+
+
+} /* namespace utils */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
diff --git a/extensions/sql/data/WriteCallback.h b/extensions/sql/data/WriteCallback.h
new file mode 100644
index 0000000..ab961ad
--- /dev/null
+++ b/extensions/sql/data/WriteCallback.h
@@ -0,0 +1,49 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <string>
+
+#include "FlowFileRecord.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+
+class WriteCallback : public OutputStreamCallback {
+public:
+ explicit WriteCallback(const std::string& data)
+ : data_(data) {
+ }
+
+ int64_t process(std::shared_ptr<io::BaseStream> stream) {
+ if (data_.empty())
+ return 0;
+
+ return stream->write(reinterpret_cast<uint8_t*>(const_cast<char*>(data_.c_str())), data_.size());
+ }
+
+ const std::string& data_;
+};
+
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
diff --git a/extensions/sql/patch/soci.patch b/extensions/sql/patch/soci.patch
new file mode 100644
index 0000000..530571d
--- /dev/null
+++ b/extensions/sql/patch/soci.patch
@@ -0,0 +1,18 @@
+diff -rupN orig/CMakeLists.txt patched/CMakeLists.txt
+--- orig/CMakeLists.txt 2019-11-09 20:08:01.000000000 +0100
++++ patched/CMakeLists.txt 2019-12-17 15:22:40.000000000 +0100
+@@ -29,8 +29,7 @@ option(SOCI_ASAN "Enable address sanitiz
+ ###############################################################################
+
+ # Path to additional CMake modules
+-set(CMAKE_MODULE_PATH ${SOCI_SOURCE_DIR}/cmake ${CMAKE_MODULE_PATH})
+-set(CMAKE_MODULE_PATH ${SOCI_SOURCE_DIR}/cmake/modules ${CMAKE_MODULE_PATH})
++list(APPEND CMAKE_MODULE_PATH "${SOCI_SOURCE_DIR}/cmake/modules" "${SOCI_SOURCE_DIR}/cmake")
+
+ include(SociUtilities)
+ include(SociConfig)
+@@ -204,4 +203,3 @@ endforeach()
+ configure_file("${CONFIG_FILE_IN}" "${CONFIG_FILE_OUT}")
+
+ message(STATUS "")
+-
diff --git a/extensions/sql/processors/ExecuteSQL.cpp b/extensions/sql/processors/ExecuteSQL.cpp
new file mode 100644
index 0000000..7738408
--- /dev/null
+++ b/extensions/sql/processors/ExecuteSQL.cpp
@@ -0,0 +1,123 @@
+/**
+ * @file ExecuteSQL.cpp
+ * ExecuteSQL class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ExecuteSQL.h"
+
+#include <vector>
+#include <queue>
+#include <map>
+#include <set>
+#include <sstream>
+#include <stdio.h>
+#include <string>
+#include <iostream>
+#include <memory>
+#include <codecvt>
+
+#include <soci/soci.h>
+
+#include "io/DataStream.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "Exception.h"
+#include "utils/OsUtils.h"
+#include "data/DatabaseConnectors.h"
+#include "data/JSONSQLWriter.h"
+#include "data/SQLRowsetProcessor.h"
+#include "data/WriteCallback.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+const std::string ExecuteSQL::ProcessorName("ExecuteSQL");
+
+const core::Property ExecuteSQL::s_sqlSelectQuery(
+ core::PropertyBuilder::createProperty("SQL select query")->isRequired(true)->withDescription(
+ "The SQL select query to execute. The query can be empty, a constant value, or built from attributes using Expression Language. "
+ "If this property is specified, it will be used regardless of the content of incoming flowfiles. "
+ "If this property is empty, the content of the incoming flow file is expected to contain a valid SQL select query, to be issued by the processor to the database. "
+ "Note that Expression Language is not evaluated for flow file contents.")->supportsExpressionLanguage(true)->build());
+
+const core::Property ExecuteSQL::s_maxRowsPerFlowFile(
+ core::PropertyBuilder::createProperty("Max Rows Per Flow File")->isRequired(true)->withDefaultValue<int>(0)->withDescription(
+ "The maximum number of result rows that will be included intoi a flow file. If zero then all will be placed into the flow file")->supportsExpressionLanguage(true)->build());
+
+const core::Relationship ExecuteSQL::s_success("success", "Successfully created FlowFile from SQL query result set.");
+
+static const std::string ResultRowCount = "executesql.row.count";
+
+ExecuteSQL::ExecuteSQL(const std::string& name, utils::Identifier uuid)
+ : SQLProcessor(name, uuid), max_rows_(0) {
+}
+
+ExecuteSQL::~ExecuteSQL() {
+}
+
+void ExecuteSQL::initialize() {
+ //! Set the supported properties
+ setSupportedProperties( { dbControllerService(), outputFormat(), s_sqlSelectQuery, s_maxRowsPerFlowFile});
+
+ //! Set the supported relationships
+ setSupportedRelationships( { s_success });
+}
+
+void ExecuteSQL::processOnSchedule(const core::ProcessContext &context) {
+ initOutputFormat(context);
+
+ context.getProperty(s_sqlSelectQuery.getName(), sqlSelectQuery_);
+ context.getProperty(s_maxRowsPerFlowFile.getName(), max_rows_);
+}
+
+void ExecuteSQL::processOnTrigger(core::ProcessSession &session) {
+ auto statement = connection_->prepareStatement(sqlSelectQuery_);
+
+ auto rowset = statement->execute();
+
+ int count = 0;
+ size_t rowCount = 0;
+ sql::JSONSQLWriter sqlWriter(isJSONPretty());
+ sql::SQLRowsetProcessor sqlRowsetProcessor(rowset, { &sqlWriter });
+
+ // Process rowset.
+ do {
+ rowCount = sqlRowsetProcessor.process(max_rows_ == 0 ? std::numeric_limits<size_t>::max() : max_rows_);
+ count++;
+ if (rowCount == 0)
+ break;
+
+ const auto output = sqlWriter.toString();
+ if (!output.empty()) {
+ WriteCallback writer(output);
+ auto newflow = session.create();
+ newflow->addAttribute(ResultRowCount, std::to_string(rowCount));
+ session.write(newflow, &writer);
+ session.transfer(newflow, s_success);
+ }
+ } while (rowCount > 0);
+}
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
diff --git a/extensions/sql/processors/ExecuteSQL.h b/extensions/sql/processors/ExecuteSQL.h
new file mode 100644
index 0000000..764f038
--- /dev/null
+++ b/extensions/sql/processors/ExecuteSQL.h
@@ -0,0 +1,70 @@
+/**
+ * @file ExecuteSQL.h
+ * ExecuteSQL class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "core/Core.h"
+#include "FlowFileRecord.h"
+#include "concurrentqueue.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "services/DatabaseService.h"
+#include "SQLProcessor.h"
+#include "OutputFormat.h"
+
+#include <sstream>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+//! ExecuteSQL Class
+class ExecuteSQL: public SQLProcessor<ExecuteSQL>, public OutputFormat {
+ public:
+ explicit ExecuteSQL(const std::string& name, utils::Identifier uuid = utils::Identifier());
+ virtual ~ExecuteSQL();
+
+ //! Processor Name
+ static const std::string ProcessorName;
+
+ void processOnSchedule(const core::ProcessContext& context);
+ void processOnTrigger(core::ProcessSession& session);
+
+ void initialize() override;
+
+ static const core::Property s_sqlSelectQuery;
+ static const core::Property s_maxRowsPerFlowFile;
+
+ static const core::Relationship s_success;
+
+ private:
+ int max_rows_;
+ std::string sqlSelectQuery_;
+};
+
+REGISTER_RESOURCE(ExecuteSQL, "ExecuteSQL to execute SELECT statement via ODBC.");
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
diff --git a/extensions/sql/processors/OutputFormat.cpp b/extensions/sql/processors/OutputFormat.cpp
new file mode 100644
index 0000000..acd9f69
--- /dev/null
+++ b/extensions/sql/processors/OutputFormat.cpp
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "OutputFormat.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+const std::string s_outputFormatJSON = "JSON";
+const std::string s_outputFormatJSONPretty = "JSON-Pretty";
+
+const core::Property& OutputFormat::outputFormat() {
+ static const core::Property s_outputFormat =
+ core::PropertyBuilder::createProperty("Output Format")->
+ isRequired(true)->
+ withDefaultValue(s_outputFormatJSONPretty)->
+ withAllowableValues<std::string>({ s_outputFormatJSON, s_outputFormatJSONPretty })->
+ withDescription("Set the output format type.")->
+ build();
+
+ return s_outputFormat;
+}
+
+bool OutputFormat::isJSONFormat() const {
+ return outputFormat_ == s_outputFormatJSON || outputFormat_ == s_outputFormatJSONPretty;
+}
+
+bool OutputFormat::isJSONPretty() const {
+ return outputFormat_ == s_outputFormatJSONPretty;
+}
+
+void OutputFormat::initOutputFormat(const core::ProcessContext& context) {
+ context.getProperty(outputFormat().getName(), outputFormat_);
+}
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
diff --git a/extensions/sql/processors/OutputFormat.h b/extensions/sql/processors/OutputFormat.h
new file mode 100644
index 0000000..e94b538
--- /dev/null
+++ b/extensions/sql/processors/OutputFormat.h
@@ -0,0 +1,52 @@
+/**
+ * @file OutputFormat.h
+ * OutputFormat class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "core/Core.h"
+#include "core/Processor.h"
+
+#include <string>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+class OutputFormat {
+ protected:
+ static const core::Property& outputFormat();
+
+ bool isJSONFormat() const;
+
+ bool isJSONPretty() const;
+
+ void initOutputFormat(const core::ProcessContext& context);
+
+ protected:
+ std::string outputFormat_;
+};
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
diff --git a/extensions/sql/processors/PutSQL.cpp b/extensions/sql/processors/PutSQL.cpp
new file mode 100644
index 0000000..0512cc9
--- /dev/null
+++ b/extensions/sql/processors/PutSQL.cpp
@@ -0,0 +1,102 @@
+/**
+ * @file PutSQL.cpp
+ * PutSQL class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "PutSQL.h"
+
+#include <vector>
+#include <queue>
+#include <map>
+#include <set>
+#include <sstream>
+#include <stdio.h>
+#include <string>
+#include <iostream>
+#include <memory>
+#include <codecvt>
+
+#include <soci/soci.h>
+
+#include "io/DataStream.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "Exception.h"
+#include "utils/OsUtils.h"
+#include "data/DatabaseConnectors.h"
+#include "data/JSONSQLWriter.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+const std::string PutSQL::ProcessorName("PutSQL");
+
+const core::Property PutSQL::s_sqlStatements(
+ core::PropertyBuilder::createProperty("SQL statements")->isRequired(true)->withDefaultValue("System")->withDescription(
+ "A semicolon-delimited list of SQL statements to execute. The statement can be empty, a constant value, or built from attributes using Expression Language. "
+ "If this property is specified, it will be used regardless of the content of incoming flowfiles. "
+ "If this property is empty, the content of the incoming flow file is expected to contain a valid SQL statements, to be issued by the processor to the database.")
+ ->supportsExpressionLanguage(true)->build());
+
+const core::Relationship PutSQL::s_success("success", "Database is successfully updated.");
+
+PutSQL::PutSQL(const std::string& name, utils::Identifier uuid)
+ : SQLProcessor(name, uuid) {
+}
+
+PutSQL::~PutSQL() {
+}
+
+void PutSQL::initialize() {
+ //! Set the supported properties
+ setSupportedProperties( { dbControllerService(), s_sqlStatements });
+
+ //! Set the supported relationships
+ setSupportedRelationships( { s_success });
+}
+
+void PutSQL::processOnSchedule(const core::ProcessContext& context) {
+ std::string sqlStatements;
+ context.getProperty(s_sqlStatements.getName(), sqlStatements);
+ sqlStatements_ = utils::StringUtils::split(sqlStatements, ";");
+}
+
+void PutSQL::processOnTrigger(core::ProcessSession& session) {
+ const auto dbSession = connection_->getSession();
+
+ try {
+ dbSession->begin();
+ for (const auto& statement : sqlStatements_) {
+ dbSession->execute(statement);
+ }
+ dbSession->commit();
+ } catch (std::exception& e) {
+ logger_->log_error("SQL statement error: %s", e.what());
+ dbSession->rollback();
+ throw;
+ }
+}
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
diff --git a/extensions/sql/processors/PutSQL.h b/extensions/sql/processors/PutSQL.h
new file mode 100644
index 0000000..284008a
--- /dev/null
+++ b/extensions/sql/processors/PutSQL.h
@@ -0,0 +1,67 @@
+/**
+ * @file PutSQL.h
+ * PutSQL class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "core/Core.h"
+#include "FlowFileRecord.h"
+#include "concurrentqueue.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "services/DatabaseService.h"
+#include "SQLProcessor.h"
+
+#include <sstream>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+//! PutSQL Class
+class PutSQL: public SQLProcessor<PutSQL> {
+ public:
+ explicit PutSQL(const std::string& name, utils::Identifier uuid = utils::Identifier());
+ virtual ~PutSQL();
+
+ //! Processor Name
+ static const std::string ProcessorName;
+
+ void processOnSchedule(const core::ProcessContext &context);
+ void processOnTrigger(core::ProcessSession &session);
+
+ void initialize() override;
+
+ static const core::Property s_sqlStatements;
+
+ static const core::Relationship s_success;
+
+ private:
+ std::vector<std::string> sqlStatements_;
+};
+
+REGISTER_RESOURCE(PutSQL, "PutSQL to execute SQL command via ODBC.");
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
diff --git a/extensions/sql/processors/QueryDatabaseTable.cpp b/extensions/sql/processors/QueryDatabaseTable.cpp
new file mode 100644
index 0000000..a7d7033
--- /dev/null
+++ b/extensions/sql/processors/QueryDatabaseTable.cpp
@@ -0,0 +1,471 @@
+/**
+ * @file QueryDatabaseTable.cpp
+ * PutSQL class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "QueryDatabaseTable.h"
+
+#include <vector>
+#include <queue>
+#include <map>
+#include <set>
+#include <sstream>
+#include <stdio.h>
+#include <string>
+#include <iostream>
+#include <memory>
+#include <codecvt>
+#include <algorithm>
+#include <regex>
+
+#include <soci/soci.h>
+
+#include "io/DataStream.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "Exception.h"
+#include "utils/OsUtils.h"
+#include "data/DatabaseConnectors.h"
+#include "data/JSONSQLWriter.h"
+#include "data/SQLRowsetProcessor.h"
+#include "data/WriteCallback.h"
+#include "data/MaxCollector.h"
+#include "data/Utils.h"
+#include "utils/file/FileUtils.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+const std::string QueryDatabaseTable::ProcessorName("QueryDatabaseTable");
+
+const core::Property QueryDatabaseTable::s_tableName(
+ core::PropertyBuilder::createProperty("Table Name")->isRequired(true)->withDescription("The name of the database table to be queried.")->supportsExpressionLanguage(true)->build());
+
+const core::Property QueryDatabaseTable::s_columnNames(
+ core::PropertyBuilder::createProperty("Columns to Return")->isRequired(false)->withDescription(
+ "A comma-separated list of column names to be used in the query. If your database requires special treatment of the names (quoting, e.g.), each name should include such treatment. "
+ "If no column names are supplied, all columns in the specified table will be returned. "
+ "NOTE: It is important to use consistent column names for a given table for incremental fetch to work properly.")->supportsExpressionLanguage(true)->build());
+
+const core::Property QueryDatabaseTable::s_maxValueColumnNames(
+ core::PropertyBuilder::createProperty("Maximum-value Columns")->isRequired(false)->withDescription(
+ "A comma-separated list of column names. The processor will keep track of the maximum value for each column that has been returned since the processor started running. "
+ "Using multiple columns implies an order to the column list, and each column's values are expected to increase more slowly than the previous columns' values. "
+ "Thus, using multiple columns implies a hierarchical structure of columns, which is usually used for partitioning tables. "
+ "This processor can be used to retrieve only those rows that have been added/updated since the last retrieval. "
+ "Note that some ODBC types such as bit/boolean are not conducive to maintaining maximum value, so columns of these types should not be listed in this property, and will result in error(s) during processing. "
+ "If no columns are provided, all rows from the table will be considered, which could have a performance impact. "
+ "NOTE: It is important to use consistent max-value column names for a given table for incremental fetch to work properly. "
+ "NOTE: Because of a limitation of database access library 'soci', which doesn't support milliseconds in it's 'dt_date', "
+ "there is a possibility that flowfiles might have duplicated records, if a max-value column with 'dt_date' type has value with milliseconds.")->
+ supportsExpressionLanguage(true)->build());
+
+const core::Property QueryDatabaseTable::s_whereClause(
+ core::PropertyBuilder::createProperty("db-fetch-where-clause")->isRequired(false)->withDescription(
+ "A custom clause to be added in the WHERE condition when building SQL queries.")->supportsExpressionLanguage(true)->build());
+
+const core::Property QueryDatabaseTable::s_sqlQuery(
+ core::PropertyBuilder::createProperty("db-fetch-sql-query")->isRequired(false)->withDescription(
+ "A custom SQL query used to retrieve data. Instead of building a SQL query from other properties, this query will be wrapped as a sub-query. "
+ "Query must have no ORDER BY statement.")->supportsExpressionLanguage(true)->build());
+
+const core::Property QueryDatabaseTable::s_maxRowsPerFlowFile(
+ core::PropertyBuilder::createProperty("qdbt-max-rows")->isRequired(true)->withDefaultValue<int>(0)->withDescription(
+ "The maximum number of result rows that will be included in a single FlowFile. This will allow you to break up very large result sets into multiple FlowFiles. "
+ "If the value specified is zero, then all rows are returned in a single FlowFile.")->supportsExpressionLanguage(true)->build());
+
+const core::Property QueryDatabaseTable::s_stateDirectory(
+ core::PropertyBuilder::createProperty("State Directory")->isRequired(false)->withDefaultValue("QDTState")->withDescription("Directory which contains processor state data.")->build());
+
+const std::string QueryDatabaseTable::s_initialMaxValueDynamicPropertyPrefix("initial.maxvalue.");
+
+const core::Relationship QueryDatabaseTable::s_success("success", "Successfully created FlowFile from SQL query result set.");
+
+static const std::string ResultTableName = "tablename";
+static const std::string ResultRowCount = "querydbtable.row.count";
+
+// State
+class State {
+ public:
+ State(const std::string& tableName, const std::string& stateDir, const std::string& uuid, std::shared_ptr<logging::Logger> logger)
+ :tableName_(tableName), logger_(logger) {
+ if (!createUUIDDir(stateDir, uuid, filePath_))
+ return;
+
+ filePath_ += "State.txt";
+
+ if (!getStateFromFile())
+ return;
+
+ ok_ = true;
+ }
+
+ ~State() {}
+
+ explicit operator bool() const {
+ return ok_;
+ }
+
+ std::unordered_map<std::string, std::string> mapState() const {
+ return mapState_;
+ }
+
+ void writeStateToFile(const std::unordered_map<std::string, std::string>& mapState) {
+ file_.seekp(std::ios::beg);
+
+ file_ << tableName_ << separator();
+ auto dataSize = tableName_.size() + separator().size();
+
+ for (const auto& el : mapState) {
+ file_ << el.first << '=' << el.second << separator();
+ dataSize += el.first.size() + 1 + el.second.size() + separator().size();
+ }
+
+ // If a maxValueColumnName type is varchar, a new max value 'dataSize' can be shorter than previous max value 'dataSize_' - clear difference with ' ' to keep file format.
+ if (dataSize_ > dataSize) {
+ for (auto i = dataSize_ - dataSize; i > 0; i--) {
+ file_ << ' ';
+ }
+ }
+ dataSize_ = dataSize;
+
+ file_.flush();
+
+ mapState_ = mapState;
+ }
+
+ private:
+ static const std::string& separator() {
+ static const std::string s_separator = "@!qdt!@";
+ return s_separator;
+ }
+
+ bool createUUIDDir(const std::string& stateDir, const std::string& uuid, std::string& dir)
+ {
+ if (stateDir.empty()) {
+ dir.clear();
+ return false;
+ }
+
+ const auto dirSeparator = utils::file::FileUtils::get_separator();
+
+ auto dirWithSlash = stateDir;
+ if (stateDir.back() != dirSeparator) {
+ dirWithSlash += dirSeparator;
+ }
+
+ dir = dirWithSlash + "uuid" + dirSeparator + uuid + dirSeparator;
+
+ utils::file::FileUtils::create_dir(dir);
+
+ if (!utils::file::FileUtils::is_directory(dir.c_str())) {
+ logger_->log_error("Cannot create %s", dir.c_str());
+ dir.clear();
+ return false;
+ }
+
+ return true;
+ }
+
+ bool getStateFromFile() {
+ std::string state;
+
+ std::ifstream file(filePath_);
+ if (!file) {
+ return createEmptyStateFile();
+ }
+
+ std::stringstream ss;
+ ss << file.rdbuf();
+
+ state = ss.str();
+
+ dataSize_ = state.size();
+
+ file.close();
+
+ std::vector<std::string> listColumnNameValue;
+
+ size_t pos = state.find(separator(), 0);
+ if (pos == std::string::npos) {
+ logger_->log_error("Invalid data in '%s' file.", filePath_.c_str());
+ mapState_.clear();
+ return createEmptyStateFile();
+ }
+
+ auto tableName = state.substr(0, pos);
+ if (tableName != tableName_) {
+ logger_->log_warn("tableName is changed - now: '%s', in State.txt: '%s'.", tableName_.c_str(), tableName.c_str());
+ mapState_.clear();
+
+ return createEmptyStateFile();
+ }
+
+ pos += separator().size();
+
+ while (true) {
+ auto newPos = state.find(separator(), pos);
+ if (newPos == std::string::npos)
+ break;
+
+ const std::string& columnNameValue = state.substr(pos, newPos - pos);
+ listColumnNameValue.emplace_back(columnNameValue);
+
+ pos = newPos + separator().size();
+ }
+
+ for (const auto& columnNameValue : listColumnNameValue) {
+ const auto posEQ = columnNameValue.find('=');
+ if (posEQ == std::string::npos) {
+ logger_->log_error("Invalid data in '%s' file.", filePath_.c_str());
+ mapState_.clear();
+ return createEmptyStateFile();
+ }
+
+ const auto& name = columnNameValue.substr(0, posEQ);
+ const auto& value = columnNameValue.substr(posEQ + 1);
+
+ mapState_.insert({ name, value });
+ }
+
+ file_.open(filePath_);
+ if (!file_.is_open()) {
+ logger_->log_error("Cannot open %s", filePath_.c_str());
+ mapState_.clear();
+ return false;
+ }
+
+ return true;
+ }
+
+ bool createEmptyStateFile() {
+ file_.open(filePath_, std::ios::out);
+ if (!file_.is_open()) {
+ logger_->log_error("Cannot open '%s' file", filePath_.c_str());
+ return false;
+ }
+
+ dataSize_ = 0;
+
+ return true;
+ }
+
+ private:
+ std::unordered_map<std::string, std::string> mapState_;
+ std::shared_ptr<logging::Logger> logger_;
+ std::string filePath_;
+ std::fstream file_;
+ size_t dataSize_{};
+ std::string tableName_;
+ bool ok_{};
+};
+
+
+// QueryDatabaseTable
+QueryDatabaseTable::QueryDatabaseTable(const std::string& name, utils::Identifier uuid)
+ : SQLProcessor(name, uuid) {
+}
+
+QueryDatabaseTable::~QueryDatabaseTable() {
+}
+
+void QueryDatabaseTable::initialize() {
+ //! Set the supported properties
+ setSupportedProperties( { dbControllerService(), outputFormat(), s_tableName, s_columnNames, s_maxValueColumnNames, s_whereClause, s_sqlQuery, s_maxRowsPerFlowFile, s_stateDirectory});
+
+ //! Set the supported relationships
+ setSupportedRelationships( { s_success });
+}
+
+void QueryDatabaseTable::processOnSchedule(const core::ProcessContext &context) {
+ initOutputFormat(context);
+
+ context.getProperty(s_tableName.getName(), tableName_);
+ context.getProperty(s_columnNames.getName(), columnNames_);
+
+ context.getProperty(s_maxValueColumnNames.getName(), maxValueColumnNames_);
+ listMaxValueColumnName_ = utils::inputStringToList(maxValueColumnNames_);
+
+ context.getProperty(s_whereClause.getName(), whereClause_);
+ context.getProperty(s_sqlQuery.getName(), sqlQuery_);
+ context.getProperty(s_maxRowsPerFlowFile.getName(), maxRowsPerFlowFile_);
+
+ std::string stateDir;
+ context.getProperty(s_stateDirectory.getName(), stateDir);
+ if (stateDir.empty()) {
+ logger_->log_error("State Directory is empty");
+ return;
+ }
+
+ pState_ = std::make_unique<State>(tableName_, stateDir, getUUIDStr(), logger_);
+ if (!*pState_) {
+ return;
+ }
+
+ mapState_ = pState_->mapState();
+
+ // If 'listMaxValueColumnName_' doesn't match columns in mapState_, then clear mapState_.
+ if (listMaxValueColumnName_.size() != mapState_.size()) {
+ mapState_.clear();
+ } else {
+ for (const auto& columName : listMaxValueColumnName_) {
+ if (0 == mapState_.count(columName)) {
+ mapState_.clear();
+ break;
+ }
+ }
+ }
+
+ // Add in 'mapState_' new columns which are in 'listMaxValueColumnName_'.
+ for (const auto& maxValueColumnName: listMaxValueColumnName_) {
+ if (0 == mapState_.count(maxValueColumnName)) {
+ mapState_.insert({maxValueColumnName, std::string()});
+ }
+ }
+
+ const auto dynamic_prop_keys = context.getDynamicPropertyKeys();
+ logger_->log_info("Received %zu dynamic properties", dynamic_prop_keys.size());
+
+ // If the stored state for a max value column is empty, populate it with the corresponding initial max value, if it exists.
+ for (const auto& key : dynamic_prop_keys) {
+ if (std::string::npos == key.rfind(s_initialMaxValueDynamicPropertyPrefix, 0)) {
+ throw minifi::Exception(PROCESSOR_EXCEPTION, "QueryDatabaseTable: Unsupported dynamic property \"" + key + "\"");
+ }
+ const auto columnName = utils::toLower(key.substr(s_initialMaxValueDynamicPropertyPrefix.length()));
+ auto it = mapState_.find(columnName);
+ if (it == mapState_.end()) {
+ logger_->log_warn("Initial maximum value specified for column \"%s\", which is not specified as a Maximum-value Column. Ignoring.", columnName);
+ continue;
+ }
+ if (!it->second.empty()) {
+ continue;
+ }
+ std::string value;
+ if (context.getDynamicProperty(key, value) && !value.empty()) {
+ it->second = value;
+ logger_->log_info("Setting initial maximum value of %s to %s", columnName, value);
+ }
+ }
+}
+
+void QueryDatabaseTable::processOnTrigger(core::ProcessSession &session) {
+ const auto& selectQuery = getSelectQuery();
+
+ logger_->log_info("QueryDatabaseTable: selectQuery: '%s'", selectQuery.c_str());
+
+ auto statement = connection_->prepareStatement(selectQuery);
+
+ auto rowset = statement->execute();
+
+ int count = 0;
+ size_t rowCount = 0;
+ sql::MaxCollector maxCollector(selectQuery, maxValueColumnNames_, mapState_);
+ sql::JSONSQLWriter sqlWriter(isJSONPretty());
+ sql::SQLRowsetProcessor sqlRowsetProcessor(rowset, {&sqlWriter, &maxCollector});
+
+ // Process rowset.
+ do {
+ rowCount = sqlRowsetProcessor.process(maxRowsPerFlowFile_ == 0 ? std::numeric_limits<size_t>::max() : maxRowsPerFlowFile_);
+ count++;
+ if (rowCount == 0)
+ break;
+
+ const auto output = sqlWriter.toString();
+ if (!output.empty()) {
+ WriteCallback writer(output);
+ auto newflow = session.create();
+ newflow->addAttribute(ResultRowCount, std::to_string(rowCount));
+ newflow->addAttribute(ResultTableName, tableName_);
+ session.write(newflow, &writer);
+ session.transfer(newflow, s_success);
+ }
+ } while (rowCount > 0);
+
+ const auto mapState = mapState_;
+ if (maxCollector.updateMapState()) {
+ try {
+ session.commit();
+ } catch (std::exception& e) {
+ mapState_ = mapState;
+ throw;
+ }
+
+ pState_->writeStateToFile(mapState_);
+ }
+}
+
+std::string QueryDatabaseTable::getSelectQuery() {
+ std::string ret;
+
+ if (sqlQuery_.empty()) {
+ std::string columns;
+ if (columnNames_.empty()) {
+ columns = "*";
+ } else {
+ columns = columnNames_;
+ }
+ ret = "select " + columns + " from " + tableName_;
+ } else {
+ ret = sqlQuery_;
+ }
+
+ std::string whereClauses;
+
+ if (!mapState_.empty() && !listMaxValueColumnName_.empty()) {
+ for (auto index = 0U; index < listMaxValueColumnName_.size(); index++) {
+ const auto& columnName = listMaxValueColumnName_[index];
+
+ const auto itState = mapState_.find(columnName);
+
+ const auto& maxValue = itState->second;
+ if (maxValue.empty()) {
+ continue;
+ }
+
+ // Logic to differentiate ">" vs ">=" based on index is copied from:
+ // https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java
+ // (under comment "Add a condition for the WHERE clause"). And implementation explanation: https://issues.apache.org/jira/browse/NIFI-2712.
+ if (index == 0) {
+ whereClauses += columnName + " > ";
+ } else {
+ whereClauses += " and " + columnName + " >= ";
+ }
+ whereClauses += maxValue;
+ }
+ }
+
+ if (!whereClause_.empty()) {
+ whereClauses += " and " + whereClause_;
+ }
+
+ if (!whereClauses.empty()) {
+ ret += " where " + whereClauses;
+ }
+
+ return ret;
+}
+
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
diff --git a/extensions/sql/processors/QueryDatabaseTable.h b/extensions/sql/processors/QueryDatabaseTable.h
new file mode 100644
index 0000000..412faab
--- /dev/null
+++ b/extensions/sql/processors/QueryDatabaseTable.h
@@ -0,0 +1,95 @@
+/**
+ * @file QueryDatabaseTable.h
+ * PutSQL class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "core/Core.h"
+#include "FlowFileRecord.h"
+#include "concurrentqueue.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "services/DatabaseService.h"
+#include "SQLProcessor.h"
+#include "OutputFormat.h"
+
+#include <sstream>
+#include <unordered_map>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+class State;
+
+//! QueryDatabaseTable Class
+class QueryDatabaseTable: public SQLProcessor<QueryDatabaseTable>, public OutputFormat {
+ public:
+ explicit QueryDatabaseTable(const std::string& name, utils::Identifier uuid = utils::Identifier());
+ virtual ~QueryDatabaseTable();
+
+ //! Processor Name
+ static const std::string ProcessorName;
+
+ static const core::Property s_tableName;
+ static const core::Property s_columnNames;
+ static const core::Property s_maxValueColumnNames;
+ static const core::Property s_whereClause;
+ static const core::Property s_sqlQuery;
+ static const core::Property s_maxRowsPerFlowFile;
+ static const core::Property s_stateDirectory;
+
+ static const std::string s_initialMaxValueDynamicPropertyPrefix;
+
+ static const core::Relationship s_success;
+
+ bool supportsDynamicProperties() override {
+ return true;
+ }
+
+ void processOnSchedule(const core::ProcessContext& context);
+ void processOnTrigger(core::ProcessSession& session);
+
+ void initialize() override;
+
+ private:
+ std::string getSelectQuery();
+
+ private:
+ std::string tableName_;
+ std::string columnNames_;
+ std::string maxValueColumnNames_;
+ std::string whereClause_;
+ std::string sqlQuery_;
+ int maxRowsPerFlowFile_{};
+ std::vector<std::string> listMaxValueColumnName_;
+ std::unordered_map<std::string, std::string> mapState_;
+ std::unordered_map<std::string, soci::data_type> mapColumnType_;
+ std::unique_ptr<State> pState_;
+};
+
+REGISTER_RESOURCE(QueryDatabaseTable, "QueryDatabaseTable to execute SELECT statement via ODBC.");
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
diff --git a/extensions/sql/processors/SQLProcessor.h b/extensions/sql/processors/SQLProcessor.h
new file mode 100644
index 0000000..d277449
--- /dev/null
+++ b/extensions/sql/processors/SQLProcessor.h
@@ -0,0 +1,105 @@
+/**
+ * @file SQLProcessor.h
+ * SQLProcessor class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "core/Core.h"
+#include "FlowFileRecord.h"
+#include "concurrentqueue.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+template <typename T>
+class SQLProcessor: public core::Processor {
+ protected:
+ SQLProcessor(const std::string& name, utils::Identifier uuid)
+ : core::Processor(name, uuid), logger_(logging::LoggerFactory<T>::getLogger()) {
+ }
+
+ void onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>& sessionFactory) override {
+ std::string controllerService;
+ context->getProperty(dbControllerService().getName(), controllerService);
+
+ dbService_ = std::dynamic_pointer_cast<sql::controllers::DatabaseService>(context->getControllerService(controllerService));
+ if (!dbService_)
+ throw minifi::Exception(PROCESSOR_EXCEPTION, "'DB Controller Service' must be defined");
+
+ static_cast<T*>(this)->processOnSchedule(*context);
+ }
+
+ void onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) override {
+ std::unique_lock<std::mutex> lock(onTriggerMutex_, std::try_to_lock);
+ if (!lock.owns_lock()) {
+ logger_->log_warn("'onTrigger' is called before previous 'onTrigger' call is finished.");
+ context->yield();
+ return;
+ }
+
+ try {
+ if (!connection_) {
+ connection_ = dbService_->getConnection();
+ }
+ static_cast<T*>(this)->processOnTrigger(*session);
+ } catch (std::exception& e) {
+ logger_->log_error("SQLProcessor: '%s'", e.what());
+ if (connection_) {
+ std::string exp;
+ if (!connection_->connected(exp)) {
+ logger_->log_error("SQLProcessor: Connection exception: %s", exp.c_str());
+ connection_.reset();
+ }
+ }
+ context->yield();
+ }
+ }
+
+ void notifyStop() override {
+ connection_.reset();
+ }
+
+ protected:
+ static const core::Property& dbControllerService() {
+ static const core::Property s_dbControllerService =
+ core::PropertyBuilder::createProperty("DB Controller Service")->
+ isRequired(true)->
+ withDescription("Database Controller Service.")->
+ supportsExpressionLanguage(true)->
+ build();
+ return s_dbControllerService;
+ }
+
+ std::shared_ptr<logging::Logger> logger_;
+ std::shared_ptr<sql::controllers::DatabaseService> dbService_;
+ std::unique_ptr<sql::Connection> connection_;
+ std::mutex onTriggerMutex_;
+};
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
diff --git a/extensions/sql/services/DatabaseService.cpp b/extensions/sql/services/DatabaseService.cpp
new file mode 100644
index 0000000..cb2e8a2
--- /dev/null
+++ b/extensions/sql/services/DatabaseService.cpp
@@ -0,0 +1,69 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/logging/LoggerConfiguration.h"
+#include "core/controller/ControllerService.h"
+#include <string>
+#include <memory>
+#include <set>
+#include "core/Property.h"
+#include "DatabaseService.h"
+#include "DatabaseService.h"
+#include "io/validation.h"
+#include "properties/Configure.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace sql {
+namespace controllers {
+
+static core::Property RemoteServer;
+static core::Property Port;
+static core::Property MaxQueueSize;
+
+core::Property DatabaseService::ConnectionString(core::PropertyBuilder::createProperty("Connection String")->withDescription("Database Connection String")->isRequired(true)->build());
+
+void DatabaseService::initialize() {
+ std::lock_guard<std::recursive_mutex> lock(initialization_mutex_);
+
+ if (initialized_)
+ return;
+
+ ControllerService::initialize();
+
+ initializeProperties();
+
+ initialized_ = true;
+}
+
+void DatabaseService::onEnable() {
+ getProperty(ConnectionString.getName(), connection_string_);
+}
+
+void DatabaseService::initializeProperties() {
+ setSupportedProperties( { ConnectionString });
+}
+
+} /* namespace controllers */
+} /* namespace coap */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
diff --git a/extensions/sql/services/DatabaseService.h b/extensions/sql/services/DatabaseService.h
new file mode 100644
index 0000000..fb9020f
--- /dev/null
+++ b/extensions/sql/services/DatabaseService.h
@@ -0,0 +1,116 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CONTROLLERS_DATABASESERVICE_H_
+#define LIBMINIFI_INCLUDE_CONTROLLERS_DATABASESERVICE_H_
+
+#include "core/logging/LoggerConfiguration.h"
+#include "core/controller/ControllerService.h"
+#include "data/DatabaseConnectors.h"
+#include <memory>
+#include <unordered_map>
+
+#include <soci/soci.h>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace sql {
+namespace controllers {
+
+/**
+ * Purpose and Justification: Controller services function as a layerable way to provide
+ * services to internal services. While a controller service is generally configured from the flow,
+ * we want to follow the open closed principle and provide Database services
+ */
+class DatabaseService : public core::controller::ControllerService {
+ public:
+
+ /**
+ * Constructors for the controller service.
+ */
+ explicit DatabaseService(const std::string &name, const std::string &id)
+ : ControllerService(name, id),
+ initialized_(false),
+ logger_(logging::LoggerFactory<DatabaseService>::getLogger()) {
+ initialize();
+ }
+
+ explicit DatabaseService(const std::string &name, utils::Identifier uuid = utils::Identifier())
+ : ControllerService(name, uuid),
+ initialized_(false),
+ logger_(logging::LoggerFactory<DatabaseService>::getLogger()) {
+ initialize();
+ }
+
+ explicit DatabaseService(const std::string &name, const std::shared_ptr<Configure> &configuration)
+ : ControllerService(name),
+ initialized_(false),
+ logger_(logging::LoggerFactory<DatabaseService>::getLogger()) {
+ setConfiguration(configuration);
+ initialize();
+ }
+
+ /**
+ * Parameters needed.
+ */
+ static core::Property ConnectionString;
+
+ virtual void initialize() override;
+
+ void yield() override {
+
+ }
+
+ bool isRunning() override {
+ return getState() == core::controller::ControllerServiceState::ENABLED;
+ }
+
+ bool isWorkAvailable() override {
+ return false;
+ }
+
+ virtual void onEnable() override;
+
+ virtual std::unique_ptr<sql::Connection> getConnection() const = 0;
+
+ protected:
+
+ void initializeProperties();
+
+ // initialization mutex.
+ std::recursive_mutex initialization_mutex_;
+
+ bool initialized_{};
+
+ std::string connection_string_;
+
+ private:
+
+ std::shared_ptr<logging::Logger> logger_;
+
+};
+
+} /* namespace controllers */
+} /* namespace sql */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_CONTROLLERS_DATABASESERVICE_H_ */
diff --git a/extensions/sql/services/ODBCConnector.cpp b/extensions/sql/services/ODBCConnector.cpp
new file mode 100644
index 0000000..d785bd7
--- /dev/null
+++ b/extensions/sql/services/ODBCConnector.cpp
@@ -0,0 +1,45 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/logging/LoggerConfiguration.h"
+#include "core/controller/ControllerService.h"
+#include <string>
+#include <memory>
+#include <set>
+#include "core/Property.h"
+#include "ODBCConnector.h"
+#include "io/validation.h"
+#include "properties/Configure.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace sql {
+namespace controllers {
+
+std::unique_ptr<sql::Connection> ODBCService::getConnection() const {
+ return std::unique_ptr<sql::Connection>(new ODBCConnection(connection_string_));
+}
+
+} /* namespace controllers */
+} /* namespace sql */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
diff --git a/extensions/sql/services/ODBCConnector.h b/extensions/sql/services/ODBCConnector.h
new file mode 100644
index 0000000..4202d05
--- /dev/null
+++ b/extensions/sql/services/ODBCConnector.h
@@ -0,0 +1,127 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "core/logging/LoggerConfiguration.h"
+#include "core/controller/ControllerService.h"
+
+#include "DatabaseService.h"
+#include "core/Resource.h"
+#include "data/DatabaseConnectors.h"
+#include <memory>
+#include <unordered_map>
+
+#include <soci/soci.h>
+#include <soci/odbc/soci-odbc.h>
+
+#include <iostream>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace sql {
+namespace controllers {
+
+class ODBCConnection : public sql::Connection {
+ public:
+ explicit ODBCConnection(const std::string& connectionString)
+ : connection_string_(connectionString) {
+ session_ = std::make_unique<soci::session>(getSessionParameters());
+ }
+
+ virtual ~ODBCConnection() {
+ }
+
+ bool connected(std::string& exception) const override {
+ try {
+ exception.clear();
+ // According to https://stackoverflow.com/questions/3668506/efficient-sql-test-query-or-validation-query-that-will-work-across-all-or-most by Rob Hruska,
+ // 'select 1' works for: H2, MySQL, Microsoft SQL Server, PostgreSQL, SQLite. For Oracle 'SELECT 1 FROM DUAL' works.
+ prepareStatement("select 1")->execute();
+ return true;
+ } catch (std::exception& e) {
+ exception = e.what();
+ return false;
+ }
+ }
+
+ std::unique_ptr<sql::Statement> prepareStatement(const std::string& query) const override {
+ return std::make_unique<sql::Statement>(*session_, query);
+ }
+
+ std::unique_ptr<Session> getSession() const override {
+ return std::make_unique<sql::Session>(*session_);
+ }
+
+ private:
+ soci::connection_parameters getSessionParameters() const {
+ static const soci::backend_factory &backEnd = *soci::factory_odbc();
+
+ soci::connection_parameters parameters(backEnd, connection_string_);
+ parameters.set_option(soci::odbc_option_driver_complete, "0" /* SQL_DRIVER_NOPROMPT */);
+
+ return parameters;
+ }
+
+ private:
+ std::unique_ptr<soci::session> session_;
+ std::string connection_string_;
+};
+
+/**
+ * Purpose and Justification: Controller services function as a layerable way to provide
+ * services to internal services. While a controller service is generally configured from the flow,
+ * we want to follow the open closed principle and provide Database services
+ */
+class ODBCService : public DatabaseService {
+ public:
+ explicit ODBCService(const std::string &name, const std::string &id)
+ : DatabaseService(name, id),
+ logger_(logging::LoggerFactory<ODBCService>::getLogger()) {
+ initialize();
+ }
+
+ explicit ODBCService(const std::string &name, utils::Identifier uuid = utils::Identifier())
+ : DatabaseService(name, uuid),
+ logger_(logging::LoggerFactory<ODBCService>::getLogger()) {
+ initialize();
+ }
+
+ explicit ODBCService(const std::string &name, const std::shared_ptr<Configure> &configuration)
+ : DatabaseService(name),
+ logger_(logging::LoggerFactory<ODBCService>::getLogger()) {
+ setConfiguration(configuration);
+ initialize();
+ }
+
+ virtual std::unique_ptr<sql::Connection> getConnection() const override;
+
+ private:
+ std::shared_ptr<logging::Logger> logger_;
+};
+
+REGISTER_RESOURCE(ODBCService, "Controller service that provides ODBC database connection");
+
+} /* namespace controllers */
+} /* namespace sql */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
diff --git a/fedora.sh b/fedora.sh
index db28954..ca57258 100644
--- a/fedora.sh
+++ b/fedora.sh
@@ -16,7 +16,7 @@
# specific language governing permissions and limitations
# under the License.
-verify_enable(){
+verify_enable_platform(){
feature="$1"
feature_status=${!1}
verify_gcc_enable $feature
diff --git a/rheldistro.sh b/rheldistro.sh
index 1c71e92..5999b41 100644
--- a/rheldistro.sh
+++ b/rheldistro.sh
@@ -16,7 +16,7 @@
# specific language governing permissions and limitations
# under the License.
-verify_enable() {
+verify_enable_platform() {
feature="$1"
feature_status=${!1}
if [ "$OS_MAJOR" = "6" ]; then
diff --git a/suse.sh b/suse.sh
index 7aff26f..f72474c 100644
--- a/suse.sh
+++ b/suse.sh
@@ -16,7 +16,7 @@
# specific language governing permissions and limitations
# under the License.
-verify_enable() {
+verify_enable_platform() {
feature="$1"
feature_status=${!1}
if [ "$OS_MAJOR" = "6" ]; then
diff --git a/win_build_vs.bat b/win_build_vs.bat
index 0749ffc..c7df9d4 100644
--- a/win_build_vs.bat
+++ b/win_build_vs.bat
@@ -26,6 +26,7 @@
set build_kafka=off
set build_coap=off
set build_jni=off
+set build_SQL=off
set generator="Visual Studio 15 2017"
set cpack=OFF
@@ -45,6 +46,9 @@
if [%%~x] EQU [/J] (
set build_JNI=ON
)
+ if [%%~x] EQU [/S] (
+ set build_SQL=ON
+ )
rem if [%%~x] EQU [/C] (
rem set build_coap=ON
rem )
@@ -64,7 +68,7 @@
-cmake -G %generator% -DCMAKE_BUILD_TYPE_INIT=%cmake_build_type% -DCMAKE_BUILD_TYPE=%cmake_build_type% -DWIN32=WIN32 -DENABLE_LIBRDKAFKA=%build_kafka% -DENABLE_JNI=%build_jni% -DOPENSSL_OFF=OFF -DENABLE_COAP=%build_coap% -DUSE_SHARED_LIBS=OFF -DDISABLE_CONTROLLER=ON -DBUILD_ROCKSDB=ON -DFORCE_WINDOWS=ON -DUSE_SYSTEM_UUID=OFF -DDISABLE_LIBARCHIVE=ON -DDISABLE_SCRIPTING=ON -DEXCLUDE_BOOST=ON -DENABLE_WEL=TRUE -DFAIL_ON_WARNINGS=OFF -DSKIP_TESTS=%skiptests% .. && msbuild /m nifi-minifi-cpp.sln /property:Configuration=%cmake_build_type% /property:Platform=%build_platform% && copy main\%cmake_build_type%\minifi.exe main\
+cmake -G %generator% -DENABLE_SQL=%build_SQL% -DCMAKE_BUILD_TYPE_INIT=%cmake_build_type% -DCMAKE_BUILD_TYPE=%cmake_build_type% -DWIN32=WIN32 -DENABLE_LIBRDKAFKA=%build_kafka% -DENABLE_JNI=%build_jni% -DOPENSSL_OFF=OFF -DENABLE_COAP=%build_coap% -DUSE_SHARED_LIBS=OFF -DDISABLE_CONTROLLER=ON -DBUILD_ROCKSDB=ON -DFORCE_WINDOWS=ON -DUSE_SYSTEM_UUID=OFF -DDISABLE_LIBARCHIVE=ON -DDISABLE_SCRIPTING=ON -DEXCLUDE_BOOST=ON -DENABLE_WEL=TRUE -DFAIL_ON_WARNINGS=OFF -DSKIP_TESTS=%skiptests% .. && msbuild /m nifi-minifi-cpp.sln /property:Configuration=%cmake_build_type% /property:Platform=%build_platform% && copy main\%cmake_build_type%\minifi.exe main\
if [%cpack%] EQU [ON] (
cpack
)