blob: 46d07948ec88e33f396185438f2383f99bf1d83b [file] [log] [blame]
/**
* @file Processor.cpp
* Processor class implementation
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "core/Processor.h"
#include <time.h>
#include <vector>
#include <queue>
#include <map>
#include <set>
#include <chrono>
#include <string>
#include <thread>
#include <memory>
#include <functional>
#include <utility>
#include "Connection.h"
#include "core/ProcessorConfig.h"
#include "core/Connectable.h"
#include "core/ProcessContext.h"
#include "core/ProcessSession.h"
#include "core/ProcessSessionFactory.h"
#include "io/StreamFactory.h"
#include "core/logging/LoggerConfiguration.h"
namespace org {
namespace apache {
namespace nifi {
namespace minifi {
namespace core {
Processor::Processor(std::string name)
: Connectable(name),
ConfigurableComponent(),
logger_(logging::LoggerFactory<Processor>::getLogger()) {
has_work_.store(false);
// Setup the default values
state_ = DISABLED;
strategy_ = TIMER_DRIVEN;
loss_tolerant_ = false;
_triggerWhenEmpty = false;
scheduling_period_nano_ = MINIMUM_SCHEDULING_NANOS;
run_duration_nano_ = DEFAULT_RUN_DURATION;
yield_period_msec_ = DEFAULT_YIELD_PERIOD_SECONDS * 1000;
_penalizationPeriodMsec = DEFAULT_PENALIZATION_PERIOD_SECONDS * 1000;
max_concurrent_tasks_ = DEFAULT_MAX_CONCURRENT_TASKS;
active_tasks_ = 0;
yield_expiration_ = 0;
incoming_connections_Iter = this->_incomingConnections.begin();
logger_->log_debug("Processor %s created UUID %s", name_, uuidStr_);
}
Processor::Processor(std::string name, utils::Identifier &uuid)
: Connectable(name, uuid),
ConfigurableComponent(),
logger_(logging::LoggerFactory<Processor>::getLogger()) {
has_work_.store(false);
// Setup the default values
state_ = DISABLED;
strategy_ = TIMER_DRIVEN;
loss_tolerant_ = false;
_triggerWhenEmpty = false;
scheduling_period_nano_ = MINIMUM_SCHEDULING_NANOS;
run_duration_nano_ = DEFAULT_RUN_DURATION;
yield_period_msec_ = DEFAULT_YIELD_PERIOD_SECONDS * 1000;
_penalizationPeriodMsec = DEFAULT_PENALIZATION_PERIOD_SECONDS * 1000;
max_concurrent_tasks_ = DEFAULT_MAX_CONCURRENT_TASKS;
active_tasks_ = 0;
yield_expiration_ = 0;
incoming_connections_Iter = this->_incomingConnections.begin();
logger_->log_debug("Processor %s created UUID %s with uuid %s", name_, uuidStr_, uuid.to_string());
}
bool Processor::isRunning() {
return (state_ == RUNNING && active_tasks_ > 0);
}
void Processor::setScheduledState(ScheduledState state) {
state_ = state;
if (state == STOPPED) {
notifyStop();
}
}
bool Processor::addConnection(std::shared_ptr<Connectable> conn) {
bool ret = false;
if (isRunning()) {
logger_->log_warn("Can not add connection while the process %s is running", name_);
return false;
}
std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>(conn);
std::lock_guard<std::mutex> lock(mutex_);
utils::Identifier srcUUID;
utils::Identifier destUUID;
connection->getSourceUUID(srcUUID);
connection->getDestinationUUID(destUUID);
std::string my_uuid = uuid_.to_string();
std::string destination_uuid = destUUID.to_string();
if (my_uuid == destination_uuid) {
// Connection is destination to the current processor
if (_incomingConnections.find(connection) == _incomingConnections.end()) {
_incomingConnections.insert(connection);
connection->setDestination(shared_from_this());
logger_->log_debug("Add connection %s into Processor %s incoming connection", connection->getName(), name_);
incoming_connections_Iter = this->_incomingConnections.begin();
ret = true;
}
}
std::string source_uuid = srcUUID.to_string();
if (my_uuid == source_uuid) {
const auto &rels = connection->getRelationships();
for (auto i = rels.begin(); i != rels.end(); i++) {
const auto relationship = (*i).getName();
// Connection is source from the current processor
auto &&it = out_going_connections_.find(relationship);
if (it != out_going_connections_.end()) {
// We already has connection for this relationship
std::set<std::shared_ptr<Connectable>> existedConnection = it->second;
if (existedConnection.find(connection) == existedConnection.end()) {
// We do not have the same connection for this relationship yet
existedConnection.insert(connection);
connection->setSource(shared_from_this());
out_going_connections_[relationship] = existedConnection;
logger_->log_debug("Add connection %s into Processor %s outgoing connection for relationship %s", connection->getName(), name_, relationship);
ret = true;
}
} else {
// We do not have any outgoing connection for this relationship yet
std::set<std::shared_ptr<Connectable>> newConnection;
newConnection.insert(connection);
connection->setSource(shared_from_this());
out_going_connections_[relationship] = newConnection;
logger_->log_debug("Add connection %s into Processor %s outgoing connection for relationship %s", connection->getName(), name_, relationship);
ret = true;
}
}
}
return ret;
}
void Processor::removeConnection(std::shared_ptr<Connectable> conn) {
if (isRunning()) {
logger_->log_warn("Can not remove connection while the process %s is running", name_);
return;
}
std::lock_guard<std::mutex> lock(mutex_);
utils::Identifier srcUUID;
utils::Identifier destUUID;
std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>(conn);
connection->getSourceUUID(srcUUID);
connection->getDestinationUUID(destUUID);
if (uuid_ == destUUID) {
// Connection is destination to the current processor
if (_incomingConnections.find(connection) != _incomingConnections.end()) {
_incomingConnections.erase(connection);
connection->setDestination(NULL);
logger_->log_debug("Remove connection %s into Processor %s incoming connection", connection->getName(), name_);
incoming_connections_Iter = this->_incomingConnections.begin();
}
}
if (uuid_ == srcUUID) {
const auto &rels = connection->getRelationships();
for (auto i = rels.begin(); i != rels.end(); i++) {
const auto relationship = (*i).getName();
// Connection is source from the current processor
auto &&it = out_going_connections_.find(relationship);
if (it != out_going_connections_.end()) {
if (out_going_connections_[relationship].find(connection) != out_going_connections_[relationship].end()) {
out_going_connections_[relationship].erase(connection);
connection->setSource(NULL);
logger_->log_debug("Remove connection %s into Processor %s outgoing connection for relationship %s", connection->getName(), name_, relationship);
}
}
}
}
}
bool Processor::flowFilesQueued() {
std::lock_guard<std::mutex> lock(mutex_);
if (_incomingConnections.size() == 0)
return false;
for (auto &&conn : _incomingConnections) {
std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>(conn);
if (connection->getQueueSize() > 0)
return true;
}
return false;
}
bool Processor::flowFilesOutGoingFull() {
std::lock_guard<std::mutex> lock(mutex_);
for (auto &&connection : out_going_connections_) {
// We already has connection for this relationship
std::set<std::shared_ptr<Connectable>> existedConnection = connection.second;
for (const auto conn : existedConnection) {
std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>(conn);
if (connection->isFull())
return true;
}
}
return false;
}
void Processor::onTrigger(ProcessContext *context, ProcessSessionFactory *sessionFactory) {
auto session = sessionFactory->createSession();
try {
// Call the virtual trigger function
onTrigger(context, session.get());
session->commit();
} catch (std::exception &exception) {
logger_->log_warn("Caught Exception %s during Processor::onTrigger of processor: %s (%s)", exception.what(), getUUIDStr(), getName());
session->rollback();
throw;
} catch (...) {
logger_->log_warn("Caught Exception during Processor::onTrigger of processor: %s (%s)", getUUIDStr(), getName());
session->rollback();
throw;
}
}
void Processor::onTrigger(const std::shared_ptr<ProcessContext> &context, const std::shared_ptr<ProcessSessionFactory> &sessionFactory) {
auto session = sessionFactory->createSession();
try {
// Call the virtual trigger function
onTrigger(context, session);
session->commit();
} catch (std::exception &exception) {
logger_->log_warn("Caught Exception %s during Processor::onTrigger of processor: %s (%s)", exception.what(), getUUIDStr(), getName());
session->rollback();
throw;
} catch (...) {
logger_->log_warn("Caught Exception during Processor::onTrigger of processor: %s (%s)", getUUIDStr(), getName());
session->rollback();
throw;
}
}
bool Processor::isWorkAvailable() {
// We have work if any incoming connection has work
std::lock_guard<std::mutex> lock(mutex_);
bool hasWork = false;
try {
for (const auto &conn : _incomingConnections) {
std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>(conn);
if (connection->getQueueSize() > 0) {
hasWork = true;
break;
}
}
} catch (...) {
logger_->log_error("Caught an exception while checking if work is available;"
" unless it was positively determined that work is available, assuming NO work is available!");
}
return hasWork;
}
} /* namespace core */
} /* namespace minifi */
} /* namespace nifi */
} /* namespace apache */
} /* namespace org */