| /** |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| #include <optional> |
| |
| #include "SQLTestController.h" |
| #include "processors/ExecuteSQL.h" |
| #include "unit/TestUtils.h" |
| #include "FlowFileMatcher.h" |
| |
| namespace org::apache::nifi::minifi::test { |
| |
| TEST_CASE("ExecuteSQL works without incoming flow file", "[ExecuteSQL1]") { |
| SQLTestController controller; |
| |
| auto plan = controller.createSQLPlan("ExecuteSQL", {{"success", "d"}}); |
| auto sql_proc = plan->getSQLProcessor(); |
| REQUIRE(sql_proc->setProperty("SQL select query", "SELECT * FROM test_table ORDER BY int_col ASC")); |
| |
| controller.insertValues({{11, "one"}, {22, "two"}}); |
| |
| plan->run(); |
| |
| auto flow_files = plan->getOutputs({"success", "d"}); |
| REQUIRE(flow_files.size() == 1); |
| std::string row_count; |
| flow_files[0]->getAttribute(minifi::processors::ExecuteSQL::RESULT_ROW_COUNT, row_count); |
| REQUIRE(row_count == "2"); |
| |
| auto content = plan->getContent(flow_files[0]); |
| utils::verifyJSON(content, R"( |
| [{ |
| "int_col": 11, |
| "text_col": "one" |
| },{ |
| "int_col": 22, |
| "text_col": "two" |
| }] |
| )"); |
| } |
| |
| TEST_CASE("ExecuteSQL uses statement in property", "[ExecuteSQL2]") { |
| SQLTestController controller; |
| |
| auto plan = controller.createSQLPlan("ExecuteSQL", {{"success", "d"}}); |
| auto sql_proc = plan->getSQLProcessor(); |
| REQUIRE(sql_proc->setProperty("SQL select query", "SELECT * FROM test_table WHERE int_col = ${int_col_value}")); |
| |
| controller.insertValues({{11, "one"}, {22, "two"}}); |
| |
| auto input_file = plan->addInput({{"int_col_value", "11"}}); |
| |
| plan->run(); |
| |
| auto flow_files = plan->getOutputs({"success", "d"}); |
| REQUIRE(flow_files.size() == 1); |
| std::string row_count; |
| flow_files[0]->getAttribute(minifi::processors::ExecuteSQL::RESULT_ROW_COUNT, row_count); |
| REQUIRE(row_count == "1"); |
| |
| auto content = plan->getContent(flow_files[0]); |
| utils::verifyJSON(content, R"( |
| [{ |
| "int_col": 11, |
| "text_col": "one" |
| }] |
| )"); |
| } |
| |
| TEST_CASE("ExecuteSQL uses statement in content", "[ExecuteSQL3]") { |
| SQLTestController controller; |
| |
| auto plan = controller.createSQLPlan("ExecuteSQL", {{"success", "d"}}); |
| |
| controller.insertValues({{11, "one"}, {22, "two"}}); |
| |
| auto input_file = plan->addInput({}, "SELECT * FROM test_table ORDER BY int_col ASC"); |
| |
| plan->run(); |
| |
| auto flow_files = plan->getOutputs({"success", "d"}); |
| REQUIRE(flow_files.size() == 1); |
| std::string row_count; |
| flow_files[0]->getAttribute(minifi::processors::ExecuteSQL::RESULT_ROW_COUNT, row_count); |
| REQUIRE(row_count == "2"); |
| |
| auto content = plan->getContent(flow_files[0]); |
| utils::verifyJSON(content, R"( |
| [{ |
| "int_col": 11, |
| "text_col": "one" |
| },{ |
| "int_col": 22, |
| "text_col": "two" |
| }] |
| )"); |
| } |
| |
| TEST_CASE("ExecuteSQL uses sql.args.N.value attributes", "[ExecuteSQL4]") { |
| SQLTestController controller; |
| |
| auto plan = controller.createSQLPlan("ExecuteSQL", {{"success", "d"}}); |
| |
| controller.insertValues({{11, "apple"}, {11, "banana"}, {22, "banana"}}); |
| |
| auto input_file = plan->addInput({ |
| {"sql.args.1.value", "11"}, |
| {"sql.args.2.value", "banana"} |
| }, "SELECT * FROM test_table WHERE int_col = ? AND text_col = ?"); |
| |
| plan->run(); |
| |
| auto flow_files = plan->getOutputs({"success", "d"}); |
| REQUIRE(flow_files.size() == 1); |
| std::string row_count; |
| flow_files[0]->getAttribute(minifi::processors::ExecuteSQL::RESULT_ROW_COUNT, row_count); |
| REQUIRE(row_count == "1"); |
| |
| auto content = plan->getContent(flow_files[0]); |
| utils::verifyJSON(content, R"( |
| [{ |
| "int_col": 11, |
| "text_col": "banana" |
| }] |
| )"); |
| } |
| |
| TEST_CASE("ExecuteSQL honors Max Rows Per Flow File", "[ExecuteSQL5]") { |
| SQLTestController controller; |
| |
| auto plan = controller.createSQLPlan("ExecuteSQL", {{"success", "d"}}); |
| auto sql_proc = plan->getSQLProcessor(); |
| REQUIRE(sql_proc->setProperty(minifi::processors::ExecuteSQL::MaxRowsPerFlowFile.name, "2")); |
| REQUIRE(sql_proc->setProperty(minifi::processors::ExecuteSQL::SQLSelectQuery.name, "SELECT text_col FROM test_table ORDER BY int_col ASC")); |
| |
| controller.insertValues({ |
| {101, "apple"}, |
| {102, "banana"}, |
| {103, "pear"}, |
| {104, "strawberry"}, |
| {105, "pineapple"} |
| }); |
| |
| auto input_file = plan->addInput(); |
| |
| plan->run(); |
| |
| auto content_verifier = [&] (const std::shared_ptr<core::FlowFile>& actual, const std::string& expected) { |
| utils::verifyJSON(plan->getContent(actual), expected); |
| }; |
| |
| FlowFileMatcher matcher{content_verifier, { |
| minifi::processors::ExecuteSQL::RESULT_ROW_COUNT, |
| minifi::processors::ExecuteSQL::FRAGMENT_COUNT, |
| minifi::processors::ExecuteSQL::FRAGMENT_INDEX, |
| minifi::processors::ExecuteSQL::FRAGMENT_IDENTIFIER |
| }}; |
| |
| std::optional<std::string> fragment_id; |
| |
| auto flow_files = plan->getOutputs({"success", "d"}); |
| REQUIRE(flow_files.size() == 3); |
| matcher.verify(flow_files[0], |
| { AttributeValue("2"), AttributeValue("3"), AttributeValue("0"), capture(fragment_id) }, |
| R"([{"text_col": "apple"}, {"text_col": "banana"}])"); |
| REQUIRE(fragment_id); |
| matcher.verify(flow_files[1], |
| { AttributeValue("2"), AttributeValue("3"), AttributeValue("1"), AttributeValue(*fragment_id) }, |
| R"([{"text_col": "pear"}, {"text_col": "strawberry"}])"); |
| matcher.verify(flow_files[2], |
| { AttributeValue("1"), AttributeValue("3"), AttributeValue("2"), AttributeValue(*fragment_id) }, |
| R"([{"text_col": "pineapple"}])"); |
| } |
| |
| TEST_CASE("ExecuteSQL incoming flow file is malformed", "[ExecuteSQL6]") { |
| SQLTestController controller; |
| |
| auto plan = controller.createSQLPlan("ExecuteSQL", {{"success", "d"}, {"failure", "d"}}); |
| |
| std::shared_ptr<minifi::core::FlowFile> input_file; |
| SECTION("Invalid sql statement") { |
| input_file = plan->addInput({}, "not a valid sql statement"); |
| } |
| SECTION("No such table") { |
| input_file = plan->addInput({}, "SELECT * FROM no_such_table;"); |
| } |
| |
| plan->run(); |
| |
| REQUIRE(plan->getOutputs({"success", "d"}).empty()); |
| auto output = plan->getOutputs({"failure", "d"}); |
| REQUIRE(output.size() == 1); |
| REQUIRE(output.at(0) == input_file); |
| } |
| |
| TEST_CASE("ExecuteSQL select query is malformed", "[ExecuteSQL7]") { |
| SQLTestController controller; |
| |
| auto plan = controller.createSQLPlan("ExecuteSQL", {{"success", "d"}, {"failure", "d"}}); |
| REQUIRE(plan->getSQLProcessor()->setProperty(minifi::processors::ExecuteSQL::SQLSelectQuery.name, "SELECT * FROM test_table WHERE int_col = ?;")); |
| |
| std::shared_ptr<minifi::core::FlowFile> input_file; |
| SECTION("Less than required arguments") { |
| input_file = plan->addInput({}, "ignored content"); |
| } |
| // TODO(MINIFICPP-2001): |
| // SECTION("More than required arguments") { |
| // input_file = plan->addInput({ |
| // {"sql.args.1.value", "1"}, |
| // {"sql.args.2.value", "2"} |
| // }, "ignored content"); |
| // } |
| |
| plan->run(); |
| |
| REQUIRE(plan->getOutputs({"success", "d"}).empty()); |
| auto output = plan->getOutputs({"failure", "d"}); |
| REQUIRE(output.size() == 1); |
| REQUIRE(output.at(0) == input_file); |
| } |
| |
| } // namespace org::apache::nifi::minifi::test |