blob: badea9f6a8c62baea65b0837f877e6327d0d5490 [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef LIBMINIFI_INCLUDE_CORE_CONNECTABLE_H_
#define LIBMINIFI_INCLUDE_CORE_CONNECTABLE_H_
#include <map>
#include <memory>
#include <string>
#include <vector>
#include <set>
#include <unordered_set>
#include <unordered_map>
#include "Core.h"
#include <condition_variable>
#include "core/logging/Logger.h"
#include "Relationship.h"
#include "Scheduling.h"
#include "core/state/FlowIdentifier.h"
namespace org {
namespace apache {
namespace nifi {
namespace minifi {
namespace core {
/**
* Represents the base connectable component
* Purpose: As in NiFi, this represents a connection point and allows the derived
* object to be connected to other connectables.
*/
class Connectable : public CoreComponent {
public:
explicit Connectable(const std::string &name);
explicit Connectable(const std::string &name, const utils::Identifier &uuid);
explicit Connectable(const Connectable &&other);
bool setSupportedRelationships(const std::set<Relationship> &relationships);
// Whether the relationship is supported
bool isSupportedRelationship(const Relationship &relationship);
std::vector<Relationship> getSupportedRelationships() const;
/**
* Sets auto terminated relationships
* @param relationships
* @return result of set operation.
*/
bool setAutoTerminatedRelationships(const std::set<Relationship> &relationships);
// Check whether the relationship is auto terminated
bool isAutoTerminated(const Relationship &relationship);
// Get Processor penalization period in MilliSecond
uint64_t getPenalizationPeriodMsec(void) const {
return (_penalizationPeriodMsec);
}
/**
* Get outgoing connection based on relationship
* @return set of outgoing connections.
*/
virtual std::set<std::shared_ptr<Connectable>> getOutGoingConnections(const std::string &relationship) const;
virtual void put(std::shared_ptr<Connectable> flow) {
}
/**
* Gets and sets next incoming connection
* @return next incoming connection
*/
std::shared_ptr<Connectable> getNextIncomingConnection();
virtual std::shared_ptr<Connectable> pickIncomingConnection();
/**
* @return true if incoming connections > 0
*/
bool hasIncomingConnections() const {
return (_incomingConnections.size() > 0);
}
uint8_t getMaxConcurrentTasks() const {
return max_concurrent_tasks_;
}
void setMaxConcurrentTasks(const uint8_t tasks) {
max_concurrent_tasks_ = tasks;
}
/**
* Yield
*/
virtual void yield() = 0;
virtual ~Connectable();
/**
* Determines if we are connected and operating
*/
virtual bool isRunning() = 0;
/**
* Block until work is available on any input connection, or the given duration elapses
* @param timeoutMs timeout in milliseconds
*/
void waitForWork(uint64_t timeoutMs);
/**
* Notify this processor that work may be available
*/
void notifyWork();
/**
* Determines if work is available by this connectable
* @return boolean if work is available.
*/
virtual bool isWorkAvailable() = 0;
/**
* Verify that this connectable can be stopped.
* @return bool.
*/
virtual bool verifyCanStop() {
if (isRunning())
return true;
return false;
}
/**
* Sets the flow version for this connectable.
*/
void setFlowIdentifier(const std::shared_ptr<state::FlowIdentifier> &version) {
connectable_version_ = version;
}
/**
* Returns theflow version
* @returns flow version. can be null if a flow version is not tracked.
*/
virtual std::shared_ptr<state::FlowIdentifier> getFlowIdentifier() const {
return connectable_version_;
}
protected:
// must hold the relationship_mutex_ before calling this
std::shared_ptr<Connectable> getNextIncomingConnectionImpl(const std::lock_guard<std::mutex>& relationship_mutex_lock);
// Penalization Period in MilliSecond
std::atomic<uint64_t> _penalizationPeriodMsec;
uint8_t max_concurrent_tasks_;
// Supported relationships
std::map<std::string, core::Relationship> relationships_;
// Autoterminated relationships
std::map<std::string, core::Relationship> auto_terminated_relationships_;
// Incoming connection Iterator
std::set<std::shared_ptr<Connectable>>::iterator incoming_connections_Iter;
// Incoming connections
std::set<std::shared_ptr<Connectable>> _incomingConnections;
// Outgoing connections map based on Relationship name
std::map<std::string, std::set<std::shared_ptr<Connectable>>> out_going_connections_;
// Mutex for protection
mutable std::mutex relationship_mutex_;
///// work conditionals and locking mechanisms
// Concurrent condition mutex for whether there is incoming work to do
std::mutex work_available_mutex_;
// Condition for whether there is incoming work to do
std::atomic<bool> has_work_;
// Scheduling Strategy
std::atomic<SchedulingStrategy> strategy_;
// Concurrent condition variable for whether there is incoming work to do
std::condition_variable work_condition_;
// version under which this connectable was created.
std::shared_ptr<state::FlowIdentifier> connectable_version_;
private:
std::shared_ptr<logging::Logger> logger_;
};
} // namespace core
/* namespace core */
} // namespace minifi
} // namespace nifi
} // namespace apache
} // namespace org
#endif // LIBMINIFI_INCLUDE_CORE_CONNECTABLE_H_