Merge branch 'minificpp-1013-rebase'

Signed-off-by: Daniel Bakai <bakaid@apache.org>

This closes #732
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 49703b0..c3850f8 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -328,6 +328,11 @@
 	endif()
 endif(WIN32)
 
+option(ENABLE_SQL "Enables the SQL Suite of Tools." OFF)
+if (ENABLE_ALL OR ENABLE_SQL)
+	createExtension(SQL-EXTENSIONS "SQL EXTENSIONS" "Enables the SQL Suite of Tools" "extensions/sql")
+endif()
+
 ## Create MQTT Extension
 option(ENABLE_MQTT "Enables the mqtt extension." OFF)
 if(ENABLE_ALL OR ENABLE_MQTT)
@@ -374,12 +379,16 @@
 
 ## SQLite extensions
 option(ENABLE_SQLITE "Disables the scripting extensions." OFF)
-if (ENABLE_ALL OR ENABLE_SQLITE)
+if (ENABLE_SQLITE)
 	include(BundledSQLite)
 	use_bundled_sqlite(${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_BINARY_DIR})
     createExtension(SQLITE-EXTENSIONS "SQLITE EXTENSIONS" "This enables sqlite" "extensions/sqlite" "${TEST_DIR}/sqlite-tests")
 endif()
 
+if (ENABLE_SQL AND ENABLE_SQLITE)
+	message(FATAL_ERROR "ENABLE_SQL and ENABLE_SQLITE are incompatible. Set only one at a time.")
+endif()
+
 ## USB camera extensions
 option(ENABLE_USB_CAMERA "Enables USB camera support." OFF)
 if (ENABLE_ALL OR ENABLE_USB_CAMERA)
diff --git a/CMakeSettings.json b/CMakeSettings.json
index 89ce4d7..96f2694 100644
--- a/CMakeSettings.json
+++ b/CMakeSettings.json
@@ -60,6 +60,10 @@
           "value": "TRUE"
         },
         {
+          "name": "ENABLE_SQL",
+          "value": "TRUE"
+        },
+        {
           "name": "ENABLE_LIBRDKAFKA",
           "value": "OFF"
         },
diff --git a/aptitude.sh b/aptitude.sh
index 05913d5..289dbfe 100644
--- a/aptitude.sh
+++ b/aptitude.sh
@@ -16,7 +16,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
-verify_enable(){
+verify_enable_platform(){
     feature="$1"
     feature_status=${!1}
     verify_gcc_enable $feature
diff --git a/bootstrap.sh b/bootstrap.sh
index ac60b9b..b7cd7c4 100755
--- a/bootstrap.sh
+++ b/bootstrap.sh
@@ -42,6 +42,7 @@
 CMAKE_OPTIONS_ENABLED=()
 CMAKE_OPTIONS_DISABLED=()
 CMAKE_MIN_VERSION=()
+INCOMPATIBLE_WITH=()
 DEPLOY_LIMITS=()
 USER_DISABLE_TESTS="${FALSE}"
 USE_NINJA="false"
@@ -301,6 +302,9 @@
 
 add_disabled_option SQLITE_ENABLED ${FALSE} "ENABLE_SQLITE"
 
+add_disabled_option SQL_ENABLED ${FALSE} "ENABLE_SQL"
+set_incompatible_with SQL_ENABLED SQLITE_ENABLED
+
 # Since the following extensions have limitations on
 add_disabled_option BUSTACHE_ENABLED ${FALSE} "ENABLE_BUSTACHE" "2.6" ${TRUE}
 add_dependency BUSTACHE_ENABLED "boost"
diff --git a/bstrp_functions.sh b/bstrp_functions.sh
index 93b85f5..d292d85 100755
--- a/bstrp_functions.sh
+++ b/bstrp_functions.sh
@@ -69,6 +69,11 @@
 	done
 }
 
+set_incompatible_with(){
+  INCOMPATIBLE_WITH+=("$1:$2")
+  INCOMPATIBLE_WITH+=("$2:$1")
+}
+
 print_multi_option_status(){
   feature="$1"
   feature_status=${!1}
@@ -169,6 +174,30 @@
   done
 }
 
+# Echoes "false" if a feature registered as incompatible with $1 is "Enabled", else "true".
+check_compatibility(){
+  for option in "${INCOMPATIBLE_WITH[@]}" ; do
+    OPT=${option%%:*}
+    [ "$OPT" = "$1" ] || continue
+    OTHER_FEATURE=${option#*:}
+    OTHER_FEATURE_VALUE=${!OTHER_FEATURE}
+    if [ "$OTHER_FEATURE_VALUE" = "Enabled" ]; then
+      echo "false"
+      return
+    fi
+  done
+  echo "true"
+}
+
+verify_enable(){
+  COMPATIBLE=$(check_compatibility $1)
+  if [ "$COMPATIBLE" = "true" ]; then
+    verify_enable_platform $1
+  else
+    echo "false"
+  fi
+}
+
 can_deploy(){
   for option in "${DEPLOY_LIMITS[@]}" ; do
     OPT=${option%%:*}
@@ -334,6 +363,7 @@
   echo "V. AWS Support .................$(print_feature_status AWS_ENABLED)"
   echo "T. OpenCV Support ..............$(print_feature_status OPENCV_ENABLED)"
   echo "U. OPC-UA Support...............$(print_feature_status OPC_ENABLED)"
+  echo "W. SQL Support..................$(print_feature_status SQL_ENABLED)"
   echo "****************************************"
   echo "            Build Options."
   echo "****************************************"
@@ -348,12 +378,13 @@
   fi
   echo "Q. Quit"
   echo "* Extension cannot be installed due to"
-  echo -e "  version of cmake or other software\r\n"
+  echo "  version of cmake or other software, or"
+  echo -e "  incompatibility with other extensions\r\n"
 }
 
 read_feature_options(){
   local choice
-  read -p "Enter choice [ A - V or 1-4 ] " choice
+  read -p "Enter choice [ A - W or 1-4 ] " choice
   choice=$(echo ${choice} | tr '[:upper:]' '[:lower:]')
   case $choice in
     a) ToggleFeature ROCKSDB_ENABLED ;;
@@ -380,6 +411,7 @@
 	s) ToggleFeature SFTP_ENABLED ;;
     t) ToggleFeature OPENCV_ENABLED ;;
     u) ToggleFeature OPC_ENABLED ;;
+    w) ToggleFeature SQL_ENABLED ;;	
     1) ToggleFeature TESTS_DISABLED ;;
     2) EnableAllFeatures ;;
     3) ToggleFeature JNI_ENABLED;;
diff --git a/centos.sh b/centos.sh
index c3b9d77..1430164 100644
--- a/centos.sh
+++ b/centos.sh
@@ -16,7 +16,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
-verify_enable() {
+verify_enable_platform() {
     feature="$1"
     feature_status=${!1}
     if [ "$OS_MAJOR" = "6" ]; then
diff --git a/darwin.sh b/darwin.sh
index da5df01..0e6bd81 100644
--- a/darwin.sh
+++ b/darwin.sh
@@ -16,7 +16,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
-verify_enable() {
+verify_enable_platform() {
     feature="$1"
     feature_status=${!1}
     if [ "$feature" = "BUSTACHE_ENABLED" ]; then
diff --git a/debian.sh b/debian.sh
index bb68ef0..9b67d7d 100644
--- a/debian.sh
+++ b/debian.sh
@@ -16,7 +16,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
-verify_enable(){
+verify_enable_platform(){
   feature="$1"
   feature_status=${!1}
   verify_gcc_enable $feature
diff --git a/extensions/sql/CMakeLists.txt b/extensions/sql/CMakeLists.txt
new file mode 100644
index 0000000..ead2726
--- /dev/null
+++ b/extensions/sql/CMakeLists.txt
@@ -0,0 +1,180 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+include(${CMAKE_SOURCE_DIR}/extensions/ExtensionHeader.txt)
+
+set(CMAKE_CXX_STANDARD 14)
+set(CMAKE_CXX_STANDARD_REQUIRED ON)
+
+include_directories(../../thirdparty/rapidjson-1.1.0/include/ ../../thirdparty/rapidjson-1.1.0/include/rapidjson)
+include_directories(".")
+
+file(GLOB SOURCES  "*.cpp" "services/*.cpp" "processors/*.cpp"  "data/*.cpp")
+
+add_library(minifi-sql STATIC ${SOURCES})
+set_property(TARGET minifi-sql PROPERTY POSITION_INDEPENDENT_CODE ON)
+
+if(THREADS_HAVE_PTHREAD_ARG)
+  target_compile_options(minifi-sql PUBLIC "-pthread")
+endif()
+if(CMAKE_THREAD_LIBS_INIT)
+  target_link_libraries(minifi-sql "${CMAKE_THREAD_LIBS_INIT}")
+endif()
+
+# Get whether we should use lib64/ library paths
+get_property(LIB64 GLOBAL PROPERTY FIND_LIBRARY_USE_LIB64_PATHS)
+
+if(WIN32)
+	find_package(ODBC REQUIRED)
+else()
+	# Build iODBC
+
+	# Define byproducts
+	if(LIB64 AND NOT APPLE)
+		set(IODBC_BYPRODUCT "lib64/libiodbc.a")
+	else()
+		set(IODBC_BYPRODUCT "lib/libiodbc.a")
+	endif()
+
+	set(IODBC_BYPRODUCT_DIR "${CMAKE_CURRENT_BINARY_DIR}/thirdparty/iodbc-install/")
+
+	# Build project
+	ExternalProject_Add(
+			iodbc-external
+			URL "https://github.com/openlink/iODBC/archive/v3.52.13.tar.gz"
+			URL_HASH "SHA256=4bf67fc6d4d237a4db19b292b5dd255ee09a0b2daa4e4058cf3a918bc5102135"
+			BUILD_IN_SOURCE true
+			SOURCE_DIR "${CMAKE_CURRENT_BINARY_DIR}/thirdparty/iodbc-src"
+			BUILD_COMMAND make
+			CMAKE_COMMAND ""
+			UPDATE_COMMAND ""
+			INSTALL_COMMAND make install
+			CONFIGURE_COMMAND ""
+			PATCH_COMMAND ./autogen.sh && ./configure --prefix=${IODBC_BYPRODUCT_DIR}
+			STEP_TARGETS build
+			BUILD_BYPRODUCTS "${IODBC_BYPRODUCT_DIR}/${IODBC_BYPRODUCT}"
+			EXCLUDE_FROM_ALL TRUE
+	)
+
+	# Set variables
+	set(IODBC_FOUND "YES" CACHE STRING "" FORCE)
+	set(IODBC_INCLUDE_DIRS "${IODBC_BYPRODUCT_DIR}/include" CACHE STRING "" FORCE)
+	set(IODBC_LIBRARIES "${IODBC_BYPRODUCT_DIR}/${IODBC_BYPRODUCT}" CACHE STRING "" FORCE)
+
+	# Set exported variables for FindPackage.cmake
+	set(EXPORTED_IODBC_INCLUDE_DIRS "${IODBC_INCLUDE_DIRS}" CACHE STRING "" FORCE)
+	set(EXPORTED_IODBC_LIBRARIES "${IODBC_LIBRARIES}" CACHE STRING "" FORCE)
+
+	# Create imported targets
+	add_library(ODBC::ODBC STATIC IMPORTED)
+	set_target_properties(ODBC::ODBC PROPERTIES IMPORTED_LOCATION "${IODBC_LIBRARIES}")
+	add_dependencies(ODBC::ODBC iodbc-external)
+	file(MAKE_DIRECTORY ${IODBC_INCLUDE_DIRS})
+	set_property(TARGET ODBC::ODBC APPEND PROPERTY INTERFACE_INCLUDE_DIRECTORIES ${IODBC_INCLUDE_DIRS})
+endif()
+
+# Build SOCI
+
+# Find patch executable; fail at configure time because the SOCI patch step needs it
+find_package(Patch REQUIRED)
+
+# Define patch step (applies patch/soci.patch with -p1 to the SOCI sources)
+set(PC "${Patch_EXECUTABLE}" -p1 -i "${CMAKE_CURRENT_SOURCE_DIR}/patch/soci.patch")
+
+# Define byproducts
+if(NOT APPLE AND CMAKE_SIZEOF_VOID_P EQUAL 8)
+	set(LIBSUFFIX 64)
+endif()
+
+if (WIN32)
+	set(BYPRODUCT_SUFFIX "_4_0.lib")
+else()
+	set(BYPRODUCT_SUFFIX ".a")
+endif()
+
+set(SOCI_BYPRODUCTS
+		"lib${LIBSUFFIX}/libsoci_core${BYPRODUCT_SUFFIX}"
+		"lib${LIBSUFFIX}/libsoci_odbc${BYPRODUCT_SUFFIX}"
+		)
+
+set(SOCI_BYPRODUCT_DIR "${CMAKE_CURRENT_BINARY_DIR}/thirdparty/soci-install")
+
+foreach(SOCI_BYPRODUCT ${SOCI_BYPRODUCTS})
+	list(APPEND SOCI_LIBRARIES_LIST "${SOCI_BYPRODUCT_DIR}/${SOCI_BYPRODUCT}")
+endforeach(SOCI_BYPRODUCT)
+
+# Set build options
+set(SOCI_CMAKE_ARGS ${PASSTHROUGH_CMAKE_ARGS}
+		"-DCMAKE_INSTALL_PREFIX=${SOCI_BYPRODUCT_DIR}"
+		"-DSOCI_TESTS=OFF"
+		"-DSOCI_SHARED=OFF"
+		"-DSOCI_CXX_C11=ON"
+		"-DWITH_ODBC=ON"
+		"-DSOCI_ODBC=ON"
+		"-DWITH_BOOST=OFF")
+
+if(NOT WIN32)
+	list(APPEND SOCI_CMAKE_ARGS "-DCMAKE_MODULE_PATH=${CMAKE_CURRENT_SOURCE_DIR}/cmake/"
+			"-DEXPORTED_IODBC_INCLUDE_DIRS=${EXPORTED_IODBC_INCLUDE_DIRS}"
+			"-DEXPORTED_IODBC_LIBRARIES=${EXPORTED_IODBC_LIBRARIES}")
+endif()
+
+# Build project
+ExternalProject_Add(
+		soci-external
+		URL "https://github.com/SOCI/soci/archive/4.0.0.tar.gz"
+		URL_HASH "SHA256=359b988d8cbe81357835317821919f7e270c0705e41951a92ac1627cb9fe8faf"
+		PATCH_COMMAND ${PC}
+		SOURCE_DIR "${CMAKE_CURRENT_BINARY_DIR}/thirdparty/soci-src"
+		CMAKE_ARGS ${SOCI_CMAKE_ARGS}
+		BUILD_BYPRODUCTS ${SOCI_LIBRARIES_LIST}
+		EXCLUDE_FROM_ALL TRUE
+)
+
+# Set dependencies
+if(NOT WIN32)
+	add_dependencies(soci-external ODBC::ODBC)
+endif()
+
+# Set variables
+set(SOCI_FOUND "YES" CACHE STRING "" FORCE)
+set(SOCI_INCLUDE_DIR "${SOCI_BYPRODUCT_DIR}/include" CACHE STRING "" FORCE)
+set(SOCI_LIBRARIES "${SOCI_LIBRARIES_LIST}" CACHE STRING "" FORCE)
+
+# Create imported targets
+file(MAKE_DIRECTORY ${SOCI_INCLUDE_DIR})
+
+add_library(SOCI::libsoci_core STATIC IMPORTED)
+set_target_properties(SOCI::libsoci_core PROPERTIES IMPORTED_LOCATION "${SOCI_BYPRODUCT_DIR}/lib${LIBSUFFIX}/libsoci_core${BYPRODUCT_SUFFIX}")
+set_target_properties(SOCI::libsoci_core PROPERTIES INTERFACE_INCLUDE_DIRECTORIES "${SOCI_INCLUDE_DIR}")
+add_dependencies(SOCI::libsoci_core soci-external)
+
+add_library(SOCI::libsoci_odbc STATIC IMPORTED)
+set_target_properties(SOCI::libsoci_odbc PROPERTIES IMPORTED_LOCATION "${SOCI_BYPRODUCT_DIR}/lib${LIBSUFFIX}/libsoci_odbc${BYPRODUCT_SUFFIX}")
+set_target_properties(SOCI::libsoci_odbc PROPERTIES INTERFACE_INCLUDE_DIRECTORIES "${SOCI_INCLUDE_DIR}")
+add_dependencies(SOCI::libsoci_odbc soci-external)
+set_property(TARGET SOCI::libsoci_odbc APPEND PROPERTY INTERFACE_LINK_LIBRARIES SOCI::libsoci_core)
+set_property(TARGET SOCI::libsoci_odbc APPEND PROPERTY INTERFACE_LINK_LIBRARIES ODBC::ODBC)
+
+target_link_libraries(minifi-sql SOCI::libsoci_odbc SOCI::libsoci_core)
+
+target_link_libraries(minifi-sql ${LIBMINIFI})
+
+SET (SQL-EXTENSION minifi-sql PARENT_SCOPE)
+register_extension(minifi-sql)
diff --git a/extensions/sql/SQLLoader.cpp b/extensions/sql/SQLLoader.cpp
new file mode 100644
index 0000000..2a787c2
--- /dev/null
+++ b/extensions/sql/SQLLoader.cpp
@@ -0,0 +1,29 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "core/FlowConfiguration.h"
+#include "SQLLoader.h"
+
+static auto added = core::FlowConfiguration::add_static_func("createSQLFactory");
+
+extern "C" {
+
+void *createSQLFactory(void) {
+  return new SQLFactory();
+}
+
+}
diff --git a/extensions/sql/SQLLoader.h b/extensions/sql/SQLLoader.h
new file mode 100644
index 0000000..992c802
--- /dev/null
+++ b/extensions/sql/SQLLoader.h
@@ -0,0 +1,74 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include "core/ClassLoader.h"
+#include "processors/ExecuteSQL.h"
+#include "processors/PutSQL.h"
+#include "processors/QueryDatabaseTable.h"
+#include "services/ODBCConnector.h"
+
+class SQLFactory : public core::ObjectFactory {
+ public:
+  SQLFactory() {}
+
+  /**
+   * Gets the name of this factory.
+   * @return the literal "SQLFactory"
+   */
+  std::string getName() override {
+    return "SQLFactory";
+  }
+
+  std::string getClassName() override{
+    return "SQLFactory";
+  }
+  /**
+   * Gets the class names this factory can create.
+   * @return the class names accepted by assign()
+   */
+  std::vector<std::string> getClassNames() override{
+    return {"ExecuteSQL", "PutSQL", "QueryDatabaseTable", "ODBCService"};
+  }
+
+  template <typename T>
+  static std::unique_ptr<ObjectFactory> getObjectFactory() {
+    return std::make_unique<core::DefautObjectFactory<T>>();
+  }
+
+  std::unique_ptr<ObjectFactory> assign(const std::string &class_name) override {
+    if (utils::StringUtils::equalsIgnoreCase(class_name, "ExecuteSQL")) {
+      return getObjectFactory<minifi::processors::ExecuteSQL>();
+    }
+    if (utils::StringUtils::equalsIgnoreCase(class_name, "PutSQL")) {
+      return getObjectFactory<minifi::processors::PutSQL>();
+    }
+    if (utils::StringUtils::equalsIgnoreCase(class_name, "QueryDatabaseTable")) {
+      return getObjectFactory<minifi::processors::QueryDatabaseTable>();
+    }
+    if (utils::StringUtils::equalsIgnoreCase(class_name, "ODBCService")) {
+      return getObjectFactory<minifi::sql::controllers::ODBCService>();
+    }
+
+    return nullptr;
+  }
+};
+
+extern "C" {
+  DLL_EXPORT void *createSQLFactory(void);
+}
diff --git a/extensions/sql/cmake/FindODBC.cmake b/extensions/sql/cmake/FindODBC.cmake
new file mode 100644
index 0000000..b232e76
--- /dev/null
+++ b/extensions/sql/cmake/FindODBC.cmake
@@ -0,0 +1,34 @@
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+if (NOT ODBC_FOUND)
+    set(ODBC_FOUND "YES" CACHE STRING "" FORCE)
+    set(ODBC_INCLUDE_DIR "${EXPORTED_IODBC_INCLUDE_DIRS}" CACHE STRING "" FORCE)
+    set(ODBC_INCLUDE_DIRS "${EXPORTED_IODBC_INCLUDE_DIRS}" CACHE STRING "" FORCE)
+    set(ODBC_LIBRARIES "${EXPORTED_IODBC_LIBRARIES}" CACHE STRING "" FORCE)
+    set(ODBC_LIBRARY "${EXPORTED_IODBC_LIBRARIES}"  CACHE STRING "" FORCE)
+endif()
+
+if(NOT TARGET ODBC::ODBC)
+    add_library(ODBC::ODBC STATIC IMPORTED)
+    set_target_properties(ODBC::ODBC PROPERTIES INTERFACE_INCLUDE_DIRECTORIES "${ODBC_INCLUDE_DIRS}")
+    set_target_properties(ODBC::ODBC PROPERTIES
+        IMPORTED_LINK_INTERFACE_LANGUAGES "C"
+        IMPORTED_LOCATION "${ODBC_LIBRARIES}")
+endif()
\ No newline at end of file
diff --git a/extensions/sql/data/DatabaseConnectors.h b/extensions/sql/data/DatabaseConnectors.h
new file mode 100644
index 0000000..0e94b73
--- /dev/null
+++ b/extensions/sql/data/DatabaseConnectors.h
@@ -0,0 +1,102 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <string>
+
+#include <soci/soci.h>
+
+#include "Utils.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace sql {
+
+/**
+ * We do not intend to create an abstract facade here. We know that SOCI is the underlying
+ * SQL library. We only wish to abstract ODBC specific information
+ */
+
+class Statement {
+ public:
+  // Keeps a reference to `session` (it must outlive this object) and a copy of `query`.
+  explicit Statement(soci::session& session, const std::string &query)
+    : query_(query), session_(session) {
+  }
+
+  virtual ~Statement() {
+  }
+
+  soci::rowset<soci::row> execute() {
+    return session_.prepare << query_;
+  }
+
+ protected:
+  std::string query_;
+  soci::session& session_;
+};
+
+class Session {
+ public:
+
+  explicit Session(soci::session& session)
+    : session_(session) {
+  }
+
+  virtual ~Session() {
+  }
+
+  void begin() {
+    session_.begin();
+  }
+
+  void commit() {
+    session_.commit();
+  }
+
+  void rollback() {
+    session_.rollback();
+  }
+
+  void execute(const std::string &statement) {
+    session_ << statement;
+  }
+
+protected:
+  soci::session& session_;
+};
+
+class Connection {
+ public:
+  virtual ~Connection() {
+  }
+  virtual bool connected(std::string& exception) const = 0;
+  virtual std::unique_ptr<Statement> prepareStatement(const std::string &query) const = 0;
+  virtual std::unique_ptr<Session> getSession() const = 0;
+};
+
+} /* namespace sql */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
diff --git a/extensions/sql/data/JSONSQLWriter.cpp b/extensions/sql/data/JSONSQLWriter.cpp
new file mode 100644
index 0000000..514fd4d
--- /dev/null
+++ b/extensions/sql/data/JSONSQLWriter.cpp
@@ -0,0 +1,107 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "JSONSQLWriter.h"
+#include "rapidjson/writer.h"
+#include "rapidjson/stringbuffer.h"
+#include "rapidjson/prettywriter.h"
+#include "Exception.h"
+#include "Utils.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace sql {
+
+JSONSQLWriter::JSONSQLWriter(bool pretty)
+  : pretty_(pretty), jsonPayload_(rapidjson::kArrayType) {
+}
+
+JSONSQLWriter::~JSONSQLWriter() {}
+
+void JSONSQLWriter::beginProcessRow() {
+  jsonRow_ = rapidjson::kObjectType;
+}
+
+void JSONSQLWriter::endProcessRow() {
+  jsonPayload_.PushBack(jsonRow_, jsonPayload_.GetAllocator());
+}
+
+void JSONSQLWriter::processColumnName(const std::string& name) {}
+
+void JSONSQLWriter::processColumn(const std::string& name, const std::string& value) {
+  addToJSONRow(name, std::move(toJSONString(value)));
+}
+
+void JSONSQLWriter::processColumn(const std::string& name, double value) {
+  addToJSONRow(name, std::move(rapidjson::Value().SetDouble(value)));
+}
+
+void JSONSQLWriter::processColumn(const std::string& name, int value) {
+  addToJSONRow(name, std::move(rapidjson::Value().SetInt(value)));
+}
+
+void JSONSQLWriter::processColumn(const std::string& name, long long value) {
+  addToJSONRow(name, std::move(rapidjson::Value().SetInt64(value)));
+}
+
+void JSONSQLWriter::processColumn(const std::string& name, unsigned long long value) {
+  addToJSONRow(name, std::move(rapidjson::Value().SetUint64(value)));
+}
+
+void JSONSQLWriter::processColumn(const std::string& name, const char* value) {
+  addToJSONRow(name, std::move(toJSONString(value)));
+}
+
+void JSONSQLWriter::addToJSONRow(const std::string& columnName, rapidjson::Value&& jsonValue) {
+  jsonRow_.AddMember(toJSONString(columnName), std::move(jsonValue), jsonPayload_.GetAllocator());
+}
+
+rapidjson::Value JSONSQLWriter::toJSONString(const std::string& s) {
+  rapidjson::Value jsonValue;
+  jsonValue.SetString(s.c_str(), s.size(), jsonPayload_.GetAllocator());
+
+  return jsonValue;
+}
+
+std::string JSONSQLWriter::toString() {
+  rapidjson::StringBuffer buffer;
+
+  if (pretty_) {
+    rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(buffer);
+    jsonPayload_.Accept(writer);
+  } else {
+    rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
+    jsonPayload_.Accept(writer);
+  }
+
+  std::string output(buffer.GetString(), buffer.GetSize());
+
+  // Reset to an empty array so the next call only serializes rows added after this one.
+  jsonPayload_ = rapidjson::Document(rapidjson::kArrayType);
+
+  return output;
+}
+
+} /* namespace sql */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
diff --git a/extensions/sql/data/JSONSQLWriter.h b/extensions/sql/data/JSONSQLWriter.h
new file mode 100644
index 0000000..6888527
--- /dev/null
+++ b/extensions/sql/data/JSONSQLWriter.h
@@ -0,0 +1,65 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "rapidjson/document.h"
+
+#include "SQLWriter.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace sql {
+
+class JSONSQLWriter: public SQLWriter {
+ public:
+  explicit JSONSQLWriter(bool pretty);
+  virtual ~JSONSQLWriter();
+
+  std::string toString() override;
+
+private:
+  void beginProcessRow() override;
+  void endProcessRow() override;
+  void processColumnName(const std::string& name) override;
+  void processColumn(const std::string& name, const std::string& value) override;
+  void processColumn(const std::string& name, double value) override;
+  void processColumn(const std::string& name, int value) override;
+  void processColumn(const std::string& name, long long value) override;
+  void processColumn(const std::string& name, unsigned long long value) override;
+  void processColumn(const std::string& name, const char* value) override;
+
+  void addToJSONRow(const std::string& columnName, rapidjson::Value&& jsonValue);
+
+  rapidjson::Value toJSONString(const std::string& s);
+
+ private:
+  bool pretty_;
+  rapidjson::Document jsonPayload_;
+  rapidjson::Value jsonRow_;
+};
+
+} /* namespace sql */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+
diff --git a/extensions/sql/data/MaxCollector.h b/extensions/sql/data/MaxCollector.h
new file mode 100644
index 0000000..359f962
--- /dev/null
+++ b/extensions/sql/data/MaxCollector.h
@@ -0,0 +1,155 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <string>
+#include <unordered_map>
+#include <tuple>
+
+#include "SQLRowSubscriber.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace sql {
+
+class MaxCollector: public SQLRowSubscriber {
+  void beginProcessRow() override {}
+
+  void endProcessRow() override {
+    if (columnsVerified_) {
+      return;
+    }
+
+    if (countColumns_ != mapState_.size())
+      throw minifi::Exception(PROCESSOR_EXCEPTION, "MaxCollector: Column(s) '" + maxValueColumnNames_ + "' are not found in the columns of '" + selectQuery_ + "' result.");
+
+    columnsVerified_ = true;
+  }
+
+  void processColumnName(const std::string& name) override {
+    if (columnsVerified_) {
+      return;
+    }
+
+    if (mapState_.count(name)) {
+      countColumns_++;
+    }
+  }
+
+  void processColumn(const std::string& name, const std::string& value)  override {
+    updateMaxValue(name, '\'' + value + '\'');
+  }
+
+  void processColumn(const std::string& name, double value) override {
+    updateMaxValue(name, value);
+  }
+
+  void processColumn(const std::string& name, int value) override {
+    updateMaxValue(name, value);
+  }
+
+  void processColumn(const std::string& name, long long value) override {
+    updateMaxValue(name, value);
+  }
+
+  void processColumn(const std::string& name, unsigned long long value) override {
+    updateMaxValue(name, value);
+  }
+
+  void processColumn(const std::string& name, const char* value) override {}
+
+  template <typename T>
+  struct MaxValue {
+    void updateMaxValue(const std::string& name, const T& value) {
+      const auto it = mapColumnNameValue_.find(name);
+      if (it == mapColumnNameValue_.end()) {
+        mapColumnNameValue_.insert({ name, value });
+      } else {
+        if (value > it->second) {
+          it->second = value;
+        }
+      }
+    }
+
+    std::unordered_map<std::string, T> mapColumnNameValue_;
+  };
+
+  template <typename Tuple, int Index>
+  struct UpdateMapState {
+    UpdateMapState(const Tuple& tpl, std::unordered_map<std::string, std::string>& mapState) {
+      for (auto& el : mapState) {
+        const auto& maxVal = std::get<Index>(tpl);
+
+        const auto it = maxVal.mapColumnNameValue_.find(el.first);
+        if (it != maxVal.mapColumnNameValue_.end()) {
+          std::stringstream ss;
+          ss << it->second;
+          el.second = ss.str();
+        }
+      }
+
+      UpdateMapState<Tuple, Index - 1>(tpl, mapState);
+    }
+  };
+
+  template <typename Tuple>
+  struct UpdateMapState<Tuple, -1> {
+    UpdateMapState(const Tuple&, std::unordered_map<std::string, std::string>&) {}
+  };
+
+  template <typename ...Ts>
+  struct MaxValues : public std::tuple<MaxValue<Ts>...> {
+    constexpr static size_t size = sizeof...(Ts);
+  };
+
+ public:
+  MaxCollector(const std::string& selectQuery, const std::string& maxValueColumnNames, std::unordered_map<std::string, std::string>& mapState)
+    :selectQuery_(selectQuery), maxValueColumnNames_(maxValueColumnNames), mapState_(mapState) {
+  }
+
+  template <typename T>
+  void updateMaxValue(const std::string& columnName, const T& value) {
+    if (mapState_.count(columnName)) {
+      std::get<MaxValue<T>>(maxValues_).updateMaxValue(columnName, value);
+    }
+  }
+
+  bool updateMapState() {
+    auto mapState = mapState_;
+    UpdateMapState<decltype(maxValues_), decltype(maxValues_)::size - 1>(maxValues_, mapState_);
+
+    return mapState != mapState_;
+  }
+
+ private:
+  const std::string selectQuery_;
+  const std::string maxValueColumnNames_;
+  std::unordered_map<std::string, std::string>& mapState_;
+  MaxValues<std::string, double, int, long long, unsigned long long> maxValues_;
+  size_t countColumns_{};
+  bool columnsVerified_{false};
+};
+	
+} /* namespace sql */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
diff --git a/extensions/sql/data/SQLRowSubscriber.h b/extensions/sql/data/SQLRowSubscriber.h
new file mode 100644
index 0000000..5325e71
--- /dev/null
+++ b/extensions/sql/data/SQLRowSubscriber.h
@@ -0,0 +1,47 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <string>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace sql {
+
+struct SQLRowSubscriber {  // Callback interface through which SQLRowsetProcessor reports the rows it reads.
+  virtual ~SQLRowSubscriber() {}
+  virtual void beginProcessRow() = 0;  // Called by SQLRowsetProcessor::addRow before the columns of a row are reported.
+  virtual void endProcessRow() = 0;  // Called by SQLRowsetProcessor::addRow after all columns of a row were reported.
+  virtual void processColumnName(const std::string& name) = 0;  // Receives a column name (SQLRowsetProcessor passes it lower-cased).
+  virtual void processColumn(const std::string& name, const std::string& value) = 0;  // String columns; SQLRowsetProcessor also sends formatted dates here.
+  virtual void processColumn(const std::string& name, double value) = 0;
+  virtual void processColumn(const std::string& name, int value) = 0;
+  virtual void processColumn(const std::string& name, long long value) = 0;
+  virtual void processColumn(const std::string& name, unsigned long long value) = 0;
+  // Process NULL value: SQLRowsetProcessor::addRow passes the string literal "NULL" for SQL NULL columns.
+  virtual void processColumn(const std::string& name, const char* value) = 0;
+};
+
+} /* namespace sql */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
diff --git a/extensions/sql/data/SQLRowsetProcessor.cpp b/extensions/sql/data/SQLRowsetProcessor.cpp
new file mode 100644
index 0000000..1339b9d
--- /dev/null
+++ b/extensions/sql/data/SQLRowsetProcessor.cpp
@@ -0,0 +1,120 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "SQLRowsetProcessor.h"
+
+#include "Exception.h"
+#include "Utils.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace sql {
+
+SQLRowsetProcessor::SQLRowsetProcessor(const soci::rowset<soci::row>& rowset, const std::vector<SQLRowSubscriber*>& rowSubscribers)
+  : rowset_(rowset), rowSubscribers_(rowSubscribers) {  // Both arguments are copied into members.
+  iter_ = rowset_.begin();  // Assigned in the body: iter_ is declared before rowset_, so rowset_ does not exist yet when iter_ is initialised.
+}
+
+// Forwards rows to addRow() starting at the saved iterator position and stops after
+// `max` rows (max == 0: no limit) or at the end of the rowset. Returns the number of
+// rows handled by this call; the position is kept in iter_, so the next call resumes
+// where this one stopped.
+size_t SQLRowsetProcessor::process(size_t max) {
+  size_t processed = 0;
+  while (iter_ != rowset_.end()) {
+    addRow(*iter_, processed);
+    ++iter_;
+    ++processed;
+    ++totalCount_;
+    if (max != 0 && processed >= max) break;
+  }
+  return processed;
+}
+
+// Reports one row to every subscriber: beginProcessRow(), then (only when rowCount == 0)
+// the lower-cased column names, then one processColumn() per column, then endProcessRow().
+// Throws minifi::Exception for an unsupported column type or when strftime() fails.
+void SQLRowsetProcessor::addRow(const soci::row& row, size_t rowCount) {
+  for (const auto& subscriber : rowSubscribers_) {
+    subscriber->beginProcessRow();
+  }
+
+  const std::size_t columnCount = row.size();
+  if (rowCount == 0) {
+    for (std::size_t col = 0; col < columnCount; ++col) {
+      for (const auto& subscriber : rowSubscribers_) {
+        subscriber->processColumnName(utils::toLower(row.get_properties(col).get_name()));
+      }
+    }
+  }
+
+  for (std::size_t col = 0; col < columnCount; ++col) {
+    const soci::column_properties& properties = row.get_properties(col);
+    const std::string columnName = utils::toLower(properties.get_name());
+
+    // SQL NULL is reported as the literal "NULL" through the const char* overload.
+    if (row.get_indicator(col) == soci::i_null) {
+      processColumn(columnName, "NULL");
+      continue;
+    }
+
+    const auto dataType = properties.get_data_type();
+    switch (dataType) {
+      case soci::data_type::dt_string:
+        processColumn(columnName, row.get<std::string>(col));
+        break;
+      case soci::data_type::dt_double:
+        processColumn(columnName, row.get<double>(col));
+        break;
+      case soci::data_type::dt_integer:
+        processColumn(columnName, row.get<int>(col));
+        break;
+      case soci::data_type::dt_long_long:
+        processColumn(columnName, row.get<long long>(col));
+        break;
+      case soci::data_type::dt_unsigned_long_long:
+        processColumn(columnName, row.get<unsigned long long>(col));
+        break;
+      case soci::data_type::dt_date: {
+        // Dates are forwarded as "YYYY-MM-DD HH:MM:SS" strings.
+        const std::tm timestamp = row.get<std::tm>(col);
+        char formatted[128];
+        if (std::strftime(formatted, sizeof(formatted), "%Y-%m-%d %H:%M:%S", &timestamp) == 0) {
+          throw minifi::Exception(PROCESSOR_EXCEPTION, "SQLRowsetProcessor: !strftime.");
+        }
+        processColumn(columnName, std::string(formatted));
+        break;
+      }
+      default:
+        throw minifi::Exception(PROCESSOR_EXCEPTION, "SQLRowsetProcessor: Unsupported data type " + std::to_string(dataType));
+    }
+  }
+
+  for (const auto& subscriber : rowSubscribers_) {
+    subscriber->endProcessRow();
+  }
+}
+
+} /* namespace sql */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
diff --git a/extensions/sql/data/SQLRowsetProcessor.h b/extensions/sql/data/SQLRowsetProcessor.h
new file mode 100644
index 0000000..27545f4
--- /dev/null
+++ b/extensions/sql/data/SQLRowsetProcessor.h
@@ -0,0 +1,62 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <vector>
+
+#include <soci/soci.h>
+
+#include "SQLRowSubscriber.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace sql {
+
+class SQLRowsetProcessor  // Walks a soci rowset and forwards each row to a list of SQLRowSubscriber objects.
+{
+ public:
+  SQLRowsetProcessor(const soci::rowset<soci::row>& rowset, const std::vector<SQLRowSubscriber*>& rowSubscribers);
+  // Processes up to max rows (0 = no limit) starting where the previous call stopped; returns the number processed.
+  size_t process(size_t max);
+
+ private:
+   void addRow(const soci::row& row, size_t rowCount);  // Reports one row; rowCount is the row's index within the current process() call.
+
+   template <typename T>
+   void processColumn(const std::string& name, const T& value) const {  // Fans one column value out to every subscriber.
+     for (const auto& pRowSubscriber: rowSubscribers_) {
+       pRowSubscriber->processColumn(name, value);
+     }
+   }
+
+ private:
+  size_t totalCount_{};  // Incremented once per processed row, across all process() calls.
+  soci::rowset<soci::row>::const_iterator iter_;  // Current position; declared before rowset_, hence assigned in the constructor body.
+  soci::rowset<soci::row> rowset_;
+  std::vector<SQLRowSubscriber*> rowSubscribers_;  // Raw pointers; this class declares no destructor and never deletes them.
+};
+
+} /* namespace sql */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
diff --git a/extensions/sql/data/SQLWriter.h b/extensions/sql/data/SQLWriter.h
new file mode 100644
index 0000000..f8627e9
--- /dev/null
+++ b/extensions/sql/data/SQLWriter.h
@@ -0,0 +1,45 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <string>
+#include <iostream>
+
+#include <soci/soci.h>
+
+#include "SQLRowSubscriber.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace sql {
+
+struct SQLWriter: public SQLRowSubscriber  // A row subscriber that can additionally render what it received as text.
+{
+  virtual std::string toString() = 0;  // Returns the textual output; ExecuteSQL writes it as flow file content.
+};
+
+
+} /* namespace sql */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
diff --git a/extensions/sql/data/Utils.cpp b/extensions/sql/data/Utils.cpp
new file mode 100644
index 0000000..f1d8422
--- /dev/null
+++ b/extensions/sql/data/Utils.cpp
@@ -0,0 +1,61 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "Utils.h"
+
+#include <algorithm>
+#include  <cctype>
+#include  <regex>
+#include  <sstream>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+// Returns a lower-cased copy of str (per-character std::tolower, current C locale).
+std::string toLower(const std::string& str) {
+  std::string ret;
+  ret.reserve(str.size());
+  // std::tolower requires a value representable as unsigned char (or EOF); a negative plain char is
+  // undefined behaviour, so each character is converted to unsigned char before the call.
+  std::transform(str.begin(), str.end(), std::back_inserter(ret), [](unsigned char c) { return static_cast<char>(std::tolower(c)); });
+  return ret;
+}
+
+// Splits a comma-separated list into tokens after lower-casing the input and
+// removing every whitespace character. Empty tokens between consecutive commas
+// are kept; an empty input yields an empty vector.
+std::vector<std::string> inputStringToList(const std::string& str) {
+  const std::string normalized = std::regex_replace(toLower(str), std::regex("\\s"), std::string(""));
+
+  std::vector<std::string> tokens;
+  std::istringstream input(normalized);
+  for (std::string item; std::getline(input, item, ','); ) {
+    tokens.push_back(item);
+  }
+
+  return tokens;
+}
+
+} /* namespace utils */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
diff --git a/extensions/sql/data/Utils.h b/extensions/sql/data/Utils.h
new file mode 100644
index 0000000..fb9758f
--- /dev/null
+++ b/extensions/sql/data/Utils.h
@@ -0,0 +1,38 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <string>
+#include <vector>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+std::string toLower(const std::string& str);  // Returns a lower-cased copy of str.
+std::vector<std::string> inputStringToList(const std::string& str);  // Lower-cases str, removes whitespace, then splits on ','.
+
+
+} /* namespace utils */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
diff --git a/extensions/sql/data/WriteCallback.h b/extensions/sql/data/WriteCallback.h
new file mode 100644
index 0000000..ab961ad
--- /dev/null
+++ b/extensions/sql/data/WriteCallback.h
@@ -0,0 +1,49 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <string>
+
+#include "FlowFileRecord.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+
+class WriteCallback : public OutputStreamCallback {  // Output callback that writes one string to the supplied stream.
+public:
+  explicit WriteCallback(const std::string& data)  // Only a reference is kept: data must outlive this object.
+    : data_(data) {
+  }
+
+ int64_t process(std::shared_ptr<io::BaseStream> stream) {  // Returns 0 for empty data, otherwise the result of stream->write().
+    if (data_.empty())
+      return 0;
+
+    return stream->write(reinterpret_cast<uint8_t*>(const_cast<char*>(data_.c_str())), data_.size());  // NOTE(review): presumably write() takes a non-const uint8_t* and does not modify the buffer — confirm.
+  }
+
+ const std::string& data_;  // Reference to the caller's string, not a copy.
+};
+
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
diff --git a/extensions/sql/patch/soci.patch b/extensions/sql/patch/soci.patch
new file mode 100644
index 0000000..530571d
--- /dev/null
+++ b/extensions/sql/patch/soci.patch
@@ -0,0 +1,18 @@
+diff -rupN orig/CMakeLists.txt patched/CMakeLists.txt
+--- orig/CMakeLists.txt	2019-11-09 20:08:01.000000000 +0100
++++ patched/CMakeLists.txt	2019-12-17 15:22:40.000000000 +0100
+@@ -29,8 +29,7 @@ option(SOCI_ASAN "Enable address sanitiz
+ ###############################################################################
+ 
+ # Path to additional CMake modules
+-set(CMAKE_MODULE_PATH ${SOCI_SOURCE_DIR}/cmake ${CMAKE_MODULE_PATH})
+-set(CMAKE_MODULE_PATH ${SOCI_SOURCE_DIR}/cmake/modules ${CMAKE_MODULE_PATH})
++list(APPEND CMAKE_MODULE_PATH "${SOCI_SOURCE_DIR}/cmake/modules" "${SOCI_SOURCE_DIR}/cmake")
+ 
+ include(SociUtilities)
+ include(SociConfig)
+@@ -204,4 +203,3 @@ endforeach()
+ configure_file("${CONFIG_FILE_IN}" "${CONFIG_FILE_OUT}")
+ 
+ message(STATUS "")
+-
diff --git a/extensions/sql/processors/ExecuteSQL.cpp b/extensions/sql/processors/ExecuteSQL.cpp
new file mode 100644
index 0000000..7738408
--- /dev/null
+++ b/extensions/sql/processors/ExecuteSQL.cpp
@@ -0,0 +1,123 @@
+/**
+ * @file ExecuteSQL.cpp
+ * ExecuteSQL class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ExecuteSQL.h"
+
+#include <vector>
+#include <queue>
+#include <map>
+#include <set>
+#include <sstream>
+#include <stdio.h>
+#include <string>
+#include <iostream>
+#include <memory>
+#include <codecvt>
+
+#include <soci/soci.h>
+
+#include "io/DataStream.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "Exception.h"
+#include "utils/OsUtils.h"
+#include "data/DatabaseConnectors.h"
+#include "data/JSONSQLWriter.h"
+#include "data/SQLRowsetProcessor.h"
+#include "data/WriteCallback.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+const std::string ExecuteSQL::ProcessorName("ExecuteSQL");
+
+const core::Property ExecuteSQL::s_sqlSelectQuery(  // NOTE(review): the description mentions taking the query from flow file content, but processOnTrigger in this file only uses the property value — confirm.
+  core::PropertyBuilder::createProperty("SQL select query")->isRequired(true)->withDescription(
+    "The SQL select query to execute. The query can be empty, a constant value, or built from attributes using Expression Language. "
+    "If this property is specified, it will be used regardless of the content of incoming flowfiles. "
+    "If this property is empty, the content of the incoming flow file is expected to contain a valid SQL select query, to be issued by the processor to the database. "
+    "Note that Expression Language is not evaluated for flow file contents.")->supportsExpressionLanguage(true)->build());
+
+const core::Property ExecuteSQL::s_maxRowsPerFlowFile(  // Batch size for the generated flow files; 0 (the default) puts all rows into one flow file.
+	core::PropertyBuilder::createProperty("Max Rows Per Flow File")->isRequired(true)->withDefaultValue<int>(0)->withDescription(
+		"The maximum number of result rows that will be included into a flow file. If zero then all will be placed into the flow file")->supportsExpressionLanguage(true)->build());
+
+const core::Relationship ExecuteSQL::s_success("success", "Successfully created FlowFile from SQL query result set.");
+
+static const std::string ResultRowCount = "executesql.row.count";
+
+// Forwards name and uuid to the SQLProcessor base; max_rows_ stays 0 until processOnSchedule reads the property.
+ExecuteSQL::ExecuteSQL(const std::string& name, utils::Identifier uuid)
+  : SQLProcessor(name, uuid), max_rows_(0) {}
+
+// Nothing to release explicitly; members clean up after themselves.
+ExecuteSQL::~ExecuteSQL() = default;
+
+void ExecuteSQL::initialize() {  // Registers the properties and relationships this processor exposes.
+  //! Set the supported properties: DB controller service, output format, select query, max rows per flow file
+  setSupportedProperties( { dbControllerService(), outputFormat(), s_sqlSelectQuery, s_maxRowsPerFlowFile});
+
+  //! Set the supported relationships: only "success"
+  setSupportedRelationships( { s_success });
+}
+
+void ExecuteSQL::processOnSchedule(const core::ProcessContext &context) {  // Caches the configured property values in members.
+  initOutputFormat(context);
+  // The return values of getProperty() are not checked here.
+  context.getProperty(s_sqlSelectQuery.getName(), sqlSelectQuery_);
+  context.getProperty(s_maxRowsPerFlowFile.getName(), max_rows_);
+}
+
+// Runs the configured select query and emits the result set as flow files on `success`.
+// Rows are consumed in batches of at most max_rows_ (zero or negative: no limit). Every
+// batch with non-empty writer output becomes one flow file carrying that output as content
+// and the batch's row count in the `executesql.row.count` attribute.
+void ExecuteSQL::processOnTrigger(core::ProcessSession &session) {
+  auto statement = connection_->prepareStatement(sqlSelectQuery_);
+  auto rowset = statement->execute();
+
+  sql::JSONSQLWriter sqlWriter(isJSONPretty());
+  sql::SQLRowsetProcessor sqlRowsetProcessor(rowset, { &sqlWriter });
+
+  // SQLRowsetProcessor::process() treats 0 as "no limit", so no numeric_limits sentinel is needed.
+  const size_t maxRows = max_rows_ > 0 ? static_cast<size_t>(max_rows_) : 0;
+
+  // One batch per iteration; a batch of zero rows means the rowset is exhausted.
+  while (const size_t rowCount = sqlRowsetProcessor.process(maxRows)) {
+    const auto output = sqlWriter.toString();
+    if (output.empty()) {
+      continue;
+    }
+    WriteCallback writer(output);
+    auto newflow = session.create();
+    newflow->addAttribute(ResultRowCount, std::to_string(rowCount));
+    session.write(newflow, &writer);
+    session.transfer(newflow, s_success);
+  }
+}
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
diff --git a/extensions/sql/processors/ExecuteSQL.h b/extensions/sql/processors/ExecuteSQL.h
new file mode 100644
index 0000000..764f038
--- /dev/null
+++ b/extensions/sql/processors/ExecuteSQL.h
@@ -0,0 +1,70 @@
+/**
+ * @file ExecuteSQL.h
+ * ExecuteSQL class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "core/Core.h"
+#include "FlowFileRecord.h"
+#include "concurrentqueue.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "services/DatabaseService.h"
+#include "SQLProcessor.h"
+#include "OutputFormat.h"
+
+#include <sstream>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+//! ExecuteSQL Class
+class ExecuteSQL: public SQLProcessor<ExecuteSQL>, public OutputFormat {  // Processor that runs a select query and emits the rows as JSON flow files.
+ public:
+  explicit ExecuteSQL(const std::string& name, utils::Identifier uuid = utils::Identifier());
+  virtual ~ExecuteSQL();
+
+  //! Processor Name
+  static const std::string ProcessorName;
+  // Non-virtual hooks; presumably invoked by the SQLProcessor<ExecuteSQL> base — TODO confirm.
+  void processOnSchedule(const core::ProcessContext& context);
+  void processOnTrigger(core::ProcessSession& session);
+  
+  void initialize() override;
+
+  static const core::Property s_sqlSelectQuery;  // Property "SQL select query".
+  static const core::Property s_maxRowsPerFlowFile;  // Property "Max Rows Per Flow File".
+
+  static const core::Relationship s_success;  // Relationship "success".
+
+ private:
+  int max_rows_;  // Cached "Max Rows Per Flow File" value; 0 means all rows go into one flow file.
+  std::string sqlSelectQuery_;  // Cached "SQL select query" value.
+};
+
+REGISTER_RESOURCE(ExecuteSQL, "ExecuteSQL to execute SELECT statement via ODBC.");
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
diff --git a/extensions/sql/processors/OutputFormat.cpp b/extensions/sql/processors/OutputFormat.cpp
new file mode 100644
index 0000000..acd9f69
--- /dev/null
+++ b/extensions/sql/processors/OutputFormat.cpp
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "OutputFormat.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+const std::string s_outputFormatJSON = "JSON";
+const std::string s_outputFormatJSONPretty = "JSON-Pretty";
+
+const core::Property& OutputFormat::outputFormat() {  // Returns the shared "Output Format" property definition.
+  static const core::Property s_outputFormat =  // Function-local static: built on the first call, then reused.
+      core::PropertyBuilder::createProperty("Output Format")->
+          isRequired(true)->
+          withDefaultValue(s_outputFormatJSONPretty)->
+          withAllowableValues<std::string>({ s_outputFormatJSON, s_outputFormatJSONPretty })->
+          withDescription("Set the output format type.")->
+          build();
+
+  return s_outputFormat;
+}
+
+bool OutputFormat::isJSONFormat() const {  // True when the cached format is "JSON" or "JSON-Pretty".
+  return outputFormat_ == s_outputFormatJSON || outputFormat_ == s_outputFormatJSONPretty;
+}
+
+bool OutputFormat::isJSONPretty() const {  // True only when the cached format is "JSON-Pretty".
+  return outputFormat_ == s_outputFormatJSONPretty;
+}
+
+void OutputFormat::initOutputFormat(const core::ProcessContext& context) {  // Caches the "Output Format" property value in outputFormat_.
+  context.getProperty(outputFormat().getName(), outputFormat_);  // Return value is not checked.
+}
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
diff --git a/extensions/sql/processors/OutputFormat.h b/extensions/sql/processors/OutputFormat.h
new file mode 100644
index 0000000..e94b538
--- /dev/null
+++ b/extensions/sql/processors/OutputFormat.h
@@ -0,0 +1,52 @@
+/**
+ * @file OutputFormat.h
+ * OutputFormat class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "core/Core.h"
+#include "core/Processor.h"
+
+#include <string>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+class OutputFormat {  // Mix-in that gives a processor the "Output Format" property and a cached copy of its value.
+ protected:
+  static const core::Property& outputFormat();
+  // True when the cached format is "JSON" or "JSON-Pretty".
+  bool isJSONFormat() const;
+  // True only when the cached format is "JSON-Pretty".
+  bool isJSONPretty() const;
+  // Reads the "Output Format" property from context into outputFormat_.
+  void initOutputFormat(const core::ProcessContext& context);
+
+ protected:
+   std::string outputFormat_;
+};
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
diff --git a/extensions/sql/processors/PutSQL.cpp b/extensions/sql/processors/PutSQL.cpp
new file mode 100644
index 0000000..0512cc9
--- /dev/null
+++ b/extensions/sql/processors/PutSQL.cpp
@@ -0,0 +1,102 @@
+/**
+ * @file PutSQL.cpp
+ * PutSQL class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "PutSQL.h"
+
+#include <vector>
+#include <queue>
+#include <map>
+#include <set>
+#include <sstream>
+#include <stdio.h>
+#include <string>
+#include <iostream>
+#include <memory>
+#include <codecvt>
+
+#include <soci/soci.h>
+
+#include "io/DataStream.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "Exception.h"
+#include "utils/OsUtils.h"
+#include "data/DatabaseConnectors.h"
+#include "data/JSONSQLWriter.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+const std::string PutSQL::ProcessorName("PutSQL");
+
+const core::Property PutSQL::s_sqlStatements(  // NOTE(review): the default value "System" is not an SQL statement, and the flow-file fallback described below is not implemented in processOnTrigger — confirm both.
+  core::PropertyBuilder::createProperty("SQL statements")->isRequired(true)->withDefaultValue("System")->withDescription(
+    "A semicolon-delimited list of SQL statements to execute. The statement can be empty, a constant value, or built from attributes using Expression Language. "
+    "If this property is specified, it will be used regardless of the content of incoming flowfiles. "
+    "If this property is empty, the content of the incoming flow file is expected to contain a valid SQL statements, to be issued by the processor to the database.")
+    ->supportsExpressionLanguage(true)->build());
+
+const core::Relationship PutSQL::s_success("success", "Database is successfully updated.");
+
+// Passes name and uuid straight through to the SQLProcessor base; no other state to set up.
+PutSQL::PutSQL(const std::string& name, utils::Identifier uuid)
+  : SQLProcessor(name, uuid) {}
+
+// Nothing to release explicitly; members clean up after themselves.
+PutSQL::~PutSQL() = default;
+
+void PutSQL::initialize() {  // Registers the properties and relationships this processor exposes.
+  //! Set the supported properties: DB controller service and the SQL statements
+  setSupportedProperties( { dbControllerService(), s_sqlStatements });
+
+  //! Set the supported relationships: only "success"
+  setSupportedRelationships( { s_success });
+}
+
+void PutSQL::processOnSchedule(const core::ProcessContext& context) {  // Caches the configured statements for processOnTrigger.
+  std::string sqlStatements;
+  context.getProperty(s_sqlStatements.getName(), sqlStatements);  // Return value is not checked.
+  sqlStatements_ = utils::StringUtils::split(sqlStatements, ";");  // NOTE(review): plain split on ';' — a ';' inside a quoted SQL literal would presumably split too; verify.
+}
+
+// Runs all cached statements in one transaction; on std::exception it logs, rolls back and rethrows. `session` is unused.
+void PutSQL::processOnTrigger(core::ProcessSession& session) {
+  const auto dbSession = connection_->getSession();
+  try {
+    dbSession->begin();
+    for (const auto& sqlStatement : sqlStatements_) {
+      dbSession->execute(sqlStatement);
+    }
+    dbSession->commit();
+  } catch (std::exception& error) {
+    logger_->log_error("SQL statement error: %s", error.what());
+    dbSession->rollback();
+    throw;
+  }
+}
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
diff --git a/extensions/sql/processors/PutSQL.h b/extensions/sql/processors/PutSQL.h
new file mode 100644
index 0000000..284008a
--- /dev/null
+++ b/extensions/sql/processors/PutSQL.h
@@ -0,0 +1,67 @@
+/**
+ * @file PutSQL.h
+ * PutSQL class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "core/Core.h"
+#include "FlowFileRecord.h"
+#include "concurrentqueue.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "services/DatabaseService.h"
+#include "SQLProcessor.h"
+
+#include <sstream>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+//! PutSQL Class
+class PutSQL: public SQLProcessor<PutSQL> {  // Processor that executes a configured list of SQL statements in one transaction.
+ public:
+  explicit PutSQL(const std::string& name, utils::Identifier uuid = utils::Identifier());
+  virtual ~PutSQL();
+
+  //! Processor Name
+  static const std::string ProcessorName;
+  // Non-virtual hooks; presumably invoked by the SQLProcessor<PutSQL> base — TODO confirm.
+  void processOnSchedule(const core::ProcessContext &context);
+  void processOnTrigger(core::ProcessSession &session);
+  
+  void initialize() override;
+
+  static const core::Property s_sqlStatements;  // Property "SQL statements".
+
+  static const core::Relationship s_success;  // Relationship "success".
+
+ private:
+   std::vector<std::string> sqlStatements_;  // The property value split on ';' by processOnSchedule.
+};
+
+REGISTER_RESOURCE(PutSQL, "PutSQL to execute SQL command via ODBC.");
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
diff --git a/extensions/sql/processors/QueryDatabaseTable.cpp b/extensions/sql/processors/QueryDatabaseTable.cpp
new file mode 100644
index 0000000..a7d7033
--- /dev/null
+++ b/extensions/sql/processors/QueryDatabaseTable.cpp
@@ -0,0 +1,471 @@
+/**
+ * @file QueryDatabaseTable.cpp
+ * QueryDatabaseTable class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "QueryDatabaseTable.h"
+
+#include <vector>
+#include <queue>
+#include <map>
+#include <set>
+#include <sstream>
+#include <stdio.h>
+#include <string>
+#include <iostream>
+#include <memory>
+#include <codecvt>
+#include <algorithm>
+#include <regex>
+
+#include <soci/soci.h>
+
+#include "io/DataStream.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "Exception.h"
+#include "utils/OsUtils.h"
+#include "data/DatabaseConnectors.h"
+#include "data/JSONSQLWriter.h"
+#include "data/SQLRowsetProcessor.h"
+#include "data/WriteCallback.h"
+#include "data/MaxCollector.h"
+#include "data/Utils.h"
+#include "utils/file/FileUtils.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+// Processor name constant.
+const std::string QueryDatabaseTable::ProcessorName("QueryDatabaseTable");
+
+// Property descriptors. Their values are read in processOnSchedule().
+
+// Name of the table to query; used to build the default select statement in getSelectQuery().
+const core::Property QueryDatabaseTable::s_tableName(
+  core::PropertyBuilder::createProperty("Table Name")->isRequired(true)->withDescription("The name of the database table to be queried.")->supportsExpressionLanguage(true)->build());
+
+// Optional column list; getSelectQuery() falls back to "*" when it is empty.
+const core::Property QueryDatabaseTable::s_columnNames(
+  core::PropertyBuilder::createProperty("Columns to Return")->isRequired(false)->withDescription(
+    "A comma-separated list of column names to be used in the query. If your database requires special treatment of the names (quoting, e.g.), each name should include such treatment. "
+    "If no column names are supplied, all columns in the specified table will be returned. "
+    "NOTE: It is important to use consistent column names for a given table for incremental fetch to work properly.")->supportsExpressionLanguage(true)->build());
+
+// Columns whose maximum values are tracked between runs; split into a list in processOnSchedule().
+const core::Property QueryDatabaseTable::s_maxValueColumnNames(
+  core::PropertyBuilder::createProperty("Maximum-value Columns")->isRequired(false)->withDescription(
+    "A comma-separated list of column names. The processor will keep track of the maximum value for each column that has been returned since the processor started running. "
+    "Using multiple columns implies an order to the column list, and each column's values are expected to increase more slowly than the previous columns' values. "
+    "Thus, using multiple columns implies a hierarchical structure of columns, which is usually used for partitioning tables. "
+    "This processor can be used to retrieve only those rows that have been added/updated since the last retrieval. "
+    "Note that some ODBC types such as bit/boolean are not conducive to maintaining maximum value, so columns of these types should not be listed in this property, and will result in error(s) during processing. "
+    "If no columns are provided, all rows from the table will be considered, which could have a performance impact. "
+    "NOTE: It is important to use consistent max-value column names for a given table for incremental fetch to work properly. "
+    "NOTE: Because of a limitation of database access library 'soci', which doesn't support milliseconds in it's 'dt_date', "
+    "there is a possibility that flowfiles might have duplicated records, if a max-value column with 'dt_date' type has value with milliseconds.")->
+    supportsExpressionLanguage(true)->build());
+
+// Extra condition appended to the WHERE part of the generated query.
+const core::Property QueryDatabaseTable::s_whereClause(
+  core::PropertyBuilder::createProperty("db-fetch-where-clause")->isRequired(false)->withDescription(
+    "A custom clause to be added in the WHERE condition when building SQL queries.")->supportsExpressionLanguage(true)->build());
+
+// Custom query; when set, getSelectQuery() uses it instead of "select <columns> from <table>".
+const core::Property QueryDatabaseTable::s_sqlQuery(
+  core::PropertyBuilder::createProperty("db-fetch-sql-query")->isRequired(false)->withDescription(
+    "A custom SQL query used to retrieve data. Instead of building a SQL query from other properties, this query will be wrapped as a sub-query. "
+    "Query must have no ORDER BY statement.")->supportsExpressionLanguage(true)->build());
+
+// Row limit per FlowFile; processOnTrigger() treats 0 as "no limit".
+const core::Property QueryDatabaseTable::s_maxRowsPerFlowFile(
+  core::PropertyBuilder::createProperty("qdbt-max-rows")->isRequired(true)->withDefaultValue<int>(0)->withDescription(
+    "The maximum number of result rows that will be included in a single FlowFile. This will allow you to break up very large result sets into multiple FlowFiles. "
+    "If the value specified is zero, then all rows are returned in a single FlowFile.")->supportsExpressionLanguage(true)->build());
+
+// Base directory handed to the State helper, which keeps its file in a per-processor sub-directory.
+const core::Property QueryDatabaseTable::s_stateDirectory(
+  core::PropertyBuilder::createProperty("State Directory")->isRequired(false)->withDefaultValue("QDTState")->withDescription("Directory which contains processor state data.")->build());
+
+// Every dynamic property key must start with this prefix (checked in processOnSchedule());
+// the remainder of the key, lower-cased, is looked up among the maximum-value columns.
+const std::string QueryDatabaseTable::s_initialMaxValueDynamicPropertyPrefix("initial.maxvalue.");
+
+// The only relationship: FlowFiles created in processOnTrigger() are transferred to it.
+const core::Relationship QueryDatabaseTable::s_success("success", "Successfully created FlowFile from SQL query result set.");
+
+// Names of the attributes added to every FlowFile created in processOnTrigger().
+static const std::string ResultTableName = "tablename";
+static const std::string ResultRowCount = "querydbtable.row.count";
+
+/**
+ * File-backed store for the maximum column values tracked by QueryDatabaseTable.
+ *
+ * The state file is "<stateDir>/uuid/<uuid>/State.txt" (joined with the platform
+ * directory separator) and is laid out as
+ *
+ *   <tableName><sep><column>=<value><sep>...<column>=<value><sep>[padding spaces]
+ *
+ * where <sep> is separator(). The file is opened once by the constructor and is
+ * rewritten in place by every writeStateToFile() call.
+ *
+ * Failures detected during construction are logged and reflected by
+ * operator bool(); check it before relying on the loaded state.
+ */
+class State {
+ public:
+  /**
+   * Creates the per-processor state directory if needed and loads (or creates) the state file.
+   *
+   * @param tableName table the state belongs to; a file written for another table is discarded.
+   * @param stateDir  base directory for state data; must not be empty.
+   * @param uuid      processor UUID, used as the name of the sub-directory.
+   * @param logger    logger used to report problems.
+   */
+  State(const std::string& tableName, const std::string& stateDir, const std::string& uuid, std::shared_ptr<logging::Logger> logger)
+    : logger_(logger), tableName_(tableName) {  // listed in member declaration order
+    if (!createUUIDDir(stateDir, uuid, filePath_))
+      return;
+
+    filePath_ += "State.txt";
+
+    if (!getStateFromFile())
+      return;
+
+    ok_ = true;
+  }
+
+  //! True when the state directory and the state file could be set up.
+  explicit operator bool() const {
+    return ok_;
+  }
+
+  //! Returns a copy of the column -> maximum value map currently held in memory.
+  std::unordered_map<std::string, std::string> mapState() const {
+    return mapState_;
+  }
+
+  /**
+   * Replaces the in-memory state with 'mapState' and rewrites the state file from its beginning.
+   *
+   * If the new content is shorter than what was written before, the remainder is overwritten
+   * with spaces so that no stale bytes follow the last separator. I/O failures are logged,
+   * they are not reported to the caller.
+   */
+  void writeStateToFile(const std::unordered_map<std::string, std::string>& mapState) {
+    mapState_ = mapState;
+
+    if (!file_.is_open()) {
+      logger_->log_error("Cannot write state, '%s' file is not open", filePath_.c_str());
+      return;
+    }
+
+    std::string data = tableName_ + separator();
+    for (const auto& el : mapState) {
+      data += el.first;
+      data += '=';
+      data += el.second;
+      data += separator();
+    }
+    const auto dataSize = data.size();
+
+    // If a maxValueColumnName type is varchar, the new data can be shorter than the previous one -
+    // blank out the difference with ' ' to keep the file format.
+    if (dataSize_ > dataSize) {
+      data.append(dataSize_ - dataSize, ' ');
+    }
+
+    // Drop error flags left by an earlier failed write, otherwise every later write would be a no-op.
+    file_.clear();
+    // Two-argument form: offset 0 relative to the beginning of the file.
+    file_.seekp(0, std::ios::beg);
+    file_ << data;
+    file_.flush();
+
+    if (!file_) {
+      logger_->log_error("Cannot write '%s' file", filePath_.c_str());
+    }
+
+    dataSize_ = dataSize;
+  }
+
+ private:
+  //! Token that terminates the table name and every "<column>=<value>" entry in the file.
+  static const std::string& separator() {
+    static const std::string s_separator = "@!qdt!@";
+    return s_separator;
+  }
+
+  /**
+   * Builds "<stateDir>/uuid/<uuid>/" in 'dir' and makes sure it exists.
+   * On failure 'dir' is cleared and false is returned.
+   */
+  bool createUUIDDir(const std::string& stateDir, const std::string& uuid, std::string& dir) {
+    if (stateDir.empty()) {
+      dir.clear();
+      return false;
+    }
+
+    const auto dirSeparator = utils::file::FileUtils::get_separator();
+
+    auto dirWithSlash = stateDir;
+    if (stateDir.back() != dirSeparator) {
+      dirWithSlash += dirSeparator;
+    }
+
+    dir = dirWithSlash + "uuid" + dirSeparator + uuid + dirSeparator;
+
+    utils::file::FileUtils::create_dir(dir);
+
+    if (!utils::file::FileUtils::is_directory(dir.c_str())) {
+      logger_->log_error("Cannot create %s", dir.c_str());
+      dir.clear();
+      return false;
+    }
+
+    return true;
+  }
+
+  /**
+   * Loads mapState_ from the state file and opens file_ for subsequent writes.
+   * A missing file, malformed content or a file written for a different table
+   * results in a fresh, empty state file.
+   *
+   * @return false only if the state file cannot be opened for writing.
+   */
+  bool getStateFromFile() {
+    std::string state;
+    {
+      std::ifstream file(filePath_);
+      if (!file) {
+        return createEmptyStateFile();
+      }
+
+      std::stringstream ss;
+      ss << file.rdbuf();
+      state = ss.str();
+    }  // the input stream is closed here, before the file is reopened through file_
+
+    // Includes any padding spaces, so that the next write blanks out everything stored so far.
+    dataSize_ = state.size();
+
+    size_t pos = state.find(separator());
+    if (pos == std::string::npos) {
+      logger_->log_error("Invalid data in '%s' file.", filePath_.c_str());
+      return resetStateFile();
+    }
+
+    const auto tableName = state.substr(0, pos);
+    if (tableName != tableName_) {
+      logger_->log_warn("tableName is changed - now: '%s', in State.txt: '%s'.", tableName_.c_str(), tableName.c_str());
+      return resetStateFile();
+    }
+
+    pos += separator().size();
+
+    // Every entry is terminated by a separator; whatever follows the last one (padding) is ignored.
+    for (auto newPos = state.find(separator(), pos); newPos != std::string::npos; newPos = state.find(separator(), pos)) {
+      const auto columnNameValue = state.substr(pos, newPos - pos);
+
+      const auto posEQ = columnNameValue.find('=');
+      if (posEQ == std::string::npos) {
+        logger_->log_error("Invalid data in '%s' file.", filePath_.c_str());
+        return resetStateFile();
+      }
+
+      mapState_.insert({ columnNameValue.substr(0, posEQ), columnNameValue.substr(posEQ + 1) });
+
+      pos = newPos + separator().size();
+    }
+
+    file_.open(filePath_);
+    if (!file_.is_open()) {
+      logger_->log_error("Cannot open %s", filePath_.c_str());
+      mapState_.clear();
+      return false;
+    }
+
+    return true;
+  }
+
+  //! Forgets everything loaded so far and starts over with an empty state file.
+  bool resetStateFile() {
+    mapState_.clear();
+    return createEmptyStateFile();
+  }
+
+  //! Opens file_ for output only (creating or truncating the file) and resets the written size.
+  bool createEmptyStateFile() {
+    file_.open(filePath_, std::ios::out);
+    if (!file_.is_open()) {
+      logger_->log_error("Cannot open '%s' file", filePath_.c_str());
+      return false;
+    }
+
+    dataSize_ = 0;
+
+    return true;
+  }
+
+ private:
+  std::unordered_map<std::string, std::string> mapState_;  // column name -> maximum value
+  std::shared_ptr<logging::Logger> logger_;
+  std::string filePath_;
+  std::fstream file_;    // kept open between writes
+  size_t dataSize_{};    // size of the meaningful data in the file; anything beyond it is padding
+  std::string tableName_;
+  bool ok_{};
+};
+
+
+// QueryDatabaseTable
+// The constructor only forwards the processor name and UUID to the SQLProcessor base.
+QueryDatabaseTable::QueryDatabaseTable(const std::string& name, utils::Identifier uuid)
+    : SQLProcessor(name, uuid) {}
+
+// Kept out of line: pState_ is a std::unique_ptr<State> and State is only forward-declared
+// in the header, so the destructor has to be emitted where State is a complete type.
+QueryDatabaseTable::~QueryDatabaseTable() = default;
+
+// Registers the properties and the relationship supported by this processor.
+void QueryDatabaseTable::initialize() {
+  //! Set the supported properties
+  setSupportedProperties({
+    dbControllerService(),
+    outputFormat(),
+    s_tableName,
+    s_columnNames,
+    s_maxValueColumnNames,
+    s_whereClause,
+    s_sqlQuery,
+    s_maxRowsPerFlowFile,
+    s_stateDirectory
+  });
+
+  //! Set the supported relationships
+  setSupportedRelationships({ s_success });
+}
+
+/**
+ * Scheduling hook, called by SQLProcessor::onSchedule().
+ *
+ * Reads the processor properties, loads the persisted maximum values and reconciles them with
+ * the configured "Maximum-value Columns". Columns without a stored value may be seeded from
+ * "initial.maxvalue.<column>" dynamic properties.
+ *
+ * Returns early, after logging, when the State Directory is empty or the state cannot be set up.
+ *
+ * @throws minifi::Exception if a dynamic property does not start with the "initial.maxvalue." prefix.
+ */
+void QueryDatabaseTable::processOnSchedule(const core::ProcessContext &context) {
+  initOutputFormat(context);
+
+  context.getProperty(s_tableName.getName(), tableName_);
+  context.getProperty(s_columnNames.getName(), columnNames_);
+
+  context.getProperty(s_maxValueColumnNames.getName(), maxValueColumnNames_);
+  listMaxValueColumnName_ = utils::inputStringToList(maxValueColumnNames_);
+
+  context.getProperty(s_whereClause.getName(), whereClause_);
+  context.getProperty(s_sqlQuery.getName(), sqlQuery_);
+  context.getProperty(s_maxRowsPerFlowFile.getName(), maxRowsPerFlowFile_);
+
+  std::string stateDirectory;
+  context.getProperty(s_stateDirectory.getName(), stateDirectory);
+  if (stateDirectory.empty()) {
+    logger_->log_error("State Directory is empty");
+    return;
+  }
+
+  pState_ = std::make_unique<State>(tableName_, stateDirectory, getUUIDStr(), logger_);
+  if (!*pState_) {
+    return;
+  }
+
+  mapState_ = pState_->mapState();
+
+  // The stored state is only usable if it covers exactly the configured max-value columns.
+  const bool stateMatchesColumns =
+    listMaxValueColumnName_.size() == mapState_.size() &&
+    std::all_of(listMaxValueColumnName_.begin(), listMaxValueColumnName_.end(),
+                [this](const std::string& name) { return mapState_.count(name) != 0; });
+  if (!stateMatchesColumns) {
+    mapState_.clear();
+  }
+
+  // Make sure every configured column has an entry; insert() leaves existing entries untouched.
+  for (const auto& name : listMaxValueColumnName_) {
+    mapState_.insert({name, std::string()});
+  }
+
+  const auto dynamicKeys = context.getDynamicPropertyKeys();
+  logger_->log_info("Received %zu dynamic properties", dynamicKeys.size());
+
+  // Seed still-empty max values from the matching "initial.maxvalue.<column>" dynamic property.
+  const auto& prefix = s_initialMaxValueDynamicPropertyPrefix;
+  for (const auto& key : dynamicKeys) {
+    const bool hasPrefix = key.compare(0, prefix.length(), prefix) == 0;
+    if (!hasPrefix) {
+      throw minifi::Exception(PROCESSOR_EXCEPTION, "QueryDatabaseTable: Unsupported dynamic property \"" + key + "\"");
+    }
+
+    const auto columnName = utils::toLower(key.substr(prefix.length()));
+    auto entry = mapState_.find(columnName);
+    if (entry == mapState_.end()) {
+      logger_->log_warn("Initial maximum value specified for column \"%s\", which is not specified as a Maximum-value Column. Ignoring.", columnName);
+      continue;
+    }
+
+    // A value restored from the state file takes precedence over the configured initial value.
+    if (entry->second.empty()) {
+      std::string initialValue;
+      if (context.getDynamicProperty(key, initialValue) && !initialValue.empty()) {
+        entry->second = initialValue;
+        logger_->log_info("Setting initial maximum value of %s to %s", columnName, initialValue);
+      }
+    }
+  }
+}
+
+/**
+ * Trigger hook, called by SQLProcessor::onTrigger().
+ *
+ * Executes the query built by getSelectQuery() and writes the result set to FlowFiles of at most
+ * "qdbt-max-rows" rows each (no limit when that value is not positive). Every FlowFile gets the
+ * row count and table name as attributes and is transferred to 'success'.
+ *
+ * If the max collector reports updated maximum values, the session is committed first and the
+ * state is persisted afterwards. When the commit throws, mapState_ is restored to its value from
+ * before the update and the exception is rethrown.
+ */
+void QueryDatabaseTable::processOnTrigger(core::ProcessSession &session) {
+  const auto selectQuery = getSelectQuery();
+
+  logger_->log_info("QueryDatabaseTable: selectQuery: '%s'", selectQuery.c_str());
+
+  auto statement = connection_->prepareStatement(selectQuery);
+
+  auto rowset = statement->execute();
+
+  sql::MaxCollector maxCollector(selectQuery, maxValueColumnNames_, mapState_);
+  sql::JSONSQLWriter sqlWriter(isJSONPretty());
+  sql::SQLRowsetProcessor sqlRowsetProcessor(rowset, {&sqlWriter, &maxCollector});
+
+  // A non-positive limit means "everything in a single FlowFile".
+  const size_t maxRows = maxRowsPerFlowFile_ > 0 ? static_cast<size_t>(maxRowsPerFlowFile_) : std::numeric_limits<size_t>::max();
+
+  // Process rowset, one batch (= one FlowFile) at a time, until it is exhausted.
+  for (;;) {
+    const size_t rowCount = sqlRowsetProcessor.process(maxRows);
+    if (rowCount == 0)
+      break;
+
+    const auto output = sqlWriter.toString();
+    if (!output.empty()) {
+      WriteCallback writer(output);
+      auto newflow = session.create();
+      newflow->addAttribute(ResultRowCount, std::to_string(rowCount));
+      newflow->addAttribute(ResultTableName, tableName_);
+      session.write(newflow, &writer);
+      session.transfer(newflow, s_success);
+    }
+  }
+
+  // Snapshot taken before updateMapState(), which presumably stores the collected maxima in mapState_.
+  const auto mapState = mapState_;
+  if (!maxCollector.updateMapState()) {
+    return;
+  }
+
+  try {
+    session.commit();
+  } catch (...) {
+    // The FlowFiles were not committed, so the next run has to start from the previous maxima.
+    mapState_ = mapState;
+    throw;
+  }
+
+  // pState_ is not set when processOnSchedule() returned early because of an empty State Directory.
+  if (pState_) {
+    pState_->writeStateToFile(mapState_);
+  } else {
+    logger_->log_error("QueryDatabaseTable: state storage is not available, maximum values are not persisted");
+  }
+}
+
+/**
+ * Builds the select statement for the next trigger.
+ *
+ * The base query is either the custom "db-fetch-sql-query" or
+ * "select <columns|*> from <table>". A WHERE part is appended that consists of
+ *  - one condition per maximum-value column that already has a stored value, and
+ *  - the custom where clause (in parentheses), if configured,
+ * all joined with " and ". Columns without a stored maximum value contribute no condition.
+ *
+ * @return the complete query text.
+ */
+std::string QueryDatabaseTable::getSelectQuery() {
+  std::string ret;
+
+  if (sqlQuery_.empty()) {
+    ret = "select " + (columnNames_.empty() ? std::string("*") : columnNames_) + " from " + tableName_;
+  } else {
+    ret = sqlQuery_;
+  }
+
+  std::string whereClauses;
+
+  // Adds " and " only between conditions, so the first condition never starts with a dangling "and".
+  const auto addCondition = [&whereClauses](const std::string& condition) {
+    if (!whereClauses.empty()) {
+      whereClauses += " and ";
+    }
+    whereClauses += condition;
+  };
+
+  for (size_t index = 0; index < listMaxValueColumnName_.size(); index++) {
+    const auto& columnName = listMaxValueColumnName_[index];
+
+    const auto itState = mapState_.find(columnName);
+    if (itState == mapState_.end() || itState->second.empty()) {
+      continue;
+    }
+
+    // Logic to differentiate ">" vs ">=" based on index is copied from:
+    // https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java
+    // (under comment "Add a condition for the WHERE clause"). And implementation explanation: https://issues.apache.org/jira/browse/NIFI-2712.
+    addCondition(columnName + (index == 0 ? " > " : " >= ") + itState->second);
+  }
+
+  if (!whereClause_.empty()) {
+    // Parenthesized so that an "or" inside the custom clause cannot weaken the max-value conditions.
+    addCondition("(" + whereClause_ + ")");
+  }
+
+  if (!whereClauses.empty()) {
+    ret += " where " + whereClauses;
+  }
+
+  return ret;
+}
+
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
diff --git a/extensions/sql/processors/QueryDatabaseTable.h b/extensions/sql/processors/QueryDatabaseTable.h
new file mode 100644
index 0000000..412faab
--- /dev/null
+++ b/extensions/sql/processors/QueryDatabaseTable.h
@@ -0,0 +1,95 @@
+/**
+ * @file QueryDatabaseTable.h
+ * QueryDatabaseTable class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "core/Core.h"
+#include "FlowFileRecord.h"
+#include "concurrentqueue.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "services/DatabaseService.h"
+#include "SQLProcessor.h"
+#include "OutputFormat.h"
+
+#include <sstream>
+#include <unordered_map>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+// Defined in the implementation file; only referenced through pState_ here.
+class State;
+
+/**
+ * QueryDatabaseTable processor.
+ *
+ * Derives from the SQLProcessor<QueryDatabaseTable> CRTP base, which calls
+ * processOnSchedule() and processOnTrigger() on this type through a static_cast
+ * (hence both are public), and from OutputFormat.
+ */
+class QueryDatabaseTable : public SQLProcessor<QueryDatabaseTable>, public OutputFormat {
+ public:
+  explicit QueryDatabaseTable(const std::string& name, utils::Identifier uuid = utils::Identifier());
+  virtual ~QueryDatabaseTable();
+
+  //! Processor Name
+  static const std::string ProcessorName;
+
+  //! Property descriptors
+  static const core::Property s_tableName;
+  static const core::Property s_columnNames;
+  static const core::Property s_maxValueColumnNames;
+  static const core::Property s_whereClause;
+  static const core::Property s_sqlQuery;
+  static const core::Property s_maxRowsPerFlowFile;
+  static const core::Property s_stateDirectory;
+
+  //! Prefix of dynamic property names
+  static const std::string s_initialMaxValueDynamicPropertyPrefix;
+
+  //! Relationship descriptor
+  static const core::Relationship s_success;
+
+  void initialize() override;
+
+  //! Dynamic properties are accepted by this processor.
+  bool supportsDynamicProperties() override { return true; }
+
+  //! Scheduling hook, called by the SQLProcessor base.
+  void processOnSchedule(const core::ProcessContext& context);
+
+  //! Trigger hook, called by the SQLProcessor base.
+  void processOnTrigger(core::ProcessSession& session);
+
+ private:
+  std::string getSelectQuery();
+
+  std::string tableName_;
+  std::string columnNames_;
+  std::string maxValueColumnNames_;
+  std::string whereClause_;
+  std::string sqlQuery_;
+  int maxRowsPerFlowFile_{};
+  std::vector<std::string> listMaxValueColumnName_;
+  std::unordered_map<std::string, std::string> mapState_;
+  std::unordered_map<std::string, soci::data_type> mapColumnType_;
+  std::unique_ptr<State> pState_;
+};
+
+REGISTER_RESOURCE(QueryDatabaseTable, "QueryDatabaseTable to execute SELECT statement via ODBC.");
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
diff --git a/extensions/sql/processors/SQLProcessor.h b/extensions/sql/processors/SQLProcessor.h
new file mode 100644
index 0000000..d277449
--- /dev/null
+++ b/extensions/sql/processors/SQLProcessor.h
@@ -0,0 +1,105 @@
+/**
+ * @file SQLProcessor.h
+ * SQLProcessor class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "core/Core.h"
+#include "FlowFileRecord.h"
+#include "concurrentqueue.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+/**
+ * CRTP base class of the SQL processors.
+ *
+ * It resolves the database controller service when scheduled, keeps one connection that is
+ * created on first use, and forwards the actual work to the derived type T, which has to provide
+ *   void processOnSchedule(const core::ProcessContext&)
+ *   void processOnTrigger(core::ProcessSession&)
+ */
+template <typename T>
+class SQLProcessor: public core::Processor {
+ protected:
+  SQLProcessor(const std::string& name, utils::Identifier uuid)
+    : core::Processor(name, uuid),
+      logger_(logging::LoggerFactory<T>::getLogger()) {
+  }
+
+  /**
+   * Looks up the controller service named by the "DB Controller Service" property and then
+   * calls T::processOnSchedule().
+   *
+   * @throws minifi::Exception if the service is missing or is not a DatabaseService.
+   */
+  void onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>& sessionFactory) override {
+    std::string serviceName;
+    context->getProperty(dbControllerService().getName(), serviceName);
+
+    dbService_ = std::dynamic_pointer_cast<sql::controllers::DatabaseService>(context->getControllerService(serviceName));
+    if (dbService_ == nullptr) {
+      throw minifi::Exception(PROCESSOR_EXCEPTION, "'DB Controller Service' must be defined");
+    }
+
+    static_cast<T*>(this)->processOnSchedule(*context);
+  }
+
+  /**
+   * Runs T::processOnTrigger() with a lazily created connection.
+   *
+   * A trigger that arrives while another one is still running only logs a warning and yields.
+   * A std::exception thrown while processing is logged and answered with a yield; if the
+   * connection then reports that it is no longer connected, it is dropped so that the next
+   * trigger creates a new one.
+   */
+  void onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) override {
+    std::unique_lock<std::mutex> triggerGuard(onTriggerMutex_, std::try_to_lock);
+    if (!triggerGuard) {
+      logger_->log_warn("'onTrigger' is called before previous 'onTrigger' call is finished.");
+      context->yield();
+      return;
+    }
+
+    try {
+      if (connection_ == nullptr) {
+        connection_ = dbService_->getConnection();
+      }
+      static_cast<T*>(this)->processOnTrigger(*session);
+    } catch (std::exception& e) {
+      logger_->log_error("SQLProcessor: '%s'", e.what());
+
+      std::string connectionError;
+      if (connection_ && !connection_->connected(connectionError)) {
+        logger_->log_error("SQLProcessor: Connection exception: %s", connectionError.c_str());
+        connection_.reset();
+      }
+
+      context->yield();
+    }
+  }
+
+  //! Releases the connection when the processor is stopped.
+  void notifyStop() override {
+    connection_.reset();
+  }
+
+ protected:
+  //! Descriptor of the "DB Controller Service" property, created on first use.
+  static const core::Property& dbControllerService() {
+    static const core::Property s_dbControllerService =
+      core::PropertyBuilder::createProperty("DB Controller Service")
+        ->isRequired(true)
+        ->withDescription("Database Controller Service.")
+        ->supportsExpressionLanguage(true)
+        ->build();
+    return s_dbControllerService;
+  }
+
+  std::shared_ptr<logging::Logger> logger_;
+  std::shared_ptr<sql::controllers::DatabaseService> dbService_;
+  std::unique_ptr<sql::Connection> connection_;
+  std::mutex onTriggerMutex_;
+};
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
diff --git a/extensions/sql/services/DatabaseService.cpp b/extensions/sql/services/DatabaseService.cpp
new file mode 100644
index 0000000..cb2e8a2
--- /dev/null
+++ b/extensions/sql/services/DatabaseService.cpp
@@ -0,0 +1,69 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/logging/LoggerConfiguration.h"
+#include "core/controller/ControllerService.h"
+#include <string>
+#include <memory>
+#include <set>
+#include "core/Property.h"
+#include "DatabaseService.h"
+#include "DatabaseService.h"
+#include "io/validation.h"
+#include "properties/Configure.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace sql {
+namespace controllers {
+
+// The only property of the service: the connection string read in onEnable().
+core::Property DatabaseService::ConnectionString(
+  core::PropertyBuilder::createProperty("Connection String")
+    ->withDescription("Database Connection String")
+    ->isRequired(true)
+    ->build());
+
+// One-time initialization: runs the base class initialization and registers the supported
+// properties. Later calls are no-ops; the whole check-and-initialize step runs under
+// initialization_mutex_.
+void DatabaseService::initialize() {
+  std::lock_guard<std::recursive_mutex> guard(initialization_mutex_);
+
+  if (!initialized_) {
+    ControllerService::initialize();
+    initializeProperties();
+    initialized_ = true;
+  }
+}
+
+// Copies the configured "Connection String" property into connection_string_.
+void DatabaseService::onEnable() {
+  const auto& propertyName = ConnectionString.getName();
+  getProperty(propertyName, connection_string_);
+}
+
+// Registers ConnectionString as the single supported property.
+void DatabaseService::initializeProperties() {
+  setSupportedProperties({
+    ConnectionString
+  });
+}
+
+} /* namespace controllers */
+} /* namespace sql */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
diff --git a/extensions/sql/services/DatabaseService.h b/extensions/sql/services/DatabaseService.h
new file mode 100644
index 0000000..fb9020f
--- /dev/null
+++ b/extensions/sql/services/DatabaseService.h
@@ -0,0 +1,116 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CONTROLLERS_DATABASESERVICE_H_
+#define LIBMINIFI_INCLUDE_CONTROLLERS_DATABASESERVICE_H_
+
+#include "core/logging/LoggerConfiguration.h"
+#include "core/controller/ControllerService.h"
+#include "data/DatabaseConnectors.h"
+#include <memory>
+#include <unordered_map>
+
+#include <soci/soci.h>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace sql {
+namespace controllers {
+
+/**
+ * Purpose and Justification: Controller services function as a layerable way to provide
+ * services to internal services. While a controller service is generally configured from the flow,
+ * we want to follow the open closed principle and provide Database services
+ */
+class DatabaseService : public core::controller::ControllerService {
+ public:
+  /**
+   * Constructors for the controller service. Each of them finishes by calling initialize();
+   * initialized_ starts out false through its default member initializer.
+   */
+  explicit DatabaseService(const std::string &name, const std::string &id)
+      : ControllerService(name, id),
+        logger_(logging::LoggerFactory<DatabaseService>::getLogger()) {
+    initialize();
+  }
+
+  explicit DatabaseService(const std::string &name, utils::Identifier uuid = utils::Identifier())
+      : ControllerService(name, uuid),
+        logger_(logging::LoggerFactory<DatabaseService>::getLogger()) {
+    initialize();
+  }
+
+  explicit DatabaseService(const std::string &name, const std::shared_ptr<Configure> &configuration)
+      : ControllerService(name),
+        logger_(logging::LoggerFactory<DatabaseService>::getLogger()) {
+    setConfiguration(configuration);
+    initialize();
+  }
+
+  /**
+   * Parameters needed.
+   */
+  static core::Property ConnectionString;
+
+  void initialize() override;
+
+  //! Nothing to do when asked to yield.
+  void yield() override {
+  }
+
+  //! The service counts as running while its state is ENABLED.
+  bool isRunning() override {
+    const auto state = getState();
+    return state == core::controller::ControllerServiceState::ENABLED;
+  }
+
+  //! This service never reports pending work.
+  bool isWorkAvailable() override {
+    return false;
+  }
+
+  void onEnable() override;
+
+  //! Creates a new connection; implemented by the concrete database services.
+  virtual std::unique_ptr<sql::Connection> getConnection() const = 0;
+
+ protected:
+  void initializeProperties();
+
+  // initialization mutex.
+  std::recursive_mutex initialization_mutex_;
+
+  // set once initialize() has completed.
+  bool initialized_{};
+
+  // value of the ConnectionString property.
+  std::string connection_string_;
+
+ private:
+  std::shared_ptr<logging::Logger> logger_;
+};
+
+} /* namespace controllers */
+} /* namespace sql */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_CONTROLLERS_DATABASESERVICE_H_ */
diff --git a/extensions/sql/services/ODBCConnector.cpp b/extensions/sql/services/ODBCConnector.cpp
new file mode 100644
index 0000000..d785bd7
--- /dev/null
+++ b/extensions/sql/services/ODBCConnector.cpp
@@ -0,0 +1,45 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/logging/LoggerConfiguration.h"
+#include "core/controller/ControllerService.h"
+#include <string>
+#include <memory>
+#include <set>
+#include "core/Property.h"
+#include "ODBCConnector.h"
+#include "io/validation.h"
+#include "properties/Configure.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace sql {
+namespace controllers {
+
+// Creates a new ODBC connection from the configured connection string.
+std::unique_ptr<sql::Connection> ODBCService::getConnection() const {
+  return std::make_unique<ODBCConnection>(connection_string_);
+}
+
+} /* namespace controllers */
+} /* namespace sql */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
diff --git a/extensions/sql/services/ODBCConnector.h b/extensions/sql/services/ODBCConnector.h
new file mode 100644
index 0000000..4202d05
--- /dev/null
+++ b/extensions/sql/services/ODBCConnector.h
@@ -0,0 +1,127 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once 
+
+#include "core/logging/LoggerConfiguration.h"
+#include "core/controller/ControllerService.h"
+
+#include "DatabaseService.h"
+#include "core/Resource.h"
+#include "data/DatabaseConnectors.h"
+#include <memory>
+#include <unordered_map>
+
+#include <soci/soci.h>
+#include <soci/odbc/soci-odbc.h>
+
+#include <iostream>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace sql {
+namespace controllers {
+
+// sql::Connection implementation that owns a SOCI session created through the ODBC backend.
+class ODBCConnection : public sql::Connection {
+ public:
+  // Constructs the SOCI session eagerly; nothing is caught here, so any exception from that construction propagates.
+  explicit ODBCConnection(const std::string& connectionString)
+    : connection_string_(connectionString) {
+    session_ = std::make_unique<soci::session>(getSessionParameters());
+  }
+
+  virtual ~ODBCConnection() = default;
+
+  // Probes the connection with a trivial query; on failure returns false and copies the error text into `exception`.
+  bool connected(std::string& exception) const override {
+    try {
+      exception.clear();
+      // According to https://stackoverflow.com/questions/3668506/efficient-sql-test-query-or-validation-query-that-will-work-across-all-or-most by Rob Hruska,
+      // 'select 1' works for: H2, MySQL, Microsoft SQL Server, PostgreSQL, SQLite. For Oracle 'SELECT 1 FROM DUAL' works.
+      prepareStatement("select 1")->execute();
+      return true;
+    } catch (const std::exception& e) {
+      exception = e.what();
+      return false;
+    }
+  }
+
+  std::unique_ptr<sql::Statement> prepareStatement(const std::string& query) const override {
+    return std::make_unique<sql::Statement>(*session_, query);
+  }
+
+  std::unique_ptr<sql::Session> getSession() const override {
+    return std::make_unique<sql::Session>(*session_);
+  }
+
+ private:
+  // Builds the SOCI connection parameters for connection_string_ using the ODBC backend factory.
+  soci::connection_parameters getSessionParameters() const {
+    static const soci::backend_factory &backEnd = *soci::factory_odbc();
+    soci::connection_parameters parameters(backEnd, connection_string_);
+    parameters.set_option(soci::odbc_option_driver_complete, "0" /* SQL_DRIVER_NOPROMPT */);
+    return parameters;
+  }
+
+  std::unique_ptr<soci::session> session_;
+  std::string connection_string_;
+};
+
+/**
+ * Controller service that provides ODBC database connections (see getConnection()).
+ * Every constructor forwards to DatabaseService, fetches the class logger and calls initialize();
+ * the Configure overload additionally stores the configuration via setConfiguration() first.
+ */
+class ODBCService : public DatabaseService {
+ public:
+  explicit ODBCService(const std::string &name, const std::string &id)
+    : DatabaseService(name, id),
+      logger_(logging::LoggerFactory<ODBCService>::getLogger()) {
+    initialize();
+  }
+
+  explicit ODBCService(const std::string &name, utils::Identifier uuid = utils::Identifier())
+    : DatabaseService(name, uuid),
+      logger_(logging::LoggerFactory<ODBCService>::getLogger()) {
+    initialize();
+  }
+
+  explicit ODBCService(const std::string &name, const std::shared_ptr<Configure> &configuration)
+    : DatabaseService(name),
+      logger_(logging::LoggerFactory<ODBCService>::getLogger()) {
+    setConfiguration(configuration);
+    initialize();
+  }
+
+  std::unique_ptr<sql::Connection> getConnection() const override;
+
+ private:
+  std::shared_ptr<logging::Logger> logger_;
+};
+
+REGISTER_RESOURCE(ODBCService, "Controller service that provides ODBC database connection");
+
+} /* namespace controllers */
+} /* namespace sql */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
diff --git a/fedora.sh b/fedora.sh
index db28954..ca57258 100644
--- a/fedora.sh
+++ b/fedora.sh
@@ -16,7 +16,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
-verify_enable(){
+verify_enable_platform(){
     feature="$1"
     feature_status=${!1}
     verify_gcc_enable $feature
diff --git a/rheldistro.sh b/rheldistro.sh
index 1c71e92..5999b41 100644
--- a/rheldistro.sh
+++ b/rheldistro.sh
@@ -16,7 +16,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
-verify_enable() {
+verify_enable_platform() {
   feature="$1"
   feature_status=${!1}
   if [ "$OS_MAJOR" = "6" ]; then
diff --git a/suse.sh b/suse.sh
index 7aff26f..f72474c 100644
--- a/suse.sh
+++ b/suse.sh
@@ -16,7 +16,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
-verify_enable() {
+verify_enable_platform() {
   feature="$1"
   feature_status=${!1}
   if [ "$OS_MAJOR" = "6" ]; then
diff --git a/win_build_vs.bat b/win_build_vs.bat
index 0749ffc..c7df9d4 100644
--- a/win_build_vs.bat
+++ b/win_build_vs.bat
@@ -26,6 +26,7 @@
 set build_kafka=off
 set build_coap=off
 set build_jni=off
+set build_SQL=off
 set generator="Visual Studio 15 2017"
 set cpack=OFF
 
@@ -45,6 +46,9 @@
 	if [%%~x] EQU [/J] ( 
 	set build_JNI=ON
     )
+	if [%%~x] EQU [/S] ( 
+	set build_SQL=ON
+    )
 	rem if [%%~x] EQU [/C] ( 
 	rem set build_coap=ON
     rem )
@@ -64,7 +68,7 @@
 
 
 
-cmake -G %generator% -DCMAKE_BUILD_TYPE_INIT=%cmake_build_type% -DCMAKE_BUILD_TYPE=%cmake_build_type% -DWIN32=WIN32 -DENABLE_LIBRDKAFKA=%build_kafka% -DENABLE_JNI=%build_jni% -DOPENSSL_OFF=OFF -DENABLE_COAP=%build_coap% -DUSE_SHARED_LIBS=OFF -DDISABLE_CONTROLLER=ON  -DBUILD_ROCKSDB=ON -DFORCE_WINDOWS=ON -DUSE_SYSTEM_UUID=OFF -DDISABLE_LIBARCHIVE=ON -DDISABLE_SCRIPTING=ON -DEXCLUDE_BOOST=ON -DENABLE_WEL=TRUE -DFAIL_ON_WARNINGS=OFF -DSKIP_TESTS=%skiptests% .. && msbuild /m nifi-minifi-cpp.sln /property:Configuration=%cmake_build_type% /property:Platform=%build_platform% && copy main\%cmake_build_type%\minifi.exe main\
+cmake -G %generator% -DENABLE_SQL=%build_SQL% -DCMAKE_BUILD_TYPE_INIT=%cmake_build_type% -DCMAKE_BUILD_TYPE=%cmake_build_type% -DWIN32=WIN32 -DENABLE_LIBRDKAFKA=%build_kafka% -DENABLE_JNI=%build_jni% -DOPENSSL_OFF=OFF -DENABLE_COAP=%build_coap% -DUSE_SHARED_LIBS=OFF -DDISABLE_CONTROLLER=ON  -DBUILD_ROCKSDB=ON -DFORCE_WINDOWS=ON -DUSE_SYSTEM_UUID=OFF -DDISABLE_LIBARCHIVE=ON -DDISABLE_SCRIPTING=ON -DEXCLUDE_BOOST=ON -DENABLE_WEL=TRUE -DFAIL_ON_WARNINGS=OFF -DSKIP_TESTS=%skiptests% .. && msbuild /m nifi-minifi-cpp.sln /property:Configuration=%cmake_build_type% /property:Platform=%build_platform% && copy main\%cmake_build_type%\minifi.exe main\
 if [%cpack%] EQU [ON] ( 
 	cpack
     )