blob: cc136dcd8376aaac54ea97ca5740da1d6740117b [file] [log] [blame]
/**
* @file Processor.cpp
* Processor class implementation
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <vector>
#include <queue>
#include <map>
#include <set>
#include <sys/time.h>
#include <time.h>
#include <chrono>
#include <thread>
#include "Processor.h"
#include "ProcessContext.h"
#include "ProcessSession.h"
Processor::Processor(std::string name, uuid_t uuid)
: _name(name)
{
if (!uuid)
// Generate the global UUID for the flow record
uuid_generate(_uuid);
else
uuid_copy(_uuid, uuid);
char uuidStr[37];
uuid_unparse(_uuid, uuidStr);
_uuidStr = uuidStr;
// Setup the default values
_state = DISABLED;
_strategy = TIMER_DRIVEN;
_lossTolerant = false;
_triggerWhenEmpty = false;
_schedulingPeriodNano = MINIMUM_SCHEDULING_NANOS;
_runDurantionNano = 0;
_yieldPeriodMsec = DEFAULT_YIELD_PERIOD_SECONDS * 1000;
_penalizationPeriodMsec = DEFAULT_PENALIZATION_PERIOD_SECONDS * 1000;
_maxConcurrentTasks = 1;
_activeTasks = 0;
_yieldExpiration = 0;
_incomingConnectionsIter = this->_incomingConnections.begin();
_logger = Logger::getLogger();
_logger->log_info("Processor %s created UUID %s", _name.c_str(), _uuidStr.c_str());
}
Processor::~Processor()
{
}
bool Processor::isRunning()
{
return (_state == RUNNING && _activeTasks > 0);
}
bool Processor::setSupportedProperties(std::set<Property> properties)
{
if (isRunning())
{
_logger->log_info("Can not set processor property while the process %s is running",
_name.c_str());
return false;
}
std::lock_guard<std::mutex> lock(_mtx);
_properties.clear();
for (std::set<Property>::iterator it = properties.begin(); it != properties.end(); ++it)
{
Property item(*it);
_properties[item.getName()] = item;
_logger->log_info("Processor %s supported property name %s", _name.c_str(), item.getName().c_str());
}
return true;
}
bool Processor::setSupportedRelationships(std::set<Relationship> relationships)
{
if (isRunning())
{
_logger->log_info("Can not set processor supported relationship while the process %s is running",
_name.c_str());
return false;
}
std::lock_guard<std::mutex> lock(_mtx);
_relationships.clear();
for (std::set<Relationship>::iterator it = relationships.begin(); it != relationships.end(); ++it)
{
Relationship item(*it);
_relationships[item.getName()] = item;
_logger->log_info("Processor %s supported relationship name %s", _name.c_str(), item.getName().c_str());
}
return true;
}
bool Processor::setAutoTerminatedRelationships(std::set<Relationship> relationships)
{
if (isRunning())
{
_logger->log_info("Can not set processor auto terminated relationship while the process %s is running",
_name.c_str());
return false;
}
std::lock_guard<std::mutex> lock(_mtx);
_autoTerminatedRelationships.clear();
for (std::set<Relationship>::iterator it = relationships.begin(); it != relationships.end(); ++it)
{
Relationship item(*it);
_autoTerminatedRelationships[item.getName()] = item;
_logger->log_info("Processor %s auto terminated relationship name %s", _name.c_str(), item.getName().c_str());
}
return true;
}
bool Processor::isAutoTerminated(Relationship relationship)
{
bool isRun = isRunning();
if (!isRun)
_mtx.lock();
std::map<std::string, Relationship>::iterator it = _autoTerminatedRelationships.find(relationship.getName());
if (it != _autoTerminatedRelationships.end())
{
if (!isRun)
_mtx.unlock();
return true;
}
else
{
if (!isRun)
_mtx.unlock();
return false;
}
}
bool Processor::isSupportedRelationship(Relationship relationship)
{
bool isRun = isRunning();
if (!isRun)
_mtx.lock();
std::map<std::string, Relationship>::iterator it = _relationships.find(relationship.getName());
if (it != _relationships.end())
{
if (!isRun)
_mtx.unlock();
return true;
}
else
{
if (!isRun)
_mtx.unlock();
return false;
}
}
bool Processor::getProperty(std::string name, std::string &value)
{
bool isRun = isRunning();
if (!isRun)
// Because set property only allowed in non running state, we need to obtain lock avoid rack condition
_mtx.lock();
std::map<std::string, Property>::iterator it = _properties.find(name);
if (it != _properties.end())
{
Property item = it->second;
value = item.getValue();
if (!isRun)
_mtx.unlock();
return true;
}
else
{
if (!isRun)
_mtx.unlock();
return false;
}
}
bool Processor::setProperty(std::string name, std::string value)
{
std::lock_guard<std::mutex> lock(_mtx);
std::map<std::string, Property>::iterator it = _properties.find(name);
if (it != _properties.end())
{
Property item = it->second;
item.setValue(value);
_properties[item.getName()] = item;
_logger->log_info("Processor %s property name %s value %s", _name.c_str(), item.getName().c_str(), value.c_str());
return true;
}
else
{
return false;
}
}
std::set<Connection *> Processor::getOutGoingConnections(std::string relationship)
{
std::set<Connection *> empty;
std::map<std::string, std::set<Connection *>>::iterator it = _outGoingConnections.find(relationship);
if (it != _outGoingConnections.end())
{
return _outGoingConnections[relationship];
}
else
{
return empty;
}
}
bool Processor::addConnection(Connection *connection)
{
bool ret = false;
if (isRunning())
{
_logger->log_info("Can not add connection while the process %s is running",
_name.c_str());
return false;
}
std::lock_guard<std::mutex> lock(_mtx);
uuid_t srcUUID;
uuid_t destUUID;
connection->getSourceProcessorUUID(srcUUID);
connection->getDestinationProcessorUUID(destUUID);
if (uuid_compare(_uuid, destUUID) == 0)
{
// Connection is destination to the current processor
if (_incomingConnections.find(connection) == _incomingConnections.end())
{
_incomingConnections.insert(connection);
connection->setDestinationProcessor(this);
_logger->log_info("Add connection %s into Processor %s incoming connection",
connection->getName().c_str(), _name.c_str());
_incomingConnectionsIter = this->_incomingConnections.begin();
ret = true;
}
}
if (uuid_compare(_uuid, srcUUID) == 0)
{
std::string relationship = connection->getRelationship().getName();
// Connection is source from the current processor
std::map<std::string, std::set<Connection *>>::iterator it =
_outGoingConnections.find(relationship);
if (it != _outGoingConnections.end())
{
// We already has connection for this relationship
std::set<Connection *> existedConnection = it->second;
if (existedConnection.find(connection) == existedConnection.end())
{
// We do not have the same connection for this relationship yet
existedConnection.insert(connection);
connection->setSourceProcessor(this);
_outGoingConnections[relationship] = existedConnection;
_logger->log_info("Add connection %s into Processor %s outgoing connection for relationship %s",
connection->getName().c_str(), _name.c_str(), relationship.c_str());
ret = true;
}
}
else
{
// We do not have any outgoing connection for this relationship yet
std::set<Connection *> newConnection;
newConnection.insert(connection);
connection->setSourceProcessor(this);
_outGoingConnections[relationship] = newConnection;
_logger->log_info("Add connection %s into Processor %s outgoing connection for relationship %s",
connection->getName().c_str(), _name.c_str(), relationship.c_str());
ret = true;
}
}
return ret;
}
void Processor::removeConnection(Connection *connection)
{
if (isRunning())
{
_logger->log_info("Can not remove connection while the process %s is running",
_name.c_str());
return;
}
std::lock_guard<std::mutex> lock(_mtx);
uuid_t srcUUID;
uuid_t destUUID;
connection->getSourceProcessorUUID(srcUUID);
connection->getDestinationProcessorUUID(destUUID);
if (uuid_compare(_uuid, destUUID) == 0)
{
// Connection is destination to the current processor
if (_incomingConnections.find(connection) != _incomingConnections.end())
{
_incomingConnections.erase(connection);
connection->setDestinationProcessor(NULL);
_logger->log_info("Remove connection %s into Processor %s incoming connection",
connection->getName().c_str(), _name.c_str());
_incomingConnectionsIter = this->_incomingConnections.begin();
}
}
if (uuid_compare(_uuid, srcUUID) == 0)
{
std::string relationship = connection->getRelationship().getName();
// Connection is source from the current processor
std::map<std::string, std::set<Connection *>>::iterator it =
_outGoingConnections.find(relationship);
if (it == _outGoingConnections.end())
{
return;
}
else
{
if (_outGoingConnections[relationship].find(connection) != _outGoingConnections[relationship].end())
{
_outGoingConnections[relationship].erase(connection);
connection->setSourceProcessor(NULL);
_logger->log_info("Remove connection %s into Processor %s outgoing connection for relationship %s",
connection->getName().c_str(), _name.c_str(), relationship.c_str());
}
}
}
}
Connection *Processor::getNextIncomingConnection()
{
std::lock_guard<std::mutex> lock(_mtx);
if (_incomingConnections.size() == 0)
return NULL;
if (_incomingConnectionsIter == _incomingConnections.end())
_incomingConnectionsIter = _incomingConnections.begin();
Connection *ret = *_incomingConnectionsIter;
_incomingConnectionsIter++;
if (_incomingConnectionsIter == _incomingConnections.end())
_incomingConnectionsIter = _incomingConnections.begin();
return ret;
}
bool Processor::flowFilesQueued()
{
std::lock_guard<std::mutex> lock(_mtx);
if (_incomingConnections.size() == 0)
return false;
for (std::set<Connection *>::iterator it = _incomingConnections.begin(); it != _incomingConnections.end(); ++it)
{
Connection *connection = *it;
if (connection->getQueueSize() > 0)
return true;
}
return false;
}
bool Processor::flowFilesOutGoingFull()
{
std::lock_guard<std::mutex> lock(_mtx);
std::map<std::string, std::set<Connection *>>::iterator it;
for (it = _outGoingConnections.begin(); it != _outGoingConnections.end(); ++it)
{
// We already has connection for this relationship
std::set<Connection *> existedConnection = it->second;
for (std::set<Connection *>::iterator itConnection = existedConnection.begin(); itConnection != existedConnection.end(); ++itConnection)
{
Connection *connection = *itConnection;
if (connection->isFull())
return true;
}
}
return false;
}
void Processor::onTrigger()
{
ProcessContext *context = new ProcessContext(this);
ProcessSession *session = new ProcessSession(context);
try {
// Call the child onTrigger function
this->onTrigger(context, session);
session->commit();
delete session;
delete context;
}
catch (std::exception &exception)
{
_logger->log_debug("Caught Exception %s", exception.what());
session->rollback();
delete session;
delete context;
throw;
}
catch (...)
{
_logger->log_debug("Caught Exception Processor::onTrigger");
session->rollback();
delete session;
delete context;
throw;
}
}