blob: a97bc99162a04a88fa408580391b15fb85dadf68 [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "./TestBase.h"
TestPlan::TestPlan(std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::Repository> prov_repo,
const std::shared_ptr<minifi::state::response::FlowVersion> &flow_version)
: content_repo_(content_repo),
flow_repo_(flow_repo),
prov_repo_(prov_repo),
finalized(false),
location(-1),
current_flowfile_(nullptr),
flow_version_(flow_version),
logger_(logging::LoggerFactory<TestPlan>::getLogger()) {
stream_factory = org::apache::nifi::minifi::io::StreamFactory::getInstance(std::make_shared<minifi::Configure>());
}
std::shared_ptr<core::Processor> TestPlan::addProcessor(const std::shared_ptr<core::Processor> &processor, const std::string &name, core::Relationship relationship, bool linkToPrevious) {
if (finalized) {
return nullptr;
}
std::lock_guard<std::recursive_mutex> guard(mutex);
uuid_t uuid;
uuid_generate(uuid);
processor->setStreamFactory(stream_factory);
// initialize the processor
processor->initialize();
processor->setFlowIdentifier(flow_version_->getFlowIdentifier());
processor_mapping_[processor->getUUIDStr()] = processor;
if (!linkToPrevious) {
termination_ = relationship;
} else {
std::shared_ptr<core::Processor> last = processor_queue_.back();
if (last == nullptr) {
last = processor;
termination_ = relationship;
}
std::stringstream connection_name;
connection_name << last->getUUIDStr() << "-to-" << processor->getUUIDStr();
logger_->log_info("Creating %s connection for proc %d", connection_name.str(), processor_queue_.size() + 1);
std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(flow_repo_, content_repo_, connection_name.str());
connection->setRelationship(relationship);
// link the connections so that we can test results at the end for this
connection->setSource(last);
connection->setDestination(processor);
uuid_t uuid_copy, uuid_copy_next;
last->getUUID(uuid_copy);
connection->setSourceUUID(uuid_copy);
processor->getUUID(uuid_copy_next);
connection->setDestinationUUID(uuid_copy_next);
last->addConnection(connection);
if (last != processor) {
processor->addConnection(connection);
}
relationships_.push_back(connection);
}
std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor);
processor_nodes_.push_back(node);
std::shared_ptr<core::ProcessContext> context = std::make_shared<core::ProcessContext>(node, controller_services_provider_, prov_repo_, flow_repo_, content_repo_);
processor_contexts_.push_back(context);
processor_queue_.push_back(processor);
return processor;
}
std::shared_ptr<core::Processor> TestPlan::addProcessor(const std::string &processor_name, const std::string &name, core::Relationship relationship, bool linkToPrevious) {
if (finalized) {
return nullptr;
}
std::lock_guard<std::recursive_mutex> guard(mutex);
uuid_t uuid;
uuid_generate(uuid);
auto ptr = core::ClassLoader::getDefaultClassLoader().instantiate(processor_name, uuid);
if (nullptr == ptr) {
throw std::exception();
}
std::shared_ptr<core::Processor> processor = std::static_pointer_cast<core::Processor>(ptr);
processor->setName(name);
return addProcessor(processor, name, relationship, linkToPrevious);
}
bool TestPlan::setProperty(const std::shared_ptr<core::Processor> proc, const std::string &prop, const std::string &value, bool dynamic) {
std::lock_guard<std::recursive_mutex> guard(mutex);
int32_t i = 0;
logger_->log_info("Attempting to set property %s %s for %s", prop, value, proc->getName());
for (i = 0; i < processor_queue_.size(); i++) {
if (processor_queue_.at(i) == proc) {
break;
}
}
if (i >= processor_queue_.size() || i < 0 || i >= processor_contexts_.size()) {
return false;
}
if (dynamic) {
return processor_contexts_.at(i)->setDynamicProperty(prop, value);
} else {
return processor_contexts_.at(i)->setProperty(prop, value);
}
}
void TestPlan::reset() {
std::lock_guard<std::recursive_mutex> guard(mutex);
process_sessions_.clear();
factories_.clear();
location = -1;
for (auto proc : processor_queue_) {
while (proc->getActiveTasks() > 0) {
proc->decrementActiveTask();
}
}
}
bool TestPlan::runNextProcessor(std::function<void(const std::shared_ptr<core::ProcessContext>, const std::shared_ptr<core::ProcessSession>)> verify) {
if (!finalized) {
finalize();
}
logger_->log_info("Running next processor %d, processor_queue_.size %d, processor_contexts_.size %d", location, processor_queue_.size(), processor_contexts_.size());
std::lock_guard<std::recursive_mutex> guard(mutex);
location++;
std::shared_ptr<core::Processor> processor = processor_queue_.at(location);
std::shared_ptr<core::ProcessContext> context = processor_contexts_.at(location);
std::shared_ptr<core::ProcessSessionFactory> factory = std::make_shared<core::ProcessSessionFactory>(context);
factories_.push_back(factory);
if (std::find(configured_processors_.begin(), configured_processors_.end(), processor) == configured_processors_.end()) {
processor->onSchedule(context, factory);
configured_processors_.push_back(processor);
}
std::shared_ptr<core::ProcessSession> current_session = std::make_shared<core::ProcessSession>(context);
process_sessions_.push_back(current_session);
processor->incrementActiveTasks();
processor->setScheduledState(core::ScheduledState::RUNNING);
if (verify != nullptr) {
verify(context, current_session);
} else {
logger_->log_info("Running %s", processor->getName());
processor->onTrigger(context, current_session);
}
current_session->commit();
current_flowfile_ = current_session->get();
return location + 1 < processor_queue_.size();
}
std::set<provenance::ProvenanceEventRecord*> TestPlan::getProvenanceRecords() {
return process_sessions_.at(location)->getProvenanceReporter()->getEvents();
}
std::shared_ptr<core::FlowFile> TestPlan::getCurrentFlowFile() {
return current_flowfile_;
}
std::shared_ptr<minifi::Connection> TestPlan::buildFinalConnection(std::shared_ptr<core::Processor> processor, bool setDest) {
std::stringstream connection_name;
std::shared_ptr<core::Processor> last = processor;
connection_name << last->getUUIDStr() << "-to-" << processor->getUUIDStr();
std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(flow_repo_, content_repo_, connection_name.str());
connection->setRelationship(termination_);
// link the connections so that we can test results at the end for this
connection->setSource(last);
if (setDest)
connection->setDestination(processor);
uuid_t uuid_copy;
last->getUUID(uuid_copy);
connection->setSourceUUID(uuid_copy);
if (setDest)
connection->setDestinationUUID(uuid_copy);
processor->addConnection(connection);
return connection;
}
void TestPlan::finalize() {
std::lock_guard<std::recursive_mutex> guard(mutex);
if (relationships_.size() > 0) {
relationships_.push_back(buildFinalConnection(processor_queue_.back()));
} else {
for (auto processor : processor_queue_) {
relationships_.push_back(buildFinalConnection(processor, true));
}
}
finalized = true;
}