blob: 6bec5e21bffa142393ca1346ad3d0ef4734e167c [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "unit/TestBase.h"
#include "unit/Catch.h"
#include "SQLTestController.h"
#include "unit/TestUtils.h"
#include "FlowFileMatcher.h"
namespace org::apache::nifi::minifi::test {
TEST_CASE("QueryDatabaseTable queries the table and returns specified columns", "[QueryDatabaseTable1]") {
SQLTestController controller;
auto plan = controller.createSQLPlan("QueryDatabaseTable", {{"success", "d"}});
CHECK(plan->setProperty(minifi::processors::QueryDatabaseTable::TableName.name, "test_table"));
CHECK(plan->setProperty(minifi::processors::QueryDatabaseTable::MaxValueColumnNames.name, "int_col"));
CHECK(plan->setProperty(minifi::processors::QueryDatabaseTable::ColumnNames.name, "text_col"));
controller.insertValues({
{101, "one"},
{102, "two"},
{103, "three"}
});
plan->run();
auto flow_files = plan->getOutputs({"success", "d"});
REQUIRE(flow_files.size() == 1);
std::string row_count;
flow_files[0]->getAttribute(minifi::processors::QueryDatabaseTable::RESULT_ROW_COUNT, row_count);
REQUIRE(row_count == "3");
auto content = plan->getContent(flow_files[0]);
utils::verifyJSON(content, R"(
[{"text_col": "one"}, {"text_col": "two"}, {"text_col": "three"}]
)", true);
}
TEST_CASE("QueryDatabaseTable requerying the table returns only new rows", "[QueryDatabaseTable2]") {
SQLTestController controller;
auto plan = controller.createSQLPlan("QueryDatabaseTable", {{"success", "d"}});
CHECK(plan->setProperty(minifi::processors::QueryDatabaseTable::TableName.name, "test_table"));
CHECK(plan->setProperty(minifi::processors::QueryDatabaseTable::MaxValueColumnNames.name, "int_col"));
CHECK(plan->setProperty(minifi::processors::QueryDatabaseTable::ColumnNames.name, "text_col"));
controller.insertValues({
{101, "one"},
{102, "two"},
{103, "three"}
});
plan->run();
auto first_flow_files = plan->getOutputs({"success", "d"});
REQUIRE(first_flow_files.size() == 1);
controller.insertValues({
{104, "four"},
{105, "five"}
});
SECTION("Run onTrigger only") {plan->run();}
SECTION("Run both onSchedule and onTrigger") {plan->run(true);}
auto second_flow_files = plan->getOutputs({"success", "d"});
REQUIRE(second_flow_files.size() == 1);
std::string row_count;
second_flow_files[0]->getAttribute(minifi::processors::QueryDatabaseTable::RESULT_ROW_COUNT, row_count);
REQUIRE(row_count == "2");
auto content = plan->getContent(second_flow_files[0]);
utils::verifyJSON(content, R"(
[{"text_col": "four"}, {"text_col": "five"}]
)", true);
}
TEST_CASE("QueryDatabaseTable specifying initial max values", "[QueryDatabaseTable3]") {
SQLTestController controller;
auto plan = controller.createSQLPlan("QueryDatabaseTable", {{"success", "d"}});
auto sql_proc = plan->getSQLProcessor();
CHECK(plan->setProperty(minifi::processors::QueryDatabaseTable::TableName.name, "test_table"));
CHECK(plan->setProperty(minifi::processors::QueryDatabaseTable::MaxValueColumnNames.name, "int_col"));
CHECK(plan->setProperty(minifi::processors::QueryDatabaseTable::ColumnNames.name, "text_col"));
REQUIRE(sql_proc->setDynamicProperty("initial.maxvalue.int_col", "102"));
controller.insertValues({
{101, "one"},
{102, "two"},
{103, "three"},
{104, "four"}
});
plan->run();
auto flow_files = plan->getOutputs({"success", "d"});
REQUIRE(flow_files.size() == 1);
std::string row_count;
flow_files[0]->getAttribute(minifi::processors::QueryDatabaseTable::RESULT_ROW_COUNT, row_count);
REQUIRE(row_count == "2");
auto content = plan->getContent(flow_files[0]);
utils::verifyJSON(content, R"(
[{"text_col": "three"}, {"text_col": "four"}]
)", true);
}
TEST_CASE("QueryDatabaseTable honors Max Rows Per Flow File and sets output attributes", "[QueryDatabaseTable4]") {
SQLTestController controller;
auto plan = controller.createSQLPlan("QueryDatabaseTable", {{"success", "d"}});
CHECK(plan->setProperty(minifi::processors::QueryDatabaseTable::TableName.name, "test_table"));
CHECK(plan->setProperty(minifi::processors::QueryDatabaseTable::MaxValueColumnNames.name, "int_col"));
CHECK(plan->setProperty(minifi::processors::QueryDatabaseTable::ColumnNames.name, "text_col"));
CHECK(plan->setProperty(minifi::processors::QueryDatabaseTable::MaxRowsPerFlowFile.name, "3"));
controller.insertValues({
{101, "one"},
{102, "two"},
{103, "three"},
{104, "four"},
{105, "five"}
});
plan->run();
auto content_verifier = [&] (const std::shared_ptr<core::FlowFile>& actual, const std::string& expected) {
utils::verifyJSON(plan->getContent(actual), expected, true);
};
FlowFileMatcher matcher(content_verifier, {
minifi::processors::QueryDatabaseTable::RESULT_TABLE_NAME,
minifi::processors::QueryDatabaseTable::RESULT_ROW_COUNT,
minifi::processors::QueryDatabaseTable::FRAGMENT_COUNT,
minifi::processors::QueryDatabaseTable::FRAGMENT_INDEX,
minifi::processors::QueryDatabaseTable::FRAGMENT_IDENTIFIER,
"maxvalue.int_col"
});
auto flow_files = plan->getOutputs({"success", "d"});
REQUIRE(flow_files.size() == 2);
std::optional<std::string> fragment_id;
matcher.verify(flow_files[0],
{ AttributeValue("test_table"), AttributeValue("3"), AttributeValue("2"), AttributeValue("0"), capture(fragment_id), AttributeValue("105") },
R"([{"text_col": "one"}, {"text_col": "two"}, {"text_col": "three"}])");
REQUIRE(fragment_id);
matcher.verify(flow_files[1],
{ AttributeValue("test_table"), AttributeValue("2"), AttributeValue("2"), AttributeValue("1"), AttributeValue(*fragment_id), AttributeValue("105") },
R"([{"text_col": "four"}, {"text_col": "five"}])");
}
TEST_CASE("QueryDatabaseTable changing table name resets state", "[QueryDatabaseTable5]") {
SQLTestController controller;
auto plan = controller.createSQLPlan("QueryDatabaseTable", {{"success", "d"}});
CHECK(plan->setProperty(minifi::processors::QueryDatabaseTable::TableName.name, "test_table"));
CHECK(plan->setProperty(minifi::processors::QueryDatabaseTable::MaxValueColumnNames.name, "int_col"));
CHECK(plan->setProperty(minifi::processors::QueryDatabaseTable::ColumnNames.name, "text_col"));
controller.insertValues({
{101, "one"},
{102, "two"},
{103, "three"}
});
// query "test_table"
plan->run();
auto flow_files = plan->getOutputs({"success", "d"});
REQUIRE(flow_files.size() == 1);
std::string row_count;
flow_files[0]->getAttribute(minifi::processors::QueryDatabaseTable::RESULT_ROW_COUNT, row_count);
REQUIRE(row_count == "3");
// query "empty_test_table"
CHECK(plan->setProperty(minifi::processors::QueryDatabaseTable::TableName.name, "empty_test_table"));
plan->run(true);
flow_files = plan->getOutputs({"success", "d"});
REQUIRE(flow_files.empty());
// again query "test_table", by now the stored state is reset, so all rows are returned
// set through ProcessContext to avoid corrupting ProcessContextExpr cache
CHECK(plan->setProperty(minifi::processors::QueryDatabaseTable::TableName.name, "test_table"));
plan->run(true);
flow_files = plan->getOutputs({"success", "d"});
REQUIRE(flow_files.size() == 1);
flow_files[0]->getAttribute(minifi::processors::QueryDatabaseTable::RESULT_ROW_COUNT, row_count);
REQUIRE(row_count == "3");
}
TEST_CASE("QueryDatabaseTable changing maximum value columns resets state", "[QueryDatabaseTable6]") {
SQLTestController controller;
auto plan = controller.createSQLPlan("QueryDatabaseTable", {{"success", "d"}});
CHECK(plan->setProperty(minifi::processors::QueryDatabaseTable::TableName.name, "test_table"));
CHECK(plan->setProperty(minifi::processors::QueryDatabaseTable::MaxValueColumnNames.name, "int_col"));
CHECK(plan->setProperty(minifi::processors::QueryDatabaseTable::ColumnNames.name, "text_col"));
controller.insertValues({
{101, "one"},
{102, "two"},
{103, "three"}
});
// query using ["int_col"] as max value columns
plan->run();
auto flow_files = plan->getOutputs({"success", "d"});
REQUIRE(flow_files.size() == 1);
std::string row_count;
flow_files[0]->getAttribute(minifi::processors::QueryDatabaseTable::RESULT_ROW_COUNT, row_count);
REQUIRE(row_count == "3");
// query using ["int_col", "text_col"] as max value columns
CHECK(plan->setProperty(minifi::processors::QueryDatabaseTable::MaxValueColumnNames.name, "int_col, text_col"));
plan->run(true);
flow_files = plan->getOutputs({"success", "d"});
REQUIRE(flow_files.size() == 1);
flow_files[0]->getAttribute(minifi::processors::QueryDatabaseTable::RESULT_ROW_COUNT, row_count);
REQUIRE(row_count == "3");
// query using ["int_col"] as max value columns again
CHECK(plan->setProperty(minifi::processors::QueryDatabaseTable::MaxValueColumnNames.name, "int_col"));
plan->run(true);
flow_files = plan->getOutputs({"success", "d"});
REQUIRE(flow_files.size() == 1);
flow_files[0]->getAttribute(minifi::processors::QueryDatabaseTable::RESULT_ROW_COUNT, row_count);
REQUIRE(row_count == "3");
}
} // namespace org::apache::nifi::minifi::test