blob: 1068190e79e7b8bb9e8fe75d444d4ff6888204bd [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <map>
#include <memory>
#include <optional>
#include <set>
#include <string>
#include <utility>
#include <vector>
#include "unit/TestBase.h"
#include "unit/Catch.h"
#include "processors/SQLProcessor.h"
#include "FlowFileRecord.h"
#include "ResourceClaim.h"
// ODBC_SERVICE is the string SQLTestPlan passes as the first argument of
// TestPlan::addController when it registers the database controller service.
// When MINIFI_USE_REAL_ODBC_TEST_DRIVER is defined the value is "ODBCService";
// otherwise the mock service header is included and the value is "MockODBCService".
#ifdef MINIFI_USE_REAL_ODBC_TEST_DRIVER
static const std::string ODBC_SERVICE = "ODBCService";
#else
#include "mocks/MockODBCService.h"
static const std::string ODBC_SERVICE = "MockODBCService";
#endif
/**
 * Test helper that wires exactly one SQL processor into a TestPlan, together
 * with one input connection, one output connection per requested relationship
 * and a database controller service registered under the name "ODBCService".
 *
 * The raw pointers held by this class are the values returned by the TestPlan;
 * this class never deletes them.
 */
class SQLTestPlan {
 public:
  /**
   * Builds the plan.
   *
   * @param controller      test controller used to create the underlying TestPlan
   * @param connection_str  value assigned to the database service's ConnectionString property
   * @param sql_processor   passed as both the first and second argument of TestPlan::addProcessor
   *                        (presumably processor type and processor name; confirm against TestPlan)
   * @param output_rels     relationships for which an outgoing connection is created
   */
  SQLTestPlan(TestController& controller, const std::string& connection_str, const std::string& sql_processor, std::initializer_list<core::Relationship> output_rels) {
    plan_ = controller.createPlan();

    processor_ = plan_->addProcessor(sql_processor, sql_processor, {}, false);
    // Must match the name given to the controller service registered below.
    plan_->setProperty(processor_, minifi::processors::SQLProcessor::DBControllerService, "ODBCService");

    // Incoming connection with no source; addInput() enqueues flow files on it.
    input_ = plan_->addConnection({}, {"success", "d"}, processor_);

    // One outgoing connection (no destination) per relationship, drained by getOutputs().
    for (const auto& output_rel : output_rels) {
      outputs_[output_rel] = plan_->addConnection(processor_, output_rel, {});
    }

    // initialize database service
    auto service = plan_->addController(ODBC_SERVICE, "ODBCService");
    plan_->setProperty(service, minifi::sql::controllers::DatabaseService::ConnectionString, connection_str);
  }

  /** Returns the content of @p flow_file as reported by the underlying TestPlan. */
  std::string getContent(const std::shared_ptr<core::FlowFile>& flow_file) const {
    return plan_->getContent(flow_file);
  }

  /**
   * Creates a flow file, places it on the processor's input connection and returns it.
   *
   * @param attributes  key/value pairs set as attributes on the new flow file
   * @param content     optional payload; when present it is written through the plan's
   *                    content repository and attached to the flow file as a resource claim
   *                    (offset 0, size equal to the payload length)
   */
  std::shared_ptr<core::FlowFile> addInput(std::initializer_list<std::pair<std::string, std::string>> attributes = {}, const std::optional<std::string>& content = {}) {
    auto flow_file = std::make_shared<minifi::FlowFileRecordImpl>();
    for (const auto& [key, value] : attributes) {
      flow_file->setAttribute(key, value);
    }

    if (content) {
      auto claim = std::make_shared<minifi::ResourceClaimImpl>(plan_->getContentRepo());
      auto content_stream = plan_->getContentRepo()->write(*claim);
      const auto ret = content_stream->write(reinterpret_cast<const uint8_t*>(content->c_str()), content->length());
      // Fail the test right away if the payload was not written in full.
      REQUIRE(ret == content->length());
      flow_file->setOffset(0);
      flow_file->setSize(content->length());
      flow_file->setResourceClaim(claim);
    }

    input_->put(flow_file);
    return flow_file;
  }

  /** Returns the processor created by the constructor. */
  core::Processor* getSQLProcessor() const {
    return processor_;
  }

  /**
   * Runs the SQL processor once.
   *
   * @param reschedule  when true, TestPlan::reset(true) is called before the run
   */
  void run(bool reschedule = false) {
    if (reschedule) {
      plan_->reset(reschedule);
    }
    plan_->runProcessor(static_cast<size_t>(0));  // run the one and only sql processor
  }

  /**
   * Drains and returns every flow file queued on the output connection of @p relationship.
   *
   * Fails the test (REQUIRE) when no connection was registered for the relationship or when
   * polling reports any expired flow file.
   */
  std::vector<std::shared_ptr<core::FlowFile>> getOutputs(const core::Relationship& relationship) {
    // find() instead of operator[]: looking up an unknown relationship must not
    // insert a null entry into outputs_ as a side effect.
    const auto it = outputs_.find(relationship);
    minifi::Connection* conn = (it != outputs_.end()) ? it->second : nullptr;
    REQUIRE(conn);

    std::vector<std::shared_ptr<core::FlowFile>> flow_files;
    std::set<std::shared_ptr<core::FlowFile>> expired;
    while (auto flow_file = conn->poll(expired)) {
      REQUIRE(expired.empty());
      flow_files.push_back(std::move(flow_file));
    }
    // The final poll that returned nothing may still have reported expired flow files.
    REQUIRE(expired.empty());
    return flow_files;
  }

  /** Sets @p property to @p value on the SQL processor; returns the TestPlan's result. */
  bool setProperty(const std::string_view property, const std::string_view value) const {
    return plan_->setProperty(processor_, property, value);
  }

 private:
  std::shared_ptr<TestPlan> plan_;
  core::Processor* processor_ = nullptr;  // set by the constructor from TestPlan::addProcessor
  minifi::Connection* input_ = nullptr;   // set by the constructor from TestPlan::addConnection
  std::map<core::Relationship, minifi::Connection*> outputs_;  // relationship -> outgoing connection
};