MINIFICPP-1994 - Adding failure relationship to SQL processors

Closes #1466

Signed-off-by: Martin Zink <martinzink@apache.org>
diff --git a/PROCESSORS.md b/PROCESSORS.md
index 31e0de2..97b6598 100644
--- a/PROCESSORS.md
+++ b/PROCESSORS.md
@@ -615,6 +615,7 @@
 | Name    | Description                                              |
 |---------|----------------------------------------------------------|
 | success | Successfully created FlowFile from SQL query result set. |
+| failure | Flow files containing malformed sql statements           |
 
 
 ## ExecuteScript
@@ -2200,6 +2201,7 @@
 | Name    | Description                                                              |
 |---------|--------------------------------------------------------------------------|
 | success | After a successful SQL update operation, the incoming FlowFile sent here |
+| failure | Flow files that contain malformed sql statements                         |
 
 
 ## PutTCP
diff --git a/extensions/sql/data/DatabaseConnectors.h b/extensions/sql/data/DatabaseConnectors.h
index 40fec31..d34b887 100644
--- a/extensions/sql/data/DatabaseConnectors.h
+++ b/extensions/sql/data/DatabaseConnectors.h
@@ -22,12 +22,9 @@
 #include <string>
 #include <vector>
 #include <ctime>
+#include <stdexcept>
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace sql {
+namespace org::apache::nifi::minifi::sql {
 
 enum class DataType {
   STRING,
@@ -62,6 +59,18 @@
   virtual void next() = 0;
 };
 
+class ConnectionError : public std::runtime_error {
+ public:
+  using std::runtime_error::runtime_error;
+};
+// Indicates that the error might be caused by a malformed
+// query, constraint violation or something else that won't
+// fix itself on a retry.
+class StatementError : public std::runtime_error {
+ public:
+  using std::runtime_error::runtime_error;
+};
+
 class Statement {
  public:
   explicit Statement(const std::string &query)
@@ -92,9 +101,5 @@
   virtual std::unique_ptr<Session> getSession() const = 0;
 };
 
-} /* namespace sql */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
+}  // namespace org::apache::nifi::minifi::sql
 
diff --git a/extensions/sql/data/SociConnectors.cpp b/extensions/sql/data/SociConnectors.cpp
index 8987a1b..58f81ce 100644
--- a/extensions/sql/data/SociConnectors.cpp
+++ b/extensions/sql/data/SociConnectors.cpp
@@ -17,12 +17,9 @@
  */
 
 #include "SociConnectors.h"
+#include "logging/LoggerFactory.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace sql {
+namespace org::apache::nifi::minifi::sql {
 
 void SociRow::setIterator(const soci::rowset<soci::row>::iterator& iter) {
   current_ = iter;
@@ -114,13 +111,29 @@
   current_row_.next();
 }
 
+SociStatement::SociStatement(soci::session &session, const std::string &query)
+    : Statement(query), session_(session), logger_(core::logging::LoggerFactory<SociStatement>::getLogger()) {}
+
 std::unique_ptr<Rowset> SociStatement::execute(const std::vector<std::string>& args) {
-  auto stmt = session_.prepare << query_;
-  for (auto& arg : args) {
-    // binds arguments to the prepared statement
-    stmt.operator,(soci::use(arg));
+  try {
+    auto stmt = session_.prepare << query_;
+    for (auto& arg : args) {
+      // binds arguments to the prepared statement
+      stmt.operator,(soci::use(arg));
+    }
+    return std::make_unique<SociRowset>(stmt);
+  } catch (const soci::soci_error& ex) {
+    logger_->log_error("Error while evaluating query, type: %s, what: %s", typeid(ex).name(), ex.what());
+    if (ex.get_error_category() == soci::soci_error::error_category::connection_error
+        || ex.get_error_category() == soci::soci_error::error_category::system_error) {
+      throw sql::ConnectionError(ex.get_error_message());
+    } else {
+      throw sql::StatementError(ex.get_error_message());
+    }
+  } catch (const std::exception& ex) {
+    logger_->log_error("Error while evaluating query, type: %s, what: %s", typeid(ex).name(), ex.what());
+    throw sql::StatementError(ex.what());
   }
-  return std::make_unique<SociRowset>(stmt);
 }
 
 void SociSession::begin() {
@@ -174,8 +187,4 @@
   return parameters;
 }
 
-} /* namespace sql */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
+}  // namespace org::apache::nifi::minifi::sql
diff --git a/extensions/sql/data/SociConnectors.h b/extensions/sql/data/SociConnectors.h
index 7eee768..f590ccf 100644
--- a/extensions/sql/data/SociConnectors.h
+++ b/extensions/sql/data/SociConnectors.h
@@ -26,12 +26,9 @@
 #include "data/DatabaseConnectors.h"
 #include <soci/soci.h>
 #include <soci/odbc/soci-odbc.h>
+#include "logging/Logger.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace sql {
+namespace org::apache::nifi::minifi::sql {
 
 class SociRow : public Row {
  public:
@@ -71,14 +68,15 @@
 
 class SociStatement : public Statement {
  public:
-  explicit SociStatement(soci::session& session, const std::string &query)
-    : Statement(query), session_(session) {
-  }
+  SociStatement(soci::session& session, const std::string &query);
 
   std::unique_ptr<Rowset> execute(const std::vector<std::string>& args = {}) override;
 
  protected:
   soci::session& session_;
+
+ private:
+  std::shared_ptr<core::logging::Logger> logger_;
 };
 
 class SociSession : public Session {
@@ -112,8 +110,4 @@
   std::string connection_string_;
 };
 
-} /* namespace sql */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
+}  // namespace org::apache::nifi::minifi::sql
diff --git a/extensions/sql/processors/ExecuteSQL.cpp b/extensions/sql/processors/ExecuteSQL.cpp
index e4946c3..345fefa 100644
--- a/extensions/sql/processors/ExecuteSQL.cpp
+++ b/extensions/sql/processors/ExecuteSQL.cpp
@@ -62,10 +62,22 @@
     query = to_string(session.readBuffer(input_flow_file));
   }
   if (query.empty()) {
+    logger_->log_error("Empty sql statement");
+    if (input_flow_file) {
+      session.transfer(input_flow_file, Failure);
+      return;
+    }
     throw Exception(PROCESSOR_EXCEPTION, "Empty SQL statement");
   }
 
-  auto row_set = connection_->prepareStatement(query)->execute(collectArguments(input_flow_file));
+  std::unique_ptr<sql::Rowset> row_set;
+  try {
+    row_set = connection_->prepareStatement(query)->execute(collectArguments(input_flow_file));
+  } catch (const sql::StatementError& ex) {
+    logger_->log_error("Error while executing sql statement: %s", ex.what());
+    session.transfer(input_flow_file, Failure);
+    return;
+  }
 
   sql::JSONSQLWriter json_writer{output_format_ == OutputType::JSONPretty};
   FlowFileGenerator flow_file_creator{session, json_writer};
diff --git a/extensions/sql/processors/ExecuteSQL.h b/extensions/sql/processors/ExecuteSQL.h
index ac8c151..3661012 100644
--- a/extensions/sql/processors/ExecuteSQL.h
+++ b/extensions/sql/processors/ExecuteSQL.h
@@ -27,6 +27,7 @@
 #include "SQLProcessor.h"
 #include "FlowFileSource.h"
 #include "utils/ArrayUtils.h"
+#include "core/logging/Logger.h"
 
 namespace org::apache::nifi::minifi::processors {
 
@@ -42,7 +43,8 @@
   }
 
   EXTENSIONAPI static const core::Relationship Success;
-  static auto relationships() { return std::array{Success}; }
+  EXTENSIONAPI static const core::Relationship Failure;
+  static auto relationships() { return std::array{Success, Failure}; }
 
   EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false;
   EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false;
diff --git a/extensions/sql/processors/PutSQL.cpp b/extensions/sql/processors/PutSQL.cpp
index 8c2914b..4634f89 100644
--- a/extensions/sql/processors/PutSQL.cpp
+++ b/extensions/sql/processors/PutSQL.cpp
@@ -24,6 +24,7 @@
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
 #include "Exception.h"
+#include "core/logging/LoggerFactory.h"
 
 namespace org::apache::nifi::minifi::processors {
 
@@ -36,7 +37,11 @@
   setSupportedRelationships(relationships());
 }
 
-void PutSQL::processOnSchedule(core::ProcessContext& /*context*/) {}
+void PutSQL::processOnSchedule(core::ProcessContext& context) {
+  if (auto sql_statement = context.getProperty(SQLStatement); sql_statement && sql_statement->empty()) {
+    throw Exception(PROCESSOR_EXCEPTION, "Empty SQL statement");
+  }
+}
 
 void PutSQL::processOnTrigger(core::ProcessContext& context, core::ProcessSession& session) {
   auto flow_file = session.get();
@@ -44,18 +49,20 @@
     context.yield();
     return;
   }
-  session.transfer(flow_file, Success);
 
   std::string sql_statement;
   if (!context.getProperty(SQLStatement, sql_statement, flow_file)) {
     logger_->log_debug("Using the contents of the flow file as the SQL statement");
     sql_statement = to_string(session.readBuffer(flow_file));
   }
-  if (sql_statement.empty()) {
-    throw Exception(PROCESSOR_EXCEPTION, "Empty SQL statement");
-  }
 
-  connection_->prepareStatement(sql_statement)->execute(collectArguments(flow_file));
+  try {
+    connection_->prepareStatement(sql_statement)->execute(collectArguments(flow_file));
+    session.transfer(flow_file, Success);
+  } catch (const sql::StatementError& ex) {
+    logger_->log_error("Error while executing SQL statement in flow file: %s", ex.what());
+    session.transfer(flow_file, Failure);
+  }
 }
 
 }  // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/sql/processors/PutSQL.h b/extensions/sql/processors/PutSQL.h
index 6a28bc9..ebb45d9 100644
--- a/extensions/sql/processors/PutSQL.h
+++ b/extensions/sql/processors/PutSQL.h
@@ -25,6 +25,7 @@
 #include "core/ProcessSession.h"
 #include "SQLProcessor.h"
 #include "utils/ArrayUtils.h"
+#include "core/logging/Logger.h"
 
 namespace org::apache::nifi::minifi::processors {
 
@@ -40,7 +41,8 @@
   }
 
   EXTENSIONAPI static const core::Relationship Success;
-  static auto relationships() { return std::array{Success}; }
+  EXTENSIONAPI static const core::Relationship Failure;
+  static auto relationships() { return std::array{Success, Failure}; }
 
   EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false;
   EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false;
diff --git a/extensions/sql/processors/SQLProcessor.cpp b/extensions/sql/processors/SQLProcessor.cpp
index 563cea6..b329552 100644
--- a/extensions/sql/processors/SQLProcessor.cpp
+++ b/extensions/sql/processors/SQLProcessor.cpp
@@ -25,17 +25,19 @@
 #include "core/ProcessSession.h"
 #include "Exception.h"
 
-#include <soci/error.h>
-
 namespace org::apache::nifi::minifi::processors {
 
 void SQLProcessor::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>& /*sessionFactory*/) {
   std::string controllerService;
   context->getProperty(DBControllerService.getName(), controllerService);
 
-  db_service_ = std::dynamic_pointer_cast<sql::controllers::DatabaseService>(context->getControllerService(controllerService));
-  if (!db_service_) {
-    throw minifi::Exception(PROCESSOR_EXCEPTION, "'" + DBControllerService.getName() + "' must be defined");
+  if (auto service = context->getControllerService(controllerService)) {
+    db_service_ = std::dynamic_pointer_cast<sql::controllers::DatabaseService>(service);
+    if (!db_service_) {
+      throw minifi::Exception(PROCESSOR_EXCEPTION, "'" + controllerService + "' is not a DatabaseService");
+    }
+  } else {
+    throw minifi::Exception(PROCESSOR_EXCEPTION, "Could not find controller service '" + controllerService + "'");
   }
 
   processOnSchedule(*context);
@@ -46,17 +48,14 @@
     if (!connection_) {
       connection_ = db_service_->getConnection();
     }
-    processOnTrigger(*context, *session);
-  } catch (const soci::soci_error& e) {
-    logger_->log_error("SQLProcessor: '%s'", e.what());
-    if (connection_) {
-      std::string exp;
-      if (!connection_->connected(exp)) {
-        logger_->log_error("SQLProcessor: Connection exception: %s", exp);
-        // try to reconnect next time
-        connection_.reset();
-      }
+    if (!connection_) {
+      throw sql::ConnectionError("Could not establish sql connection");
     }
+    processOnTrigger(*context, *session);
+  } catch (const sql::ConnectionError& ex) {
+    logger_->log_error("Connection error: %s", ex.what());
+    // try to reconnect next time
+    connection_.reset();
     throw;
   }
 }
diff --git a/extensions/sql/processors/SQLProcessorStaticDefinitions.cpp b/extensions/sql/processors/SQLProcessorStaticDefinitions.cpp
index b2d0b90..9375082 100644
--- a/extensions/sql/processors/SQLProcessorStaticDefinitions.cpp
+++ b/extensions/sql/processors/SQLProcessorStaticDefinitions.cpp
@@ -48,6 +48,7 @@
   ->supportsExpressionLanguage(true)->build());
 
 const core::Relationship ExecuteSQL::Success("success", "Successfully created FlowFile from SQL query result set.");
+const core::Relationship ExecuteSQL::Failure{"failure", "Flow files containing malformed sql statements"};
 
 REGISTER_RESOURCE(ExecuteSQL, Processor);
 
@@ -106,7 +107,8 @@
       "the incoming flow file is expected to contain a valid SQL statement, to be issued by the processor to the database.")
   ->supportsExpressionLanguage(true)->build());
 
-const core::Relationship PutSQL::Success("success", "Database is successfully updated.");
+const core::Relationship PutSQL::Success("success", "After a successful SQL update operation, the incoming FlowFile sent here");
+const core::Relationship PutSQL::Failure{"failure", "Flow files that contain malformed sql statements"};
 
 REGISTER_RESOURCE(PutSQL, Processor);
 
diff --git a/libminifi/include/core/ProcessSessionFactory.h b/libminifi/include/core/ProcessSessionFactory.h
index 867f49c..482fe0a 100644
--- a/libminifi/include/core/ProcessSessionFactory.h
+++ b/libminifi/include/core/ProcessSessionFactory.h
@@ -43,13 +43,15 @@
   }
 
   // Create the session
-  std::shared_ptr<ProcessSession> createSession();
+  virtual std::shared_ptr<ProcessSession> createSession();
 
   // Prevent default copy constructor and assignment operation
   // Only support pass by reference or pointer
   ProcessSessionFactory(const ProcessSessionFactory &parent) = delete;
   ProcessSessionFactory &operator=(const ProcessSessionFactory &parent) = delete;
 
+  virtual ~ProcessSessionFactory() = default;
+
  private:
   // ProcessContext
   std::shared_ptr<ProcessContext> process_context_;
diff --git a/libminifi/test/TestBase.cpp b/libminifi/test/TestBase.cpp
index bc34d23..ba2cb6d 100644
--- a/libminifi/test/TestBase.cpp
+++ b/libminifi/test/TestBase.cpp
@@ -234,12 +234,16 @@
   processor->setFlowIdentifier(flow_version_->getFlowIdentifier());
   processor_mapping_[processor->getUUID()] = processor;
   if (!linkToPrevious) {
-    termination_ = *(relationships.begin());
+    if (!std::empty(relationships)) {
+      termination_ = *(relationships.begin());
+    }
   } else {
     std::shared_ptr<minifi::core::Processor> last = processor_queue_.back();
     if (last == nullptr) {
       last = processor;
-      termination_ = *(relationships.begin());
+      if (!std::empty(relationships)) {
+        termination_ = *(relationships.begin());
+      }
     }
     std::stringstream connection_name;
     connection_name << last->getUUIDStr() << "-to-" << processor->getUUIDStr();
@@ -449,6 +453,21 @@
   return runProcessor(processor_location, verify);
 }
 
+class TestSessionFactory : public minifi::core::ProcessSessionFactory {
+  using SessionCallback = std::function<void(const std::shared_ptr<minifi::core::ProcessSession>&)>;
+ public:
+  TestSessionFactory(std::shared_ptr<minifi::core::ProcessContext> context, SessionCallback on_new_session)
+    : ProcessSessionFactory(std::move(context)), on_new_session_(std::move(on_new_session)) {}
+
+  std::shared_ptr<minifi::core::ProcessSession> createSession() override {
+    auto session = ProcessSessionFactory::createSession();
+    on_new_session_(session);
+    return session;
+  }
+
+  SessionCallback on_new_session_;
+};
+
 bool TestPlan::runProcessor(size_t target_location, const PreTriggerVerifier& verify) {
   if (!finalized) {
     finalize();
@@ -459,18 +478,23 @@
   std::shared_ptr<minifi::core::Processor> processor = processor_queue_.at(target_location);
   std::shared_ptr<minifi::core::ProcessContext> context = processor_contexts_.at(target_location);
   scheduleProcessor(processor, context);
-  const auto current_session = std::make_shared<minifi::core::ProcessSession>(context);
-  process_sessions_.push_back(current_session);
   current_flowfile_ = nullptr;
   processor->incrementActiveTasks();
   processor->setScheduledState(minifi::core::ScheduledState::RUNNING);
-  if (verify != nullptr) {
+
+  if (verify) {
+    auto current_session = std::make_shared<minifi::core::ProcessSession>(context);
+    process_sessions_.push_back(current_session);
     verify(context, current_session);
+    current_session->commit();
   } else {
+    auto session_factory = std::make_shared<TestSessionFactory>(context, [&] (auto current_session) {
+      process_sessions_.push_back(current_session);
+    });
     logger_->log_info("Running %s", processor->getName());
-    processor->onTrigger(context, current_session);
+    processor->onTrigger(context, session_factory);
   }
-  current_session->commit();
+
   return gsl::narrow<size_t>(target_location + 1) < processor_queue_.size();
 }
 
@@ -560,10 +584,11 @@
 }
 
 std::unique_ptr<minifi::Connection> TestPlan::buildFinalConnection(const std::shared_ptr<minifi::core::Processor>& processor, bool setDest) {
+  gsl_Expects(termination_);
   std::stringstream connection_name;
   connection_name << processor->getUUIDStr() << "-to-" << processor->getUUIDStr();
   auto connection = std::make_unique<minifi::Connection>(flow_repo_, content_repo_, connection_name.str());
-  connection->addRelationship(termination_);
+  connection->addRelationship(termination_.value());
 
   // link the connections so that we can test results at the end for this
   connection->setSource(processor.get());
@@ -581,11 +606,13 @@
 
 void TestPlan::finalize() {
   std::lock_guard<std::recursive_mutex> guard(mutex);
-  if (!relationships_.empty()) {
-    relationships_.push_back(buildFinalConnection(processor_queue_.back()));
-  } else {
-    for (const auto& processor : processor_queue_) {
-      relationships_.push_back(buildFinalConnection(processor, true));
+  if (termination_) {
+    if (!relationships_.empty()) {
+      relationships_.push_back(buildFinalConnection(processor_queue_.back()));
+    } else {
+      for (const auto& processor : processor_queue_) {
+        relationships_.push_back(buildFinalConnection(processor, true));
+      }
     }
   }
 
diff --git a/libminifi/test/TestBase.h b/libminifi/test/TestBase.h
index f698930..147a919 100644
--- a/libminifi/test/TestBase.h
+++ b/libminifi/test/TestBase.h
@@ -323,7 +323,7 @@
   std::vector<std::shared_ptr<minifi::core::ProcessSession>> process_sessions_;
   std::vector<std::shared_ptr<minifi::core::ProcessSessionFactory>> factories_;  // Do not assume ordering
   std::vector<std::unique_ptr<minifi::Connection>> relationships_;
-  minifi::core::Relationship termination_;
+  std::optional<minifi::core::Relationship> termination_;
 
  private:
   std::shared_ptr<logging::Logger> logger_;
diff --git a/libminifi/test/sql-tests/ExecuteSQLTests.cpp b/libminifi/test/sql-tests/ExecuteSQLTests.cpp
index 618a20d..948fc9d 100644
--- a/libminifi/test/sql-tests/ExecuteSQLTests.cpp
+++ b/libminifi/test/sql-tests/ExecuteSQLTests.cpp
@@ -194,9 +194,46 @@
 
   auto plan = controller.createSQLPlan("ExecuteSQL", {{"success", "d"}, {"failure", "d"}});
 
-  auto input_file = plan->addInput({}, "not a valid sql statement");
+  std::shared_ptr<minifi::core::FlowFile> input_file;
+  SECTION("Invalid sql statement") {
+    input_file = plan->addInput({}, "not a valid sql statement");
+  }
+  SECTION("No such table") {
+    input_file = plan->addInput({}, "SELECT * FROM no_such_table;");
+  }
 
-  REQUIRE_THROWS(plan->run());
+  plan->run();
+
+  REQUIRE(plan->getOutputs({"success", "d"}).empty());
+  auto output = plan->getOutputs({"failure", "d"});
+  REQUIRE(output.size() == 1);
+  REQUIRE(output.at(0) == input_file);
+}
+
+TEST_CASE("ExecuteSQL select query is malformed", "[ExecuteSQL7]") {
+  SQLTestController controller;
+
+  auto plan = controller.createSQLPlan("ExecuteSQL", {{"success", "d"}, {"failure", "d"}});
+  plan->getSQLProcessor()->setProperty(minifi::processors::ExecuteSQL::SQLSelectQuery.getName(), "SELECT * FROM test_table WHERE int_col = ?;");
+
+  std::shared_ptr<minifi::core::FlowFile> input_file;
+  SECTION("Less than required arguments") {
+    input_file = plan->addInput({}, "ignored content");
+  }
+// TODO(MINIFICPP-2001):
+//  SECTION("More than required arguments") {
+//    input_file = plan->addInput({
+//      {"sql.args.1.value", "1"},
+//      {"sql.args.2.value", "2"}
+//    }, "ignored content");
+//  }
+
+  plan->run();
+
+  REQUIRE(plan->getOutputs({"success", "d"}).empty());
+  auto output = plan->getOutputs({"failure", "d"});
+  REQUIRE(output.size() == 1);
+  REQUIRE(output.at(0) == input_file);
 }
 
 }  // namespace org::apache::nifi::minifi::test
diff --git a/libminifi/test/sql-tests/PutSQLTests.cpp b/libminifi/test/sql-tests/PutSQLTests.cpp
index 5b42514..02778cb 100644
--- a/libminifi/test/sql-tests/PutSQLTests.cpp
+++ b/libminifi/test/sql-tests/PutSQLTests.cpp
@@ -35,7 +35,7 @@
   REQUIRE(processor->getName() == "processorname");
 }
 
-TEST_CASE("Test Put", "[PutSQLPut]") {
+TEST_CASE("Statement from processor property") {
   SQLTestController testController;
 
   auto plan = testController.createSQLPlan("PutSQL", {{"success", "d"}});
@@ -51,6 +51,10 @@
 
   plan->run();
 
+  auto output = plan->getOutputs({"success", "d"});
+  REQUIRE(output.size() == 1);
+  REQUIRE(output.at(0) == input_file);
+
   // Verify output state
   auto rows = testController.fetchValues();
   REQUIRE(rows.size() == 1);
@@ -58,7 +62,7 @@
   REQUIRE(rows[0].text_col == "asdf");
 }
 
-TEST_CASE("Test Put Content", "[PutSQLPutContent]") {
+TEST_CASE("Statement from flow file content") {
   SQLTestController testController;
 
   auto plan = testController.createSQLPlan("PutSQL", {{"success", "d"}});
@@ -69,6 +73,10 @@
 
   plan->run();
 
+  auto output = plan->getOutputs({"success", "d"});
+  REQUIRE(output.size() == 1);
+  REQUIRE(output.at(0) == input_file);
+
   // Verify output state
   auto rows = testController.fetchValues();
   REQUIRE(rows.size() == 1);
@@ -76,3 +84,80 @@
   REQUIRE(rows[0].text_col == "fdsa");
 }
 
+TEST_CASE("PutSQL routes to failure on malformed statement") {
+  SQLTestController testController;
+
+  auto plan = testController.createSQLPlan("PutSQL", {{"success", "d"}, {"failure", "d"}});
+  auto sql_proc = plan->getSQLProcessor();
+
+  std::shared_ptr<core::FlowFile> input_file;
+  SECTION("Missing parameter") {
+    input_file = plan->addInput();
+  }
+// TODO(MINIFICPP-2002):
+//  SECTION("Invalid parameter type") {
+//    input_file = plan->addInput({
+//      {"sql.args.1.value", "banana"},
+//    });
+//  }
+
+  sql_proc->setProperty(
+      "SQL Statement",
+      "INSERT INTO test_table (int_col, text_col) VALUES (?, 'asdf')");
+
+  plan->run();
+
+  REQUIRE(plan->getOutputs({"success", "d"}).empty());
+  auto output = plan->getOutputs({"failure", "d"});
+  REQUIRE(output.size() == 1);
+  REQUIRE(output.at(0) == input_file);
+}
+
+TEST_CASE("PutSQL routes to failure on malformed content statement") {
+  SQLTestController testController;
+
+  auto plan = testController.createSQLPlan("PutSQL", {{"success", "d"}, {"failure", "d"}});
+  auto sql_proc = plan->getSQLProcessor();
+
+  std::shared_ptr<core::FlowFile> input_file;
+  SECTION("No parameters") {
+    input_file = plan->addInput({}, "INSERT INTO test_table VALUES(?, ?);");
+  }
+  SECTION("Not enough parameters") {
+    input_file = plan->addInput({
+      {"sql.args.1.value", "42"}
+    }, "INSERT INTO test_table VALUES(?, ?);");
+  }
+// TODO(MINIFICPP-2001):
+//  SECTION("Too many parameters") {
+//    input_file = plan->addInput({
+//      {"sql.args.1.value", "42"},
+//      {"sql.args.2.value", "banana"},
+//      {"sql.args.3.value", "too_many"}
+//    }, "INSERT INTO test_table VALUES(?, ?);");
+//  }
+// TODO(MINIFICPP-2002):
+//  SECTION("Invalid parameter type") {
+//    input_file = plan->addInput({
+//      {"sql.args.1.value", "banana"},
+//      {"sql.args.2.value", "apple"}
+//    }, "INSERT INTO test_table VALUES(?, ?);");
+//  }
+  SECTION("No such table") {
+    input_file = plan->addInput({
+      {"sql.args.1.value", "42"}
+    }, "INSERT INTO no_such_table VALUES(?);");
+  }
+  SECTION("Garbage statement") {
+    input_file = plan->addInput({
+      {"sql.args.1.value", "42"}
+    }, "ajshdjhasgdkashdiahfbuauwlkdkj;");
+  }
+
+  plan->run();
+
+  REQUIRE(plan->getOutputs({"success", "d"}).empty());
+  auto output = plan->getOutputs({"failure", "d"});
+  REQUIRE(output.size() == 1);
+  REQUIRE(output.at(0) == input_file);
+}
diff --git a/libminifi/test/sql-tests/SQLTestPlan.h b/libminifi/test/sql-tests/SQLTestPlan.h
index a7304ad..10c4b3e 100644
--- a/libminifi/test/sql-tests/SQLTestPlan.h
+++ b/libminifi/test/sql-tests/SQLTestPlan.h
@@ -40,7 +40,7 @@
  public:
   SQLTestPlan(TestController& controller, const std::string& connection_str, const std::string& sql_processor, std::initializer_list<core::Relationship> output_rels) {
     plan_ = controller.createPlan();
-    processor_ = plan_->addProcessor(sql_processor, sql_processor);
+    processor_ = plan_->addProcessor(sql_processor, sql_processor, {}, false);
     plan_->setProperty(processor_, "DB Controller Service", "ODBCService");
     input_ = plan_->addConnection({}, {"success", "d"}, processor_);
     for (const auto& output_rel : output_rels) {
diff --git a/libminifi/test/sql-tests/mocks/MockConnectors.cpp b/libminifi/test/sql-tests/mocks/MockConnectors.cpp
index 6854a97..25d5a23 100644
--- a/libminifi/test/sql-tests/mocks/MockConnectors.cpp
+++ b/libminifi/test/sql-tests/mocks/MockConnectors.cpp
@@ -176,7 +176,7 @@
   } else if (minifi::utils::StringUtils::startsWith(query, "select", false)) {
     return select(query, args);
   } else {
-    throw std::runtime_error("Unknown query type");
+    throw sql::StatementError("Unknown query type");
   }
 
   return nullptr;
@@ -204,11 +204,17 @@
   for (const auto& arg : args) {
     replaced_query = minifi::utils::StringUtils::replaceOne(replaced_query, "?", arg);
   }
+  if (replaced_query.find('?') != std::string::npos) {
+    throw sql::StatementError("Less arguments than required by the insert query");
+  }
 
   std::smatch match;
   std::regex expr(R"(insert into (\w+)\s*(\((.*)\))*\s*values\s*\((.+)\))", std::regex_constants::icase);
   std::regex_search(replaced_query, match, expr);
   std::string table_name = match[1];
+  if (!tables_.contains(table_name)) {
+    throw sql::StatementError("No such table: '" + table_name + "'");
+  }
   std::vector<std::string> values = minifi::utils::StringUtils::splitAndTrimRemovingEmpty(match[4], ",");
   for (auto& value : values) {
     value = minifi::utils::StringUtils::removeFramingCharacters(value, '\'');
@@ -238,6 +244,9 @@
   for (const auto& arg : args) {
     replaced_query = minifi::utils::StringUtils::replaceOne(replaced_query, "?", arg);
   }
+  if (replaced_query.find('?') != std::string::npos) {
+    throw sql::StatementError("Less arguments than required by the select query");
+  }
 
   std::smatch match;
   std::regex expr(R"(select\s+(.+)\s+from\s+(\w+)\s*(where ((.+(?= order by))|.+$))*\s*(order by (.+))*)", std::regex_constants::icase);
@@ -247,6 +256,9 @@
     cols = {};
   }
   std::string table_name = match[2];
+  if (!tables_.contains(table_name)) {
+    throw sql::StatementError("No such table: '" + table_name + "'");
+  }
   std::string condition_str = match[4];
   auto condition = parseWhereCondition(condition_str);
   std::string order = match[7];