blob: db26ad0dfd92e44cbb17e71a45e7da7264e0f628 [file] [log] [blame]
/**
* @file Processor.h
* Processor class declaration
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef __PROCESSOR_H__
#define __PROCESSOR_H__
#include <uuid/uuid.h>
#include <vector>
#include <queue>
#include <map>
#include <mutex>
#include <atomic>
#include <algorithm>
#include <set>
#include "TimeUtil.h"
#include "Property.h"
#include "Relationship.h"
#include "Connection.h"
//! Forward declarations (this header uses these types only as a friend and as pointer parameters)
class ProcessContext;
class ProcessSession;
//! Minimum scheduling period in nanoseconds (30000 ns = 30 microseconds)
#define MINIMUM_SCHEDULING_NANOS 30000
//! Default yield period in seconds
#define DEFAULT_YIELD_PERIOD_SECONDS 1
//! Default penalization period in seconds
#define DEFAULT_PENALIZATION_PERIOD_SECONDS 30
/*!
 * Valid values for the state of an entity with respect to
 * scheduling the entity to run.
 */
enum ScheduledState {
    //! Entity cannot be scheduled to run
    DISABLED = 0,
    //! Entity can be scheduled to run but currently is not
    STOPPED = 1,
    //! Entity is currently scheduled to run
    RUNNING = 2
};
/*!
 * Scheduling strategy: selects what causes an entity to be triggered.
 */
enum SchedulingStrategy {
    //! Event driven
    EVENT_DRIVEN = 0,
    //! Timer driven
    TIMER_DRIVEN = 1,
    //! Cron driven
    CRON_DRIVEN = 2
};
//! Processor Class
/*!
 * Abstract base for processors: derived classes implement
 * onTrigger(ProcessContext *, ProcessSession *). The class holds the
 * processor identity (name and UUID), its scheduling settings, its
 * supported properties and relationships, and its incoming and outgoing
 * connections. Copying is disabled; pass processors by reference or pointer.
 */
class Processor
{
    friend class ProcessContext;
public:
    //! Constructor
    /*!
     * Create a new processor
     * \param name processor name
     * \param uuid optional identifier; defaults to a null pointer
     *        (NOTE(review): null handling lives in the out-of-line
     *        definition, which is not visible in this header)
     */
    Processor(std::string name, uuid_t uuid = nullptr);
    //! Destructor
    virtual ~Processor();
    //! Set Processor Name
    void setName(std::string name) {
        _name = name;
    }
    //! Get Processor Name
    std::string getName(void) {
        return _name;
    }
    //! Set UUID
    /*!
     * Copies \p uuid into the processor and refreshes the cached string
     * form returned by getUUIDStr().
     */
    void setUUID(uuid_t uuid) {
        uuid_copy(_uuid, uuid);
        // uuid_unparse writes 36 characters plus the terminating NUL
        char uuidStr[37];
        uuid_unparse(_uuid, uuidStr);
        _uuidStr = uuidStr;
    }
    //! Get UUID
    /*!
     * Copies the processor UUID into the caller supplied buffer.
     * \return true when \p uuid is non-null and was filled in, false otherwise
     */
    bool getUUID(uuid_t uuid) {
        if (!uuid)
            return false;
        uuid_copy(uuid, _uuid);
        return true;
    }
    //! Set the supported processor properties while the process is not running
    bool setSupportedProperties(std::set<Property> properties);
    //! Set the supported relationships while the process is not running
    bool setSupportedRelationships(std::set<Relationship> relationships);
    //! Get the supported property value by name
    bool getProperty(std::string name, std::string &value);
    //! Set the supported property value by name while the process is not running
    bool setProperty(std::string name, std::string value);
    //! Whether the relationship is supported
    bool isSupportedRelationship(Relationship relationship);
    //! Set the auto terminated relationships while the process is not running
    bool setAutoTerminatedRelationships(std::set<Relationship> relationships);
    //! Check whether the relationship is auto terminated
    bool isAutoTerminated(Relationship relationship);
    //! Check whether the processor is running
    bool isRunning();
    //! Set Processor Scheduled State
    void setScheduledState(ScheduledState state) {
        _state = state;
    }
    //! Get Processor Scheduled State
    ScheduledState getScheduledState(void) {
        return _state;
    }
    //! Set Processor Scheduling Strategy
    void setSchedulingStrategy(SchedulingStrategy strategy) {
        _strategy = strategy;
    }
    //! Get Processor Scheduling Strategy
    SchedulingStrategy getSchedulingStrategy(void) {
        return _strategy;
    }
    //! Set Processor Loss Tolerant
    void setlossTolerant(bool lossTolerant) {
        _lossTolerant = lossTolerant;
    }
    //! Get Processor Loss Tolerant
    bool getlossTolerant(void) {
        return _lossTolerant;
    }
    //! Set Processor Scheduling Period in nanoseconds
    /*!
     * Values below MINIMUM_SCHEDULING_NANOS are raised to that minimum.
     */
    void setSchedulingPeriodNano(uint64_t period) {
        // std::max needs both arguments to have the same type
        const uint64_t minPeriod = MINIMUM_SCHEDULING_NANOS;
        _schedulingPeriodNano = std::max(period, minPeriod);
    }
    //! Get Processor Scheduling Period in nanoseconds
    uint64_t getSchedulingPeriodNano(void) {
        return _schedulingPeriodNano;
    }
    //! Set Processor Run Duration in nanoseconds
    void setRunDurationNano(uint64_t period) {
        _runDurantionNano = period;
    }
    //! Get Processor Run Duration in nanoseconds
    uint64_t getRunDurationNano(void) {
        return _runDurantionNano;
    }
    //! Set Processor yield period in milliseconds
    void setYieldPeriodMsec(uint64_t period) {
        _yieldPeriodMsec = period;
    }
    //! Get Processor yield period in milliseconds
    uint64_t getYieldPeriodMsec(void) {
        return _yieldPeriodMsec;
    }
    //! Set Processor penalization period in milliseconds
    void setPenalizationPeriodMsec(uint64_t period) {
        _penalizationPeriodMsec = period;
    }
    //! Get Processor penalization period in milliseconds
    uint64_t getPenalizationPeriodMsec(void) {
        return _penalizationPeriodMsec;
    }
    //! Set Processor Maximum Concurrent Tasks
    void setMaxConcurrentTasks(uint8_t tasks) {
        _maxConcurrentTasks = tasks;
    }
    //! Get Processor Maximum Concurrent Tasks
    uint8_t getMaxConcurrentTasks(void) {
        return _maxConcurrentTasks;
    }
    //! Set Trigger when empty
    void setTriggerWhenEmpty(bool value) {
        _triggerWhenEmpty = value;
    }
    //! Get Trigger when empty
    bool getTriggerWhenEmpty(void) {
        return _triggerWhenEmpty;
    }
    //! Get Active Task Count
    uint8_t getActiveTasks(void) {
        return _activeTasks;
    }
    //! Increment Active Task Count
    void incrementActiveTasks(void) {
        _activeTasks++;
    }
    //! Decrement Active Task Count
    /*!
     * The counter never goes below zero: an unmatched decrement on the
     * unsigned 8-bit counter would otherwise wrap around to 255.
     */
    void decrementActiveTask(void) {
        uint8_t current = _activeTasks.load();
        while (current > 0 &&
               !_activeTasks.compare_exchange_weak(
                   current, static_cast<uint8_t>(current - 1))) {
            // compare_exchange_weak refreshed 'current' with the latest
            // value; loop to re-check it against zero and try again.
        }
    }
    //! Reset Active Task Count to zero
    void clearActiveTask(void) {
        _activeTasks = 0;
    }
    //! Yield based on the yield period
    /*!
     * Sets the yield expiration to the current getTimeMillis() value plus
     * the configured yield period.
     */
    void yield()
    {
        _yieldExpiration = (getTimeMillis() + _yieldPeriodMsec);
    }
    //! Yield based on the input time
    /*!
     * \param time duration added to the current getTimeMillis() value
     */
    void yield(uint64_t time)
    {
        _yieldExpiration = (getTimeMillis() + time);
    }
    //! Whether the processor is currently yielding
    /*!
     * \return true when a yield expiration is set (non-zero) and has not
     *         passed yet, false otherwise
     */
    bool isYield()
    {
        // Read the expiration once so the zero check and the comparison
        // see the same value even if another thread updates it meanwhile.
        const uint64_t expiration = _yieldExpiration;
        return (expiration > 0) && (expiration >= getTimeMillis());
    }
    //! Clear yield expiration
    void clearYield()
    {
        _yieldExpiration = 0;
    }
    //! Get remaining yield time
    /*!
     * \return the time left until the yield expiration, or 0 when the
     *         expiration has passed or is not set
     */
    uint64_t getYieldTime()
    {
        const uint64_t curTime = getTimeMillis();
        // Single read: if the expiration were re-read for the subtraction,
        // a concurrent clearYield() could turn it into 0 - curTime, which
        // wraps to a huge unsigned value.
        const uint64_t expiration = _yieldExpiration;
        return (expiration > curTime) ? (expiration - curTime) : 0;
    }
    //! Whether flow file queued in incoming connection
    bool flowFilesQueued();
    //! Whether flow file queue full in any of the outgoing connections
    bool flowFilesOutGoingFull();
    //! Get incoming connections (returns a copy of the set)
    std::set<Connection *> getIncomingConnections() {
        return _incomingConnections;
    }
    //! Has Incoming Connection
    bool hasIncomingConnections() {
        return !_incomingConnections.empty();
    }
    //! Get outgoing connections based on relationship name
    std::set<Connection *> getOutGoingConnections(std::string relationship);
    //! Add connection
    bool addConnection(Connection *connection);
    //! Remove connection
    void removeConnection(Connection *connection);
    //! Get the UUID as string (cached by setUUID)
    std::string getUUIDStr() {
        return _uuidStr;
    }
    //! Get the Next RoundRobin incoming connection
    Connection *getNextIncomingConnection();
    //! On Trigger (defined out of line)
    void onTrigger();

    //! OnTrigger method, implemented by NiFi Processor Designer
    virtual void onTrigger(ProcessContext *context, ProcessSession *session) = 0;
    //! Initialize, may be overridden by NiFi Processor Designer; does nothing by default
    virtual void initialize(void) {
    }

protected:
    //! A global unique identifier
    uuid_t _uuid;
    //! Processor Name
    std::string _name;
    //! Supported properties
    std::map<std::string, Property> _properties;
    //! Supported relationships
    std::map<std::string, Relationship> _relationships;
    //! Autoterminated relationships
    std::map<std::string, Relationship> _autoTerminatedRelationships;
    //! Processor state
    std::atomic<ScheduledState> _state;
    //! Scheduling Strategy
    std::atomic<SchedulingStrategy> _strategy;
    //! lossTolerant
    std::atomic<bool> _lossTolerant;
    //! Scheduling period in nanoseconds
    std::atomic<uint64_t> _schedulingPeriodNano;
    //! Run duration in nanoseconds (member name keeps its historical spelling)
    std::atomic<uint64_t> _runDurantionNano;
    //! Yield period in milliseconds
    std::atomic<uint64_t> _yieldPeriodMsec;
    //! Penalization period in milliseconds
    std::atomic<uint64_t> _penalizationPeriodMsec;
    //! Maximum Concurrent Tasks
    std::atomic<uint8_t> _maxConcurrentTasks;
    //! Active Tasks
    std::atomic<uint8_t> _activeTasks;
    //! Trigger the Processor even if the incoming connection is empty
    std::atomic<bool> _triggerWhenEmpty;
    //! Incoming connections
    std::set<Connection *> _incomingConnections;
    //! Outgoing connections map based on Relationship name
    std::map<std::string, std::set<Connection *>> _outGoingConnections;
    //! UUID string
    std::string _uuidStr;

private:
    //! Mutex for protection (NOTE(review): not used by any inline member here; confirm what it guards in the implementation file)
    std::mutex _mtx;
    //! Yield expiration; 0 means no yield is pending
    std::atomic<uint64_t> _yieldExpiration;
    //! Incoming connection Iterator
    std::set<Connection *>::iterator _incomingConnectionsIter;
    //! Logger
    Logger *_logger;
    // Copy construction and copy assignment are disabled.
    // Only support pass by reference or pointer
    Processor(const Processor &parent) = delete;
    Processor &operator=(const Processor &parent) = delete;
};
#endif