// blob: 0be7eeaefd491dbf4198a77602dc7d9b4fc066e6 [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "cxx/Plan.h"

#include <algorithm>
#include <functional>
#include <memory>
#include <set>
#include <sstream>
#include <string>
#include <vector>

#include "cxx/CallbackProcessor.h"
/**
 * Process-wide state shared by all ExecutionPlan instances.
 *
 * id_generator_     : source of UUIDs used when creating processors.
 * proc_plan_map_    : string-keyed map of plans (populated outside this file -- NOTE(review): confirm usage).
 * custom_processors : registry of C callback processors added through addCustomProcessor().
 */
std::shared_ptr<utils::IdGenerator> ExecutionPlan::id_generator_ = utils::IdGenerator::getIdGenerator();
std::unordered_map<std::string, std::shared_ptr<ExecutionPlan>> ExecutionPlan::proc_plan_map_;
std::map<std::string, custom_processor_args> ExecutionPlan::custom_processors;
/**
 * Creates an empty, not-yet-finalized plan.
 *
 * @param content_repo content repository handed to every process context and connection the plan builds
 * @param flow_repo flow file repository handed to every process context and connection the plan builds
 * @param prov_repo provenance repository handed to every process context the plan builds
 */
ExecutionPlan::ExecutionPlan(std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::Repository> prov_repo)
    : content_repo_(content_repo),
      flow_repo_(flow_repo),
      prov_repo_(prov_repo),
      finalized(false),
      location(-1),
      current_flowfile_(nullptr),
      logger_(logging::LoggerFactory<ExecutionPlan>::getLogger()) {
  // The stream factory given to each added processor is obtained with a default-constructed configuration.
  auto default_configuration = std::make_shared<minifi::Configure>();
  stream_factory = org::apache::nifi::minifi::io::StreamFactory::getInstance(default_configuration);
}
/**
* Add a callback to obtain and pass processor session to a generated processor
*
*/
std::shared_ptr<core::Processor> ExecutionPlan::addSimpleCallback(void *obj, std::function<void(core::ProcessSession*)> fp) {
if (finalized) {
return nullptr;
}
auto simple_func_wrapper = [fp](core::ProcessSession *session, core::ProcessContext *context)->void { fp(session); };
return addCallback(obj, simple_func_wrapper);
}
/**
 * Creates a CallbackProcessor from the given callbacks and appends it to the plan,
 * linked to the previously added processor through a "success" relationship.
 *
 * @param obj opaque user pointer forwarded to the CallbackProcessor
 * @param ontrigger_callback invoked on every trigger with the session and context
 * @param onschedule_callback invoked when the processor is scheduled
 * @return the added processor, or nullptr if the plan is finalized or the processor could not be created
 */
std::shared_ptr<core::Processor> ExecutionPlan::addCallback(void *obj,
                                                            std::function<void(core::ProcessSession*, core::ProcessContext *)> ontrigger_callback,
                                                            std::function<void(core::ProcessContext *)> onschedule_callback) {
  if (finalized) {
    return nullptr;
  }
  auto callback_processor = createCallback(obj, ontrigger_callback, onschedule_callback);
  if (callback_processor) {
    return addProcessor(callback_processor, CallbackProcessorName, core::Relationship("success", "description"), true);
  }
  return nullptr;
}
/**
 * Sets a property on a processor that was previously added to this plan.
 *
 * The value is stored in the ProcessContext created for that processor when it was added.
 *
 * @param proc processor to configure; nullptr yields false
 * @param prop property name
 * @param value property value
 * @return false if proc is null, is not part of this plan, or has no matching context;
 *         otherwise the result of ProcessContext::setProperty
 */
bool ExecutionPlan::setProperty(const std::shared_ptr<core::Processor> proc, const std::string &prop, const std::string &value) {
  if (!proc) {
    // The original logged proc->getName() unconditionally, dereferencing a null pointer.
    logger_->log_debug("Cannot set property %s: processor is null", prop);
    return false;
  }
  logger_->log_debug("Attempting to set property %s %s for %s", prop, value, proc->getName());
  const auto it = std::find(processor_queue_.begin(), processor_queue_.end(), proc);
  if (it == processor_queue_.end()) {
    return false;
  }
  // processor_contexts_ is appended together with processor_queue_, so positions correspond.
  const std::size_t index = static_cast<std::size_t>(it - processor_queue_.begin());
  if (index >= processor_contexts_.size()) {
    return false;
  }
  return processor_contexts_.at(index)->setProperty(prop, value);
}
/**
 * Appends an already-constructed processor to the plan.
 *
 * The processor receives the plan's stream factory, is initialized, and is registered in
 * processor_mapping_ under its UUID string. A ProcessorNode and a ProcessContext are created
 * for it, and it is appended to processor_queue_.
 *
 * If linkToPrevious is false, relationship becomes the plan's termination relationship and
 * no connection is made. Otherwise the previously added processor is connected to this one
 * through relationship. When this is the first processor, it is connected to itself and
 * relationship becomes the termination relationship.
 *
 * @param processor processor to add; nullptr yields nullptr
 * @param name not used by this overload (the processor keeps its current name)
 * @param relationship relationship used for the link / termination
 * @param linkToPrevious whether to connect the previous processor to this one
 * @return processor, or nullptr if the plan is already finalized or processor is null
 */
std::shared_ptr<core::Processor> ExecutionPlan::addProcessor(const std::shared_ptr<core::Processor> &processor, const std::string &name, core::Relationship relationship, bool linkToPrevious) {
  if (finalized || !processor) {
    return nullptr;
  }
  processor->setStreamFactory(stream_factory);
  // initialize the processor
  processor->initialize();
  processor_mapping_[processor->getUUIDStr()] = processor;
  if (!linkToPrevious) {
    termination_ = relationship;
  } else {
    // back() on an empty container is undefined behavior, so check emptiness explicitly.
    std::shared_ptr<core::Processor> last;
    if (!processor_queue_.empty()) {
      last = processor_queue_.back();
    }
    if (last == nullptr) {
      last = processor;
      termination_ = relationship;
    }
    relationships_.push_back(connectProcessors(last, processor, relationship, true));
  }
  std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor);
  processor_nodes_.push_back(node);
  std::shared_ptr<core::ProcessContext> context = std::make_shared<core::ProcessContext>(node, controller_services_provider_, prov_repo_, flow_repo_, content_repo_);
  processor_contexts_.push_back(context);
  processor_queue_.push_back(processor);
  return processor;
}
/**
 * Creates a processor by name and appends it to the plan.
 *
 * @param processor_name class name to instantiate, or a name registered via addCustomProcessor
 * @param name name requested for the new processor
 * @param relationship relationship used to link or terminate the processor
 * @param linkToPrevious whether to connect the previously added processor to this one
 * @return the added processor, or nullptr if the plan is finalized or creation failed
 */
std::shared_ptr<core::Processor> ExecutionPlan::addProcessor(const std::string &processor_name, const std::string &name, core::Relationship relationship, bool linkToPrevious) {
  if (finalized) {
    return nullptr;
  }
  if (auto created = ExecutionPlan::createProcessor(processor_name, name)) {
    return addProcessor(created, name, relationship, linkToPrevious);
  }
  return nullptr;
}
/**
 * Rewinds the plan so that the next runNextProcessor call starts at the first processor.
 *
 * Drops the sessions and session factories created by earlier runs, and decrements each
 * processor's active-task count back to zero. Connections, configured_processors_ and
 * current_flowfile_ are not touched.
 */
void ExecutionPlan::reset() {
  process_sessions_.clear();
  factories_.clear();
  location = -1;
  for (const auto &queued : processor_queue_) {
    for (; queued->getActiveTasks() > 0; ) {
      queued->decrementActiveTask();
    }
  }
}
bool ExecutionPlan::runNextProcessor(std::function<void(const std::shared_ptr<core::ProcessContext>, const std::shared_ptr<core::ProcessSession>)> verify,
std::shared_ptr<flowfile_input_params> input_ff_params) {
if (!finalized) {
finalize();
}
location++;
if (location >= processor_queue_.size()) {
return false;
}
std::shared_ptr<core::Processor> processor = processor_queue_[location];
std::shared_ptr<core::ProcessContext> context = processor_contexts_[location];
std::shared_ptr<core::ProcessSessionFactory> factory = std::make_shared<core::ProcessSessionFactory>(context);
factories_.push_back(factory);
if (std::find(configured_processors_.begin(), configured_processors_.end(), processor) == configured_processors_.end()) {
processor->onSchedule(context, factory);
configured_processors_.push_back(processor);
}
std::shared_ptr<core::ProcessSession> current_session = std::make_shared<core::ProcessSession>(context);
process_sessions_.push_back(current_session);
if (input_ff_params) {
std::shared_ptr<minifi::FlowFileRecord> flowFile = std::static_pointer_cast<minifi::FlowFileRecord>(current_session->create());
for(const auto& kv : input_ff_params->attributes) {
flowFile->setAttribute(kv.first, kv.second);
}
current_session->importFrom(*(input_ff_params->content_stream.get()), flowFile);
current_session->transfer(flowFile, core::Relationship("success", "success"));
relationships_[relationships_.size()-1]->put(std::static_pointer_cast<core::FlowFile>(flowFile));
}
processor->incrementActiveTasks();
processor->setScheduledState(core::ScheduledState::RUNNING);
if (verify != nullptr) {
verify(context, current_session);
} else {
logger_->log_debug("Running %s", processor->getName());
processor->onTrigger(context, current_session);
}
current_session->commit();
current_flowfile_ = current_session->get();
auto hasMore = location + 1 < processor_queue_.size();
if (!hasMore && !current_flowfile_) {
std::set<std::shared_ptr<core::FlowFile>> expired;
current_flowfile_ = relationships_.back()->poll(expired);
}
return hasMore;
}
/**
 * Returns the provenance events reported by the session at the current plan position.
 *
 * The session is looked up with at(location), which throws if there is no session at that
 * position (for example after reset() or before any processor has run).
 */
std::set<std::shared_ptr<provenance::ProvenanceEventRecord>> ExecutionPlan::getProvenanceRecords() {
  const auto &session_at_location = process_sessions_.at(location);
  return session_at_location->getProvenanceReporter()->getEvents();
}
std::shared_ptr<core::FlowFile> ExecutionPlan::getCurrentFlowFile() {
return current_flowfile_;
}
std::shared_ptr<core::ProcessSession> ExecutionPlan::getCurrentSession() {
return current_session_;
}
/**
 * Builds a connection from processor back to itself carrying the plan's termination relationship.
 *
 * @param processor source (and, if set_dst, destination) of the connection
 * @param set_dst whether the destination side of the connection is set
 * @return the new connection
 */
std::shared_ptr<minifi::Connection> ExecutionPlan::buildFinalConnection(std::shared_ptr<core::Processor> processor, bool set_dst) {
  auto terminal_connection = connectProcessors(processor, processor, termination_, set_dst);
  return terminal_connection;
}
void ExecutionPlan::finalize() {
if (failure_handler_) {
auto failure_proc = createProcessor(CallbackProcessorName, CallbackProcessorName);
std::shared_ptr<processors::CallbackProcessor> callback_proc = std::static_pointer_cast<processors::CallbackProcessor>(failure_proc);
callback_proc->setCallback(nullptr, std::bind(&FailureHandler::operator(), failure_handler_, std::placeholders::_1));
for (const auto& proc : processor_queue_) {
for (const auto& rel : proc->getSupportedRelationships()) {
if (rel.getName() == "failure") {
relationships_.push_back(connectProcessors(proc, failure_proc, core::Relationship("failure", "failure collector"), true));
break;
}
}
}
std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(failure_proc);
processor_nodes_.push_back(node);
std::shared_ptr<core::ProcessContext> context = std::make_shared<core::ProcessContext>(node, controller_services_provider_, prov_repo_, flow_repo_, content_repo_);
processor_contexts_.push_back(context);
processor_queue_.push_back(failure_proc);
}
if (relationships_.size() > 0) {
relationships_.push_back(buildFinalConnection(processor_queue_.back()));
} else {
for (auto processor : processor_queue_) {
relationships_.push_back(buildFinalConnection(processor, true));
}
}
finalized = true;
}
std::shared_ptr<core::Processor> ExecutionPlan::createProcessor(const std::string &processor_name, const std::string &name) {
utils::Identifier uuid;
id_generator_->generate(uuid);
auto custom_proc = custom_processors.find(processor_name);
if(custom_proc != custom_processors.end()) {
auto ontrigger_c_func = custom_proc->second.ontr_cb;
auto onschedule_c_func = custom_proc->second.onsc_cb;
auto ontrigger_wrapper_func = [ontrigger_c_func](core::ProcessSession * session, core::ProcessContext * context) {
if(ontrigger_c_func) {
ontrigger_c_func(reinterpret_cast<processor_session *>(session),
reinterpret_cast<processor_context *>(context));
}
};
auto onschedule_wrapper_func = [onschedule_c_func](core::ProcessContext * context) {
if (onschedule_c_func) {
onschedule_c_func(reinterpret_cast<processor_context*>(context));
}
};
return createCallback(nullptr, ontrigger_wrapper_func, onschedule_wrapper_func);
}
auto ptr = core::ClassLoader::getDefaultClassLoader().instantiate(processor_name, uuid);
if (nullptr == ptr) {
return nullptr;
}
std::shared_ptr<core::Processor> processor = std::static_pointer_cast<core::Processor>(ptr);
processor->setName(name);
return processor;
}
std::shared_ptr<core::Processor> ExecutionPlan::createCallback(void *obj,
std::function<void(core::ProcessSession*, core::ProcessContext *)> ontrigger_callback,
std::function<void(core::ProcessContext *)> onschedule_callback) {
auto ptr = createProcessor(CallbackProcessorName, CallbackProcessorName);
if (!ptr)
return nullptr;
std::shared_ptr<processors::CallbackProcessor> processor = std::static_pointer_cast<processors::CallbackProcessor>(ptr);
processor->setCallback(obj, ontrigger_callback, onschedule_callback);
return ptr;
}
/**
 * Creates a connection named "<src uuid>-to-<dst uuid>" carrying relationship from src_proc.
 *
 * The source is always set and the connection is registered on src_proc. When set_dst is
 * true, the destination is also set and, unless it is the same processor as the source,
 * the connection is registered on dst_proc too.
 *
 * @return the new connection
 */
std::shared_ptr<minifi::Connection> ExecutionPlan::connectProcessors(std::shared_ptr<core::Processor> src_proc, std::shared_ptr<core::Processor> dst_proc, core::Relationship relationship,
                                                                     bool set_dst) {
  std::string connection_name = src_proc->getUUIDStr();
  connection_name += "-to-";
  connection_name += dst_proc->getUUIDStr();
  auto connection = std::make_shared<minifi::Connection>(flow_repo_, content_repo_, connection_name);
  connection->addRelationship(relationship);
  // link the connections so that we can test results at the end for this
  connection->setSource(src_proc);
  utils::Identifier src_uuid;
  src_proc->getUUID(src_uuid);
  connection->setSourceUUID(src_uuid);
  if (set_dst) {
    connection->setDestination(dst_proc);
    utils::Identifier dst_uuid;
    dst_proc->getUUID(dst_uuid);
    connection->setDestinationUUID(dst_uuid);
    // For a self-loop the single registration on the source side below is enough.
    if (dst_proc != src_proc) {
      dst_proc->addConnection(connection);
    }
  }
  src_proc->addConnection(connection);
  return connection;
}
/**
 * Sets the callback that receives flow files routed to a "failure" relationship.
 *
 * The FailureHandler is created on first use, from the plan's content repository. Once the
 * plan has been finalized without a handler, one can no longer be wired in.
 *
 * @param onerror_callback callback invoked with each failed flow file record
 * @return false if the plan was finalized without a failure handler; true otherwise
 */
bool ExecutionPlan::setFailureCallback(std::function<void(flow_file_record*)> onerror_callback) {
  if (!failure_handler_) {
    if (finalized) {
      return false;  // Already finalized the flow without failure handler processor
    }
    failure_handler_ = std::make_shared<FailureHandler>(getContentRepo());
  }
  failure_handler_->setCallback(onerror_callback);
  return true;
}
/**
 * Sets the strategy used by the failure handler.
 *
 * @param start strategy to apply
 * @return false if no failure handler exists (see setFailureCallback); true otherwise
 */
bool ExecutionPlan::setFailureStrategy(FailureStrategy start) {
  const bool has_handler = static_cast<bool>(failure_handler_);
  if (has_handler) {
    failure_handler_->setStrategy(start);
  }
  return has_handler;
}
/**
 * Registers a custom processor that createProcessor can later instantiate by in.name.
 *
 * @param in registration data (name and C callbacks)
 * @return false if in.name is CallbackProcessorName (reserved) or is already registered; true otherwise
 */
bool ExecutionPlan::addCustomProcessor(custom_processor_args in) {
  if (in.name == CallbackProcessorName) {
    return false;  // This name cannot be registered
  }
  // emplace leaves an existing entry untouched and reports whether a new one was inserted.
  return custom_processors.emplace(in.name, in).second;
}
/**
 * Unregisters a custom processor previously added with addCustomProcessor.
 *
 * @param name registered name; nullptr is treated as "not registered"
 * @return number of registrations removed (0 or 1, since custom_processors is a std::map)
 */
int ExecutionPlan::deleteCustomProcessor(const char * name) {
  if (name == nullptr) {
    // Constructing the std::string lookup key from a null pointer is undefined behavior.
    return 0;
  }
  return static_cast<int>(custom_processors.erase(name));
}