blob: 6ba6eee3ffd26c2ede7327c58882392215f0921e [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <fstream>
#include <map>
#include <memory>
#include <set>
#include <iostream>
#include <GenerateFlowFile.h>
#include <UpdateAttribute.h>
#include <LogAttribute.h>
#include <ExecuteSQL.h>
#include "../TestBase.h"
#include "processors/GetFile.h"
#include "processors/PutFile.h"
#include "PutSQL.h"
TEST_CASE("Test Creation of PutSQL", "[PutSQLCreate]") { // NOLINT
TestController testController;
std::shared_ptr<core::Processor>
processor = std::make_shared<org::apache::nifi::minifi::processors::PutSQL>("processorname");
REQUIRE(processor->getName() == "processorname");
}
TEST_CASE("Test Put", "[PutSQLPut]") { // NOLINT
TestController testController;
LogTestController::getInstance().setTrace<TestPlan>();
LogTestController::getInstance().setTrace<processors::GenerateFlowFile>();
LogTestController::getInstance().setTrace<processors::UpdateAttribute>();
LogTestController::getInstance().setTrace<processors::LogAttribute>();
LogTestController::getInstance().setTrace<processors::PutSQL>();
auto plan = testController.createPlan();
auto repo = std::make_shared<TestRepository>();
// Define directory for test db
std::string test_dir("/tmp/gt.XXXXXX");
REQUIRE(!testController.createTempDirectory(&test_dir[0]).empty());
// Define test db file
std::string test_db(test_dir);
test_db.append("/test.db");
// Create test db
{
minifi::sqlite::SQLiteConnection db(test_db);
auto stmt = db.prepare("CREATE TABLE test_table (int_col INTEGER, text_col TEXT);");
stmt.step();
REQUIRE(stmt.is_ok());
}
// Build MiNiFi processing graph
auto generate = plan->addProcessor(
"GenerateFlowFile",
"Generate");
auto update = plan->addProcessor(
"UpdateAttribute",
"Update",
core::Relationship("success", "description"),
true);
plan->setProperty(
update,
"sql.args.1.value",
"42",
true);
plan->setProperty(
update,
"sql.args.2.value",
"asdf",
true);
auto log = plan->addProcessor(
"LogAttribute",
"Log",
core::Relationship("success", "description"),
true);
auto put = plan->addProcessor(
"PutSQL",
"PutSQL",
core::Relationship("success", "description"),
true);
plan->setProperty(
put,
"Connection URL",
"sqlite://" + test_db);
plan->setProperty(
put,
"SQL Statement",
"INSERT INTO test_table (int_col, text_col) VALUES (?, ?)");
plan->runNextProcessor(); // Generate
plan->runNextProcessor(); // Update
plan->runNextProcessor(); // Log
plan->runNextProcessor(); // PutSQL
// Verify output state
{
minifi::sqlite::SQLiteConnection db(test_db);
auto stmt = db.prepare("SELECT int_col, text_col FROM test_table;");
stmt.step();
REQUIRE(stmt.is_ok());
REQUIRE(42 == stmt.column_int64(0));
REQUIRE("asdf" == stmt.column_text(1));
}
}
TEST_CASE("Test Put Content", "[PutSQLPutContent]") { // NOLINT
TestController testController;
LogTestController::getInstance().setTrace<TestPlan>();
LogTestController::getInstance().setTrace<processors::GetFile>();
LogTestController::getInstance().setTrace<processors::UpdateAttribute>();
LogTestController::getInstance().setTrace<processors::LogAttribute>();
LogTestController::getInstance().setTrace<processors::PutSQL>();
auto plan = testController.createPlan();
auto repo = std::make_shared<TestRepository>();
// Define directory for test db
std::string test_dir("/tmp/gt.XXXXXX");
REQUIRE(!testController.createTempDirectory(&test_dir[0]).empty());
// Define test db file
std::string test_db(test_dir);
test_db.append("/test.db");
// Define directory for test input file
std::string test_in_dir("/tmp/gt.XXXXXX");
REQUIRE(!testController.createTempDirectory(&test_in_dir[0]).empty());
// Define test input file
std::string test_file(test_in_dir);
test_file.append("/test.in");
// Write test SQL content
{
std::ofstream os(test_file);
os << "INSERT INTO test_table VALUES(?, ?);";
}
// Create test db
{
minifi::sqlite::SQLiteConnection db(test_db);
auto stmt = db.prepare("CREATE TABLE test_table (int_col INTEGER, text_col TEXT);");
stmt.step();
REQUIRE(stmt.is_ok());
}
// Build MiNiFi processing graph
auto get_file = plan->addProcessor(
"GetFile",
"Get");
plan->setProperty(
get_file,
processors::GetFile::Directory.getName(), test_in_dir);
auto update = plan->addProcessor(
"UpdateAttribute",
"Update",
core::Relationship("success", "description"),
true);
plan->setProperty(
update,
"sql.args.1.value",
"4242",
true);
plan->setProperty(
update,
"sql.args.2.value",
"fdsa",
true);
auto log = plan->addProcessor(
"LogAttribute",
"Log",
core::Relationship("success", "description"),
true);
auto put = plan->addProcessor(
"PutSQL",
"PutSQL",
core::Relationship("success", "description"),
true);
plan->setProperty(
put,
"Connection URL",
"sqlite://" + test_db);
plan->runNextProcessor(); // Get
plan->runNextProcessor(); // Update
plan->runNextProcessor(); // Log
plan->runNextProcessor(); // PutSQL
// Verify output state
{
minifi::sqlite::SQLiteConnection db(test_db);
auto stmt = db.prepare("SELECT int_col, text_col FROM test_table;");
stmt.step();
REQUIRE(stmt.is_ok());
REQUIRE(4242 == stmt.column_int64(0));
REQUIRE("fdsa" == stmt.column_text(1));
}
}
TEST_CASE("Test Exec", "[ExecuteSQL]") { // NOLINT
TestController testController;
LogTestController::getInstance().setTrace<TestPlan>();
LogTestController::getInstance().setTrace<processors::LogAttribute>();
LogTestController::getInstance().setTrace<processors::ExecuteSQL>();
auto plan = testController.createPlan();
auto repo = std::make_shared<TestRepository>();
// Define directory for test db
std::string test_dir("/tmp/gt.XXXXXX");
REQUIRE(!testController.createTempDirectory(&test_dir[0]).empty());
// Define test db file
std::string test_db(test_dir);
test_db.append("/test.db");
// Create test db
{
minifi::sqlite::SQLiteConnection db(test_db);
auto stmt = db.prepare("CREATE TABLE test_table (int_col INTEGER, text_col TEXT);");
stmt.step();
REQUIRE(stmt.is_ok());
// Insert test data
auto stmt2 = db.prepare("INSERT INTO test_table (int_col, text_col) VALUES (42, 'asdf');");
stmt2.step();
REQUIRE(stmt2.is_ok());
}
// Build MiNiFi processing graph
auto exec = plan->addProcessor(
"ExecuteSQL",
"ExecuteSQL");
plan->setProperty(
exec,
"Connection URL",
"sqlite://" + test_db);
plan->setProperty(
exec,
"SQL Statement",
"SELECT * FROM test_table;");
auto log = plan->addProcessor(
"LogAttribute",
"Log",
core::Relationship("success", "description"),
true);
plan->runNextProcessor(); // Exec
plan->runNextProcessor(); // Log
// Verify output state
REQUIRE(LogTestController::getInstance().contains("key:int_col value:42"));
REQUIRE(LogTestController::getInstance().contains("key:text_col value:asdf"));
}
TEST_CASE("Test Exec 2", "[ExecuteSQL2]") { // NOLINT
TestController testController;
LogTestController::getInstance().setTrace<TestPlan>();
LogTestController::getInstance().setTrace<processors::LogAttribute>();
LogTestController::getInstance().setTrace<processors::UpdateAttribute>();
LogTestController::getInstance().setTrace<processors::GenerateFlowFile>();
LogTestController::getInstance().setTrace<processors::ExecuteSQL>();
auto plan = testController.createPlan();
auto repo = std::make_shared<TestRepository>();
// Define directory for test db
std::string test_dir("/tmp/gt.XXXXXX");
REQUIRE(!testController.createTempDirectory(&test_dir[0]).empty());
// Define test db file
std::string test_db(test_dir);
test_db.append("/test.db");
// Create test db
{
minifi::sqlite::SQLiteConnection db(test_db);
auto stmt = db.prepare("CREATE TABLE test_table (id_col INTEGER, int_col INTEGER, text_col TEXT);");
stmt.step();
REQUIRE(stmt.is_ok());
// Insert test data
{
auto ins = db.prepare("INSERT INTO test_table (id_col, int_col, text_col) VALUES (1, 33, 'aaaa');");
ins.step();
REQUIRE(ins.is_ok());
}
{
auto ins = db.prepare("INSERT INTO test_table (id_col, int_col, text_col) VALUES (2, 42, 'bbbb');");
ins.step();
REQUIRE(ins.is_ok());
}
{
auto ins = db.prepare("INSERT INTO test_table (id_col, int_col, text_col) VALUES (3, 24, 'cccc');");
ins.step();
REQUIRE(ins.is_ok());
}
}
// Build MiNiFi processing graph
auto generate = plan->addProcessor(
"GenerateFlowFile",
"Generate");
auto update = plan->addProcessor(
"UpdateAttribute",
"Update",
core::Relationship("success", "description"),
true);
plan->setProperty(
update,
"sql.args.1.value",
"2",
true);
auto exec = plan->addProcessor(
"ExecuteSQL",
"ExecuteSQL",
core::Relationship("success", "description"),
true);
plan->setProperty(
exec,
"Connection URL",
"sqlite://" + test_db);
plan->setProperty(
exec,
"SQL Statement",
"SELECT * FROM test_table WHERE id_col = ?;");
std::set<core::Relationship> auto_term_rels;
core::Relationship original("original", "");
auto_term_rels.insert(original);
exec->setAutoTerminatedRelationships(auto_term_rels);
auto log = plan->addProcessor(
"LogAttribute",
"Log",
core::Relationship("success", "description"),
true);
plan->runNextProcessor(); // Gen
plan->runNextProcessor(); // Update
plan->runNextProcessor(); // Exec
plan->runNextProcessor(); // Log
// Verify output state
REQUIRE(LogTestController::getInstance().contains("key:id_col value:2"));
REQUIRE(LogTestController::getInstance().contains("key:int_col value:42"));
REQUIRE(LogTestController::getInstance().contains("key:text_col value:bbbb"));
}
TEST_CASE("Test Exec 3", "[ExecuteSQL3]") { // NOLINT
TestController testController;
LogTestController::getInstance().setTrace<TestPlan>();
LogTestController::getInstance().setTrace<processors::GetFile>();
LogTestController::getInstance().setTrace<processors::UpdateAttribute>();
LogTestController::getInstance().setTrace<processors::LogAttribute>();
LogTestController::getInstance().setTrace<processors::ExecuteSQL>();
auto plan = testController.createPlan();
auto repo = std::make_shared<TestRepository>();
// Define directory for test db
std::string test_dir("/tmp/gt.XXXXXX");
REQUIRE(!testController.createTempDirectory(&test_dir[0]).empty());
// Define test db file
std::string test_db(test_dir);
test_db.append("/test.db");
// Define directory for test input file
std::string test_in_dir("/tmp/gt.XXXXXX");
REQUIRE(!testController.createTempDirectory(&test_in_dir[0]).empty());
// Define test input file
std::string test_file(test_in_dir);
test_file.append("/test.in");
// Write test SQL content
{
std::ofstream os(test_file);
os << "SELECT text_col FROM test_table WHERE id_col = ?;";
}
// Create test db
{
minifi::sqlite::SQLiteConnection db(test_db);
auto stmt = db.prepare("CREATE TABLE test_table (id_col INTEGER, int_col INTEGER, text_col TEXT);");
stmt.step();
REQUIRE(stmt.is_ok());
// Insert test data
{
auto ins = db.prepare("INSERT INTO test_table (id_col, int_col, text_col) VALUES (1, 33, 'aaaa');");
ins.step();
REQUIRE(ins.is_ok());
}
{
auto ins = db.prepare("INSERT INTO test_table (id_col, int_col, text_col) VALUES (2, 42, 'bbbb');");
ins.step();
REQUIRE(ins.is_ok());
}
{
auto ins = db.prepare("INSERT INTO test_table (id_col, int_col, text_col) VALUES (3, 24, 'cccc');");
ins.step();
REQUIRE(ins.is_ok());
}
}
// Build MiNiFi processing graph
auto get_file = plan->addProcessor(
"GetFile",
"Get");
plan->setProperty(
get_file,
processors::GetFile::Directory.getName(), test_in_dir);
auto update = plan->addProcessor(
"UpdateAttribute",
"Update",
core::Relationship("success", "description"),
true);
plan->setProperty(
update,
"sql.args.1.value",
"2",
true);
auto exec = plan->addProcessor(
"ExecuteSQL",
"ExecuteSQL",
core::Relationship("success", "description"),
true);
plan->setProperty(
exec,
"Connection URL",
"sqlite://" + test_db);
std::set<core::Relationship> auto_term_rels;
core::Relationship original("original", "");
auto_term_rels.insert(original);
exec->setAutoTerminatedRelationships(auto_term_rels);
auto log = plan->addProcessor(
"LogAttribute",
"Log",
core::Relationship("success", "description"),
true);
plan->runNextProcessor(); // Get
plan->runNextProcessor(); // Update
plan->runNextProcessor(); // Exec
plan->runNextProcessor(); // Log
// Verify output state
REQUIRE(LogTestController::getInstance().contains("key:text_col value:bbbb"));
}