MINIFICPP-1994 - Adding failure relationship to SQL processors
Closes #1466
Signed-off-by: Martin Zink <martinzink@apache.org>
diff --git a/PROCESSORS.md b/PROCESSORS.md
index 31e0de2..97b6598 100644
--- a/PROCESSORS.md
+++ b/PROCESSORS.md
@@ -615,6 +615,7 @@
| Name | Description |
|---------|----------------------------------------------------------|
| success | Successfully created FlowFile from SQL query result set. |
+| failure | Flow files containing malformed sql statements |
## ExecuteScript
@@ -2200,6 +2201,7 @@
| Name | Description |
|---------|--------------------------------------------------------------------------|
| success | After a successful SQL update operation, the incoming FlowFile sent here |
+| failure | Flow files that contain malformed sql statements |
## PutTCP
diff --git a/extensions/sql/data/DatabaseConnectors.h b/extensions/sql/data/DatabaseConnectors.h
index 40fec31..d34b887 100644
--- a/extensions/sql/data/DatabaseConnectors.h
+++ b/extensions/sql/data/DatabaseConnectors.h
@@ -22,12 +22,9 @@
#include <string>
#include <vector>
#include <ctime>
+#include <stdexcept>
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace sql {
+namespace org::apache::nifi::minifi::sql {
enum class DataType {
STRING,
@@ -62,6 +59,18 @@
virtual void next() = 0;
};
+class ConnectionError : public std::runtime_error {
+ public:
+ using std::runtime_error::runtime_error;
+};
+// Indicates that the error might be caused by a malformed
+// query, constraint violation or something else that won't
+// fix itself on a retry.
+class StatementError : public std::runtime_error {
+ public:
+ using std::runtime_error::runtime_error;
+};
+
class Statement {
public:
explicit Statement(const std::string &query)
@@ -92,9 +101,5 @@
virtual std::unique_ptr<Session> getSession() const = 0;
};
-} /* namespace sql */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
+} // namespace org::apache::nifi::minifi::sql
diff --git a/extensions/sql/data/SociConnectors.cpp b/extensions/sql/data/SociConnectors.cpp
index 8987a1b..58f81ce 100644
--- a/extensions/sql/data/SociConnectors.cpp
+++ b/extensions/sql/data/SociConnectors.cpp
@@ -17,12 +17,9 @@
*/
#include "SociConnectors.h"
+#include "logging/LoggerFactory.h"
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace sql {
+namespace org::apache::nifi::minifi::sql {
void SociRow::setIterator(const soci::rowset<soci::row>::iterator& iter) {
current_ = iter;
@@ -114,13 +111,29 @@
current_row_.next();
}
+SociStatement::SociStatement(soci::session &session, const std::string &query)
+ : Statement(query), session_(session), logger_(core::logging::LoggerFactory<SociStatement>::getLogger()) {}
+
std::unique_ptr<Rowset> SociStatement::execute(const std::vector<std::string>& args) {
- auto stmt = session_.prepare << query_;
- for (auto& arg : args) {
- // binds arguments to the prepared statement
- stmt.operator,(soci::use(arg));
+ try {
+ auto stmt = session_.prepare << query_;
+ for (auto& arg : args) {
+ // binds arguments to the prepared statement
+ stmt.operator,(soci::use(arg));
+ }
+ return std::make_unique<SociRowset>(stmt);
+ } catch (const soci::soci_error& ex) {
+ logger_->log_error("Error while evaluating query, type: %s, what: %s", typeid(ex).name(), ex.what());
+ if (ex.get_error_category() == soci::soci_error::error_category::connection_error
+ || ex.get_error_category() == soci::soci_error::error_category::system_error) {
+ throw sql::ConnectionError(ex.get_error_message());
+ } else {
+ throw sql::StatementError(ex.get_error_message());
+ }
+ } catch (const std::exception& ex) {
+ logger_->log_error("Error while evaluating query, type: %s, what: %s", typeid(ex).name(), ex.what());
+ throw sql::StatementError(ex.what());
}
- return std::make_unique<SociRowset>(stmt);
}
void SociSession::begin() {
@@ -174,8 +187,4 @@
return parameters;
}
-} /* namespace sql */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
+} // namespace org::apache::nifi::minifi::sql
diff --git a/extensions/sql/data/SociConnectors.h b/extensions/sql/data/SociConnectors.h
index 7eee768..f590ccf 100644
--- a/extensions/sql/data/SociConnectors.h
+++ b/extensions/sql/data/SociConnectors.h
@@ -26,12 +26,9 @@
#include "data/DatabaseConnectors.h"
#include <soci/soci.h>
#include <soci/odbc/soci-odbc.h>
+#include "logging/Logger.h"
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace sql {
+namespace org::apache::nifi::minifi::sql {
class SociRow : public Row {
public:
@@ -71,14 +68,15 @@
class SociStatement : public Statement {
public:
- explicit SociStatement(soci::session& session, const std::string &query)
- : Statement(query), session_(session) {
- }
+ SociStatement(soci::session& session, const std::string &query);
std::unique_ptr<Rowset> execute(const std::vector<std::string>& args = {}) override;
protected:
soci::session& session_;
+
+ private:
+ std::shared_ptr<core::logging::Logger> logger_;
};
class SociSession : public Session {
@@ -112,8 +110,4 @@
std::string connection_string_;
};
-} /* namespace sql */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
+} // namespace org::apache::nifi::minifi::sql
diff --git a/extensions/sql/processors/ExecuteSQL.cpp b/extensions/sql/processors/ExecuteSQL.cpp
index e4946c3..345fefa 100644
--- a/extensions/sql/processors/ExecuteSQL.cpp
+++ b/extensions/sql/processors/ExecuteSQL.cpp
@@ -62,10 +62,22 @@
query = to_string(session.readBuffer(input_flow_file));
}
if (query.empty()) {
+ logger_->log_error("Empty sql statement");
+ if (input_flow_file) {
+ session.transfer(input_flow_file, Failure);
+ return;
+ }
throw Exception(PROCESSOR_EXCEPTION, "Empty SQL statement");
}
- auto row_set = connection_->prepareStatement(query)->execute(collectArguments(input_flow_file));
+ std::unique_ptr<sql::Rowset> row_set;
+ try {
+ row_set = connection_->prepareStatement(query)->execute(collectArguments(input_flow_file));
+ } catch (const sql::StatementError& ex) {
+ logger_->log_error("Error while executing sql statement: %s", ex.what());
+ session.transfer(input_flow_file, Failure);
+ return;
+ }
sql::JSONSQLWriter json_writer{output_format_ == OutputType::JSONPretty};
FlowFileGenerator flow_file_creator{session, json_writer};
diff --git a/extensions/sql/processors/ExecuteSQL.h b/extensions/sql/processors/ExecuteSQL.h
index ac8c151..3661012 100644
--- a/extensions/sql/processors/ExecuteSQL.h
+++ b/extensions/sql/processors/ExecuteSQL.h
@@ -27,6 +27,7 @@
#include "SQLProcessor.h"
#include "FlowFileSource.h"
#include "utils/ArrayUtils.h"
+#include "core/logging/Logger.h"
namespace org::apache::nifi::minifi::processors {
@@ -42,7 +43,8 @@
}
EXTENSIONAPI static const core::Relationship Success;
- static auto relationships() { return std::array{Success}; }
+ EXTENSIONAPI static const core::Relationship Failure;
+ static auto relationships() { return std::array{Success, Failure}; }
EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false;
EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false;
diff --git a/extensions/sql/processors/PutSQL.cpp b/extensions/sql/processors/PutSQL.cpp
index 8c2914b..4634f89 100644
--- a/extensions/sql/processors/PutSQL.cpp
+++ b/extensions/sql/processors/PutSQL.cpp
@@ -24,6 +24,7 @@
#include "core/ProcessContext.h"
#include "core/ProcessSession.h"
#include "Exception.h"
+#include "core/logging/LoggerFactory.h"
namespace org::apache::nifi::minifi::processors {
@@ -36,7 +37,11 @@
setSupportedRelationships(relationships());
}
-void PutSQL::processOnSchedule(core::ProcessContext& /*context*/) {}
+void PutSQL::processOnSchedule(core::ProcessContext& context) {
+ if (auto sql_statement = context.getProperty(SQLStatement); sql_statement && sql_statement->empty()) {
+ throw Exception(PROCESSOR_EXCEPTION, "Empty SQL statement");
+ }
+}
void PutSQL::processOnTrigger(core::ProcessContext& context, core::ProcessSession& session) {
auto flow_file = session.get();
@@ -44,18 +49,20 @@
context.yield();
return;
}
- session.transfer(flow_file, Success);
std::string sql_statement;
if (!context.getProperty(SQLStatement, sql_statement, flow_file)) {
logger_->log_debug("Using the contents of the flow file as the SQL statement");
sql_statement = to_string(session.readBuffer(flow_file));
}
- if (sql_statement.empty()) {
- throw Exception(PROCESSOR_EXCEPTION, "Empty SQL statement");
- }
- connection_->prepareStatement(sql_statement)->execute(collectArguments(flow_file));
+ try {
+ connection_->prepareStatement(sql_statement)->execute(collectArguments(flow_file));
+ session.transfer(flow_file, Success);
+ } catch (const sql::StatementError& ex) {
+ logger_->log_error("Error while executing SQL statement in flow file: %s", ex.what());
+ session.transfer(flow_file, Failure);
+ }
}
} // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/sql/processors/PutSQL.h b/extensions/sql/processors/PutSQL.h
index 6a28bc9..ebb45d9 100644
--- a/extensions/sql/processors/PutSQL.h
+++ b/extensions/sql/processors/PutSQL.h
@@ -25,6 +25,7 @@
#include "core/ProcessSession.h"
#include "SQLProcessor.h"
#include "utils/ArrayUtils.h"
+#include "core/logging/Logger.h"
namespace org::apache::nifi::minifi::processors {
@@ -40,7 +41,8 @@
}
EXTENSIONAPI static const core::Relationship Success;
- static auto relationships() { return std::array{Success}; }
+ EXTENSIONAPI static const core::Relationship Failure;
+ static auto relationships() { return std::array{Success, Failure}; }
EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false;
EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false;
diff --git a/extensions/sql/processors/SQLProcessor.cpp b/extensions/sql/processors/SQLProcessor.cpp
index 563cea6..b329552 100644
--- a/extensions/sql/processors/SQLProcessor.cpp
+++ b/extensions/sql/processors/SQLProcessor.cpp
@@ -25,17 +25,19 @@
#include "core/ProcessSession.h"
#include "Exception.h"
-#include <soci/error.h>
-
namespace org::apache::nifi::minifi::processors {
void SQLProcessor::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>& /*sessionFactory*/) {
std::string controllerService;
context->getProperty(DBControllerService.getName(), controllerService);
- db_service_ = std::dynamic_pointer_cast<sql::controllers::DatabaseService>(context->getControllerService(controllerService));
- if (!db_service_) {
- throw minifi::Exception(PROCESSOR_EXCEPTION, "'" + DBControllerService.getName() + "' must be defined");
+ if (auto service = context->getControllerService(controllerService)) {
+ db_service_ = std::dynamic_pointer_cast<sql::controllers::DatabaseService>(service);
+ if (!db_service_) {
+ throw minifi::Exception(PROCESSOR_EXCEPTION, "'" + controllerService + "' is not a DatabaseService");
+ }
+ } else {
+ throw minifi::Exception(PROCESSOR_EXCEPTION, "Could not find controller service '" + controllerService + "'");
}
processOnSchedule(*context);
@@ -46,17 +48,14 @@
if (!connection_) {
connection_ = db_service_->getConnection();
}
- processOnTrigger(*context, *session);
- } catch (const soci::soci_error& e) {
- logger_->log_error("SQLProcessor: '%s'", e.what());
- if (connection_) {
- std::string exp;
- if (!connection_->connected(exp)) {
- logger_->log_error("SQLProcessor: Connection exception: %s", exp);
- // try to reconnect next time
- connection_.reset();
- }
+ if (!connection_) {
+ throw sql::ConnectionError("Could not establish sql connection");
}
+ processOnTrigger(*context, *session);
+ } catch (const sql::ConnectionError& ex) {
+ logger_->log_error("Connection error: %s", ex.what());
+ // try to reconnect next time
+ connection_.reset();
throw;
}
}
diff --git a/extensions/sql/processors/SQLProcessorStaticDefinitions.cpp b/extensions/sql/processors/SQLProcessorStaticDefinitions.cpp
index b2d0b90..9375082 100644
--- a/extensions/sql/processors/SQLProcessorStaticDefinitions.cpp
+++ b/extensions/sql/processors/SQLProcessorStaticDefinitions.cpp
@@ -48,6 +48,7 @@
->supportsExpressionLanguage(true)->build());
const core::Relationship ExecuteSQL::Success("success", "Successfully created FlowFile from SQL query result set.");
+const core::Relationship ExecuteSQL::Failure{"failure", "Flow files containing malformed sql statements"};
REGISTER_RESOURCE(ExecuteSQL, Processor);
@@ -106,7 +107,8 @@
"the incoming flow file is expected to contain a valid SQL statement, to be issued by the processor to the database.")
->supportsExpressionLanguage(true)->build());
-const core::Relationship PutSQL::Success("success", "Database is successfully updated.");
+const core::Relationship PutSQL::Success("success", "After a successful SQL update operation, the incoming FlowFile sent here");
+const core::Relationship PutSQL::Failure{"failure", "Flow files that contain malformed sql statements"};
REGISTER_RESOURCE(PutSQL, Processor);
diff --git a/libminifi/include/core/ProcessSessionFactory.h b/libminifi/include/core/ProcessSessionFactory.h
index 867f49c..482fe0a 100644
--- a/libminifi/include/core/ProcessSessionFactory.h
+++ b/libminifi/include/core/ProcessSessionFactory.h
@@ -43,13 +43,15 @@
}
// Create the session
- std::shared_ptr<ProcessSession> createSession();
+ virtual std::shared_ptr<ProcessSession> createSession();
// Prevent default copy constructor and assignment operation
// Only support pass by reference or pointer
ProcessSessionFactory(const ProcessSessionFactory &parent) = delete;
ProcessSessionFactory &operator=(const ProcessSessionFactory &parent) = delete;
+ virtual ~ProcessSessionFactory() = default;
+
private:
// ProcessContext
std::shared_ptr<ProcessContext> process_context_;
diff --git a/libminifi/test/TestBase.cpp b/libminifi/test/TestBase.cpp
index bc34d23..ba2cb6d 100644
--- a/libminifi/test/TestBase.cpp
+++ b/libminifi/test/TestBase.cpp
@@ -234,12 +234,16 @@
processor->setFlowIdentifier(flow_version_->getFlowIdentifier());
processor_mapping_[processor->getUUID()] = processor;
if (!linkToPrevious) {
- termination_ = *(relationships.begin());
+ if (!std::empty(relationships)) {
+ termination_ = *(relationships.begin());
+ }
} else {
std::shared_ptr<minifi::core::Processor> last = processor_queue_.back();
if (last == nullptr) {
last = processor;
- termination_ = *(relationships.begin());
+ if (!std::empty(relationships)) {
+ termination_ = *(relationships.begin());
+ }
}
std::stringstream connection_name;
connection_name << last->getUUIDStr() << "-to-" << processor->getUUIDStr();
@@ -449,6 +453,21 @@
return runProcessor(processor_location, verify);
}
+class TestSessionFactory : public minifi::core::ProcessSessionFactory {
+ using SessionCallback = std::function<void(const std::shared_ptr<minifi::core::ProcessSession>&)>;
+ public:
+ TestSessionFactory(std::shared_ptr<minifi::core::ProcessContext> context, SessionCallback on_new_session)
+ : ProcessSessionFactory(std::move(context)), on_new_session_(std::move(on_new_session)) {}
+
+ std::shared_ptr<minifi::core::ProcessSession> createSession() override {
+ auto session = ProcessSessionFactory::createSession();
+ on_new_session_(session);
+ return session;
+ }
+
+ SessionCallback on_new_session_;
+};
+
bool TestPlan::runProcessor(size_t target_location, const PreTriggerVerifier& verify) {
if (!finalized) {
finalize();
@@ -459,18 +478,23 @@
std::shared_ptr<minifi::core::Processor> processor = processor_queue_.at(target_location);
std::shared_ptr<minifi::core::ProcessContext> context = processor_contexts_.at(target_location);
scheduleProcessor(processor, context);
- const auto current_session = std::make_shared<minifi::core::ProcessSession>(context);
- process_sessions_.push_back(current_session);
current_flowfile_ = nullptr;
processor->incrementActiveTasks();
processor->setScheduledState(minifi::core::ScheduledState::RUNNING);
- if (verify != nullptr) {
+
+ if (verify) {
+ auto current_session = std::make_shared<minifi::core::ProcessSession>(context);
+ process_sessions_.push_back(current_session);
verify(context, current_session);
+ current_session->commit();
} else {
+ auto session_factory = std::make_shared<TestSessionFactory>(context, [&] (auto current_session) {
+ process_sessions_.push_back(current_session);
+ });
logger_->log_info("Running %s", processor->getName());
- processor->onTrigger(context, current_session);
+ processor->onTrigger(context, session_factory);
}
- current_session->commit();
+
return gsl::narrow<size_t>(target_location + 1) < processor_queue_.size();
}
@@ -560,10 +584,11 @@
}
std::unique_ptr<minifi::Connection> TestPlan::buildFinalConnection(const std::shared_ptr<minifi::core::Processor>& processor, bool setDest) {
+ gsl_Expects(termination_);
std::stringstream connection_name;
connection_name << processor->getUUIDStr() << "-to-" << processor->getUUIDStr();
auto connection = std::make_unique<minifi::Connection>(flow_repo_, content_repo_, connection_name.str());
- connection->addRelationship(termination_);
+ connection->addRelationship(termination_.value());
// link the connections so that we can test results at the end for this
connection->setSource(processor.get());
@@ -581,11 +606,13 @@
void TestPlan::finalize() {
std::lock_guard<std::recursive_mutex> guard(mutex);
- if (!relationships_.empty()) {
- relationships_.push_back(buildFinalConnection(processor_queue_.back()));
- } else {
- for (const auto& processor : processor_queue_) {
- relationships_.push_back(buildFinalConnection(processor, true));
+ if (termination_) {
+ if (!relationships_.empty()) {
+ relationships_.push_back(buildFinalConnection(processor_queue_.back()));
+ } else {
+ for (const auto& processor : processor_queue_) {
+ relationships_.push_back(buildFinalConnection(processor, true));
+ }
}
}
diff --git a/libminifi/test/TestBase.h b/libminifi/test/TestBase.h
index f698930..147a919 100644
--- a/libminifi/test/TestBase.h
+++ b/libminifi/test/TestBase.h
@@ -323,7 +323,7 @@
std::vector<std::shared_ptr<minifi::core::ProcessSession>> process_sessions_;
std::vector<std::shared_ptr<minifi::core::ProcessSessionFactory>> factories_; // Do not assume ordering
std::vector<std::unique_ptr<minifi::Connection>> relationships_;
- minifi::core::Relationship termination_;
+ std::optional<minifi::core::Relationship> termination_;
private:
std::shared_ptr<logging::Logger> logger_;
diff --git a/libminifi/test/sql-tests/ExecuteSQLTests.cpp b/libminifi/test/sql-tests/ExecuteSQLTests.cpp
index 618a20d..948fc9d 100644
--- a/libminifi/test/sql-tests/ExecuteSQLTests.cpp
+++ b/libminifi/test/sql-tests/ExecuteSQLTests.cpp
@@ -194,9 +194,46 @@
auto plan = controller.createSQLPlan("ExecuteSQL", {{"success", "d"}, {"failure", "d"}});
- auto input_file = plan->addInput({}, "not a valid sql statement");
+ std::shared_ptr<minifi::core::FlowFile> input_file;
+ SECTION("Invalid sql statement") {
+ input_file = plan->addInput({}, "not a valid sql statement");
+ }
+ SECTION("No such table") {
+ input_file = plan->addInput({}, "SELECT * FROM no_such_table;");
+ }
- REQUIRE_THROWS(plan->run());
+ plan->run();
+
+ REQUIRE(plan->getOutputs({"success", "d"}).empty());
+ auto output = plan->getOutputs({"failure", "d"});
+ REQUIRE(output.size() == 1);
+ REQUIRE(output.at(0) == input_file);
+}
+
+TEST_CASE("ExecuteSQL select query is malformed", "[ExecuteSQL7]") {
+ SQLTestController controller;
+
+ auto plan = controller.createSQLPlan("ExecuteSQL", {{"success", "d"}, {"failure", "d"}});
+ plan->getSQLProcessor()->setProperty(minifi::processors::ExecuteSQL::SQLSelectQuery.getName(), "SELECT * FROM test_table WHERE int_col = ?;");
+
+ std::shared_ptr<minifi::core::FlowFile> input_file;
+ SECTION("Less than required arguments") {
+ input_file = plan->addInput({}, "ignored content");
+ }
+// TODO(MINIFICPP-2001):
+// SECTION("More than required arguments") {
+// input_file = plan->addInput({
+// {"sql.args.1.value", "1"},
+// {"sql.args.2.value", "2"}
+// }, "ignored content");
+// }
+
+ plan->run();
+
+ REQUIRE(plan->getOutputs({"success", "d"}).empty());
+ auto output = plan->getOutputs({"failure", "d"});
+ REQUIRE(output.size() == 1);
+ REQUIRE(output.at(0) == input_file);
}
} // namespace org::apache::nifi::minifi::test
diff --git a/libminifi/test/sql-tests/PutSQLTests.cpp b/libminifi/test/sql-tests/PutSQLTests.cpp
index 5b42514..02778cb 100644
--- a/libminifi/test/sql-tests/PutSQLTests.cpp
+++ b/libminifi/test/sql-tests/PutSQLTests.cpp
@@ -35,7 +35,7 @@
REQUIRE(processor->getName() == "processorname");
}
-TEST_CASE("Test Put", "[PutSQLPut]") {
+TEST_CASE("Statement from processor property") {
SQLTestController testController;
auto plan = testController.createSQLPlan("PutSQL", {{"success", "d"}});
@@ -51,6 +51,10 @@
plan->run();
+ auto output = plan->getOutputs({"success", "d"});
+ REQUIRE(output.size() == 1);
+ REQUIRE(output.at(0) == input_file);
+
// Verify output state
auto rows = testController.fetchValues();
REQUIRE(rows.size() == 1);
@@ -58,7 +62,7 @@
REQUIRE(rows[0].text_col == "asdf");
}
-TEST_CASE("Test Put Content", "[PutSQLPutContent]") {
+TEST_CASE("Statement from flow file content") {
SQLTestController testController;
auto plan = testController.createSQLPlan("PutSQL", {{"success", "d"}});
@@ -69,6 +73,10 @@
plan->run();
+ auto output = plan->getOutputs({"success", "d"});
+ REQUIRE(output.size() == 1);
+ REQUIRE(output.at(0) == input_file);
+
// Verify output state
auto rows = testController.fetchValues();
REQUIRE(rows.size() == 1);
@@ -76,3 +84,80 @@
REQUIRE(rows[0].text_col == "fdsa");
}
+TEST_CASE("PutSQL routes to failure on malformed statement") {
+ SQLTestController testController;
+
+ auto plan = testController.createSQLPlan("PutSQL", {{"success", "d"}, {"failure", "d"}});
+ auto sql_proc = plan->getSQLProcessor();
+
+ std::shared_ptr<core::FlowFile> input_file;
+ SECTION("Missing parameter") {
+ input_file = plan->addInput();
+ }
+// TODO(MINIFICPP-2002):
+// SECTION("Invalid parameter type") {
+// input_file = plan->addInput({
+// {"sql.args.1.value", "banana"},
+// });
+// }
+
+ sql_proc->setProperty(
+ "SQL Statement",
+ "INSERT INTO test_table (int_col, text_col) VALUES (?, 'asdf')");
+
+ plan->run();
+
+ REQUIRE(plan->getOutputs({"success", "d"}).empty());
+ auto output = plan->getOutputs({"failure", "d"});
+ REQUIRE(output.size() == 1);
+ REQUIRE(output.at(0) == input_file);
+}
+
+TEST_CASE("PutSQL routes to failure on malformed content statement") {
+ SQLTestController testController;
+
+ auto plan = testController.createSQLPlan("PutSQL", {{"success", "d"}, {"failure", "d"}});
+ auto sql_proc = plan->getSQLProcessor();
+
+ std::shared_ptr<core::FlowFile> input_file;
+ SECTION("No parameters") {
+ input_file = plan->addInput({}, "INSERT INTO test_table VALUES(?, ?);");
+ }
+ SECTION("Not enough parameters") {
+ input_file = plan->addInput({
+ {"sql.args.1.value", "42"}
+ }, "INSERT INTO test_table VALUES(?, ?);");
+ }
+// TODO(MINIFICPP-2001):
+// SECTION("Too many parameters") {
+// input_file = plan->addInput({
+// {"sql.args.1.value", "42"},
+// {"sql.args.2.value", "banana"},
+// {"sql.args.3.value", "too_many"}
+// }, "INSERT INTO test_table VALUES(?, ?);");
+// }
+// TODO(MINIFICPP-2002):
+// SECTION("Invalid parameter type") {
+// input_file = plan->addInput({
+// {"sql.args.1.value", "banana"},
+// {"sql.args.2.value", "apple"}
+// }, "INSERT INTO test_table VALUES(?, ?);");
+// }
+ SECTION("No such table") {
+ input_file = plan->addInput({
+ {"sql.args.1.value", "42"}
+ }, "INSERT INTO no_such_table VALUES(?);");
+ }
+ SECTION("Garbage statement") {
+ input_file = plan->addInput({
+ {"sql.args.1.value", "42"}
+ }, "ajshdjhasgdkashdiahfbuauwlkdkj;");
+ }
+
+ plan->run();
+
+ REQUIRE(plan->getOutputs({"success", "d"}).empty());
+ auto output = plan->getOutputs({"failure", "d"});
+ REQUIRE(output.size() == 1);
+ REQUIRE(output.at(0) == input_file);
+}
diff --git a/libminifi/test/sql-tests/SQLTestPlan.h b/libminifi/test/sql-tests/SQLTestPlan.h
index a7304ad..10c4b3e 100644
--- a/libminifi/test/sql-tests/SQLTestPlan.h
+++ b/libminifi/test/sql-tests/SQLTestPlan.h
@@ -40,7 +40,7 @@
public:
SQLTestPlan(TestController& controller, const std::string& connection_str, const std::string& sql_processor, std::initializer_list<core::Relationship> output_rels) {
plan_ = controller.createPlan();
- processor_ = plan_->addProcessor(sql_processor, sql_processor);
+ processor_ = plan_->addProcessor(sql_processor, sql_processor, {}, false);
plan_->setProperty(processor_, "DB Controller Service", "ODBCService");
input_ = plan_->addConnection({}, {"success", "d"}, processor_);
for (const auto& output_rel : output_rels) {
diff --git a/libminifi/test/sql-tests/mocks/MockConnectors.cpp b/libminifi/test/sql-tests/mocks/MockConnectors.cpp
index 6854a97..25d5a23 100644
--- a/libminifi/test/sql-tests/mocks/MockConnectors.cpp
+++ b/libminifi/test/sql-tests/mocks/MockConnectors.cpp
@@ -176,7 +176,7 @@
} else if (minifi::utils::StringUtils::startsWith(query, "select", false)) {
return select(query, args);
} else {
- throw std::runtime_error("Unknown query type");
+ throw sql::StatementError("Unknown query type");
}
return nullptr;
@@ -204,11 +204,17 @@
for (const auto& arg : args) {
replaced_query = minifi::utils::StringUtils::replaceOne(replaced_query, "?", arg);
}
+ if (replaced_query.find('?') != std::string::npos) {
+ throw sql::StatementError("Less arguments than required by the insert query");
+ }
std::smatch match;
std::regex expr(R"(insert into (\w+)\s*(\((.*)\))*\s*values\s*\((.+)\))", std::regex_constants::icase);
std::regex_search(replaced_query, match, expr);
std::string table_name = match[1];
+ if (!tables_.contains(table_name)) {
+ throw sql::StatementError("No such table: '" + table_name + "'");
+ }
std::vector<std::string> values = minifi::utils::StringUtils::splitAndTrimRemovingEmpty(match[4], ",");
for (auto& value : values) {
value = minifi::utils::StringUtils::removeFramingCharacters(value, '\'');
@@ -238,6 +244,9 @@
for (const auto& arg : args) {
replaced_query = minifi::utils::StringUtils::replaceOne(replaced_query, "?", arg);
}
+ if (replaced_query.find('?') != std::string::npos) {
+ throw sql::StatementError("Less arguments than required by the select query");
+ }
std::smatch match;
std::regex expr(R"(select\s+(.+)\s+from\s+(\w+)\s*(where ((.+(?= order by))|.+$))*\s*(order by (.+))*)", std::regex_constants::icase);
@@ -247,6 +256,9 @@
cols = {};
}
std::string table_name = match[2];
+ if (!tables_.contains(table_name)) {
+ throw sql::StatementError("No such table: '" + table_name + "'");
+ }
std::string condition_str = match[4];
auto condition = parseWhereCondition(condition_str);
std::string order = match[7];